Skip to content

feat(session): 持久化 archive finalization 任务#2075

Open
qin-ctx wants to merge 7 commits into
mainfrom
feat/session-archive-finalize-recovery
Open

feat(session): 持久化 archive finalization 任务#2075
qin-ctx wants to merge 7 commits into
mainfrom
feat/session-archive-finalize-recovery

Conversation

@qin-ctx
Copy link
Copy Markdown
Collaborator

@qin-ctx qin-ctx commented May 15, 2026

Description

This PR makes session archive finalization durable and recoverable. commit_async() now persists the archive payload, live tail, session metadata, and a SQLite-backed finalize task inside the session lock; a background worker then claims those tasks, finalizes archives in session order, and writes .done only after summary artifacts are complete.

The latest update reconciles this flow with recent main changes: large externalized tool outputs are hydrated before archive summary and memory extraction, archive finalization is split into SessionArchiveFinalizer, session context now handles rapid consecutive commits by advancing the latest overview only when the earlier archive has a .done marker, the conflict resolution adapts archive finalization to the async TaskTracker API from main, and the new session-skill extraction result fields are surfaced from durable archive finalization.

Related Issue

N/A

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement
  • Test update

Changes Made

  • Added a SQLite-backed ArchiveFinalizeTaskStore with preparing, pending, running, completed, retry, and terminal_failed states, leases, ordered claiming, retry accounting, and manual recovery support.
  • Changed session commit so phase 1 only writes archive messages, retained live messages, metadata, and finalize task state; summary generation, archive metadata, .done, and task completion now run in the service worker.
  • Added SessionArchiveFinalizer to own archive summary files, .done markers, failure markers, token metadata, and best-effort post-finalize side effects.
  • Added retry support for terminal failed or orphaned archives through POST /api/v1/sessions/{session_id}/archives/{archive_id}/retry.
  • Preserved context correctness for incomplete archives: pending archive messages are merged into current context until the stable .done boundary advances, including rapid multi-commit cases.
  • Updated archive finalization code and tests for main's async task tracker API and session-skill extraction after resolving merge conflicts.
  • Updated English and Chinese docs plus session, API, archive task, and task tracker tests for the persistent finalize flow.

Testing

  • Commit hooks passed: ruff (legacy alias) and ruff format.

  • .venv/bin/python -m py_compile openviking/session/session.py openviking/session/archive_finalizer.py tests/session/test_session_commit.py

  • Earlier validation before the latest main merge: .venv/bin/python -m pytest tests/session/test_archive_finalize_tasks.py tests/session/test_session_commit.py tests/session/test_session_context.py tests/session/test_tool_result_externalization.py tests/server/test_api_sessions.py tests/test_session_task_tracking.py tests/test_task_tracker.py

    • Result: 113 passed, with existing dependency/deprecation warnings.
  • Latest main conflict merge unit tests were not rerun per maintainer request.

  • I have added tests that prove my fix is effective or that my feature works

  • New and existing unit tests pass locally with my changes

  • I have tested this on the following platforms:

    • Linux
    • macOS
    • Windows

Checklist

  • My code follows the project's coding style
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

Screenshots (if applicable)

N/A

Additional Notes

  • Resolved conflicts with origin/main in merge commits e58b7f6b and c3d039f5.
  • Latest base merged: origin/main at 982b4cda.
  • Summary provider failures now retry and eventually become terminal_failed; later commits for the same session are blocked until the archive is retried successfully.
  • Memory extraction, session-skill extraction, usage relation linking, and active count updates run after .done as best-effort side effects, so those failures no longer prevent archive completion.
  • Historical orphan archives can be recovered by the retry endpoint when messages.jsonl exists but no finalize task was recorded.

Move session archive finalization into a SQLite-backed task queue so archive recovery survives process restarts and terminal failures can be retried explicitly.
@qin-ctx qin-ctx changed the title feat(session): persist archive finalization tasks wip feat(session): persist archive finalization tasks May 15, 2026
@github-actions
Copy link
Copy Markdown

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 80
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add SQLite-backed archive finalize task store

Relevant files:

  • openviking/session/archive_finalize_tasks.py
  • tests/session/test_archive_finalize_tasks.py

Sub-PR theme: Add background worker and retry API for archive finalization

Relevant files:

  • openviking/service/session_service.py
  • openviking/service/core.py
  • openviking/server/routers/sessions.py

Sub-PR theme: Move archive finalization to task store and update behavior

Relevant files:

  • openviking/session/session.py
  • tests/session/test_session_context.py
  • tests/test_session_task_tracking.py

⚡ Recommended focus areas for review

Performance

The _claim_next method fetches all tasks in (STATE_PREPARING, STATE_PENDING, STATE_RETRY, STATE_RUNNING) first, then filters in Python. For large task queues, this is inefficient. Consider pushing more filtering (e.g., lease_until <= now, next_run_at <= now, preparing stale check) into SQL.

def _claim_next(self, owner: str) -> Optional[ArchiveFinalizeTask]:
    now = time.time()
    lease_until = now + ARCHIVE_FINALIZE_LEASE_SECONDS
    with self._connect() as conn:
        conn.execute("BEGIN IMMEDIATE")
        rows = conn.execute(
            """
            SELECT * FROM session_archive_finalize_tasks
            WHERE state IN (?, ?, ?, ?)
            ORDER BY created_at ASC
            """,
            (STATE_PREPARING, STATE_PENDING, STATE_RETRY, STATE_RUNNING),
        ).fetchall()
        for row in rows:
            task = self._task_from_row(row)
            if (
                task.state == STATE_PREPARING
                and task.created_at + PREPARING_STALE_SECONDS > now
            ):
                continue
            if task.state == STATE_RETRY and task.next_run_at > now:
                continue
            if task.state == STATE_RUNNING and task.lease_until > now:
                continue
            if self._has_active_session_task(conn, task, now):
                continue
            if self._has_incomplete_prior_task(conn, task):
                continue
            conn.execute(
                """
                UPDATE session_archive_finalize_tasks
                SET state=?, lease_owner=?, lease_until=?, updated_at=?
                WHERE account_id=? AND user_id=? AND session_id=? AND archive_id=?
                """,
                (
                    STATE_RUNNING,
                    owner,
                    lease_until,
                    now,
                    task.account_id,
                    task.user_id,
                    task.session_id,
                    task.archive_id,
                ),
            )
            conn.commit()
            claimed = conn.execute(
                """
                SELECT * FROM session_archive_finalize_tasks
                WHERE account_id=? AND user_id=? AND session_id=? AND archive_id=?
                """,
                (task.account_id, task.user_id, task.session_id, task.archive_id),
            ).fetchone()
            return replace(self._task_from_row(claimed), claimed_from_state=task.state)
        conn.commit()
    return None
Bug

In _read_archive_messages_strict, self._viking_fs is used without checking if it's None. While session_service checks VikingFS initialization, adding a local guard or using an existing check would be safer.

async def _read_archive_messages_strict(self, archive_uri: str) -> List[Message]:
    """Read archived messages and fail on malformed JSONL."""
    content = await self._viking_fs.read_file(f"{archive_uri}/messages.jsonl", ctx=self.ctx)
    messages: List[Message] = []
    for lineno, line in enumerate(content.splitlines(), start=1):
        if not line.strip():
            continue
        try:
            messages.append(Message.from_dict(json.loads(line)))
        except Exception as exc:
            raise ValueError(
                f"Invalid messages.jsonl at {archive_uri} line {lineno}: {exc}"
            ) from exc
    return messages

@qin-ctx qin-ctx changed the title wip feat(session): persist archive finalization tasks feat(session): 持久化 archive finalization 任务 May 15, 2026
@github-actions
Copy link
Copy Markdown

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Offload blocking SQLite claim_next to thread pool

The claim_next method is a blocking SQLite call running in an async worker loop,
which can block the event loop. Wrap it with asyncio.to_thread to offload the
blocking operation to a thread pool.

openviking/service/session_service.py [94]

-task = store.claim_next(owner)
+task = await asyncio.to_thread(store.claim_next, owner)
Suggestion importance[1-10]: 5

__

Why: Correctly identifies a blocking SQLite call in an async worker loop; offloading to a thread pool improves event loop responsiveness, a moderate performance improvement.

Low
Offload blocking SQLite task store calls to thread pool

The create_preparing, mark_pending, and delete methods are blocking SQLite calls in
an async function. Wrap them with asyncio.to_thread to avoid blocking the event
loop.

openviking/session/session.py [683-718]

-task_store.create_preparing(...)
-task_store.mark_pending(...)
-task_store.delete(...)
+await asyncio.to_thread(task_store.create_preparing, ...)
+await asyncio.to_thread(task_store.mark_pending, self.ctx, self.session_id, archive_id)
+await asyncio.to_thread(task_store.delete, self.ctx, self.session_id, archive_id)
Suggestion importance[1-10]: 5

__

Why: Correctly identifies blocking SQLite calls in an async commit path; wrapping with asyncio.to_thread avoids blocking the event loop during session commits, a moderate improvement.

Low

qin-ctx added 6 commits May 20, 2026 17:21
# Conflicts:
#	openviking/service/session_service.py
#	openviking/session/session.py
…finalize-recovery

# Conflicts:
#	openviking/session/session.py
#	tests/session/test_session_commit.py
#	tests/session/test_session_context.py
#	tests/test_session_task_tracking.py
#	tests/test_task_tracker.py
…finalize-recovery

# Conflicts:
#	openviking/session/session.py
#	tests/session/test_session_commit.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

1 participant