Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions docs/design/session-archive-finalize-recovery-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Session Archive Finalize Recovery

## 结论

问题的核心不是 `.failed.json` 文件本身,而是 archive finalize 这一步过去只靠进程内后台任务承载。`commit_async()` 写出 `history/archive_NNN/messages.jsonl` 后,如果进程重启、后台任务丢失、summary 生成持续失败,archive 就可能长期没有 `.done`,后续 archive 再按顺序等待它,最终整个 session 的归档体验被卡住。

这次实现采用一个简版设计:常规路径信任 SQLite task log,不在启动或 commit 时全量扫描 `history`。commit 只负责把 archive 原文和 finalize task 持久化;后台 worker 从 SQLite 领取 task,生成 `.overview.md`、`.abstract.md`、`.meta.json`,最后写 `.done`。如果 finalize 连续 3 次失败,task 进入 `terminal_failed`,后续 commit 明确报错并给出 retry endpoint。用户或运维修复外部问题后,可以手动 retry 这个 archive;retry 会把 task 重置为 pending,再由 worker 重新 finalize。

`.failed.json` 只保留为诊断文件,不再作为常规 commit 的判定来源。也就是说,系统不因为看到历史目录里的 `.failed.json` 就扫描并永久阻塞,而是看 SQLite 里这个 archive 的 task 状态。memory extraction 也不再决定 `.done`;archive summary 成功后就写 `.done`,memory 相关副作用失败只记录日志,不阻塞后续归档。

## 目标流程

```mermaid
flowchart TD
A["commit_async()"] --> B{"live messages 为空?"}
B -- 是 --> B1["更新 meta<br/>返回 archived=false"]
B -- 否 --> C{"SQLite 中是否有<br/>terminal_failed archive?"}
C -- 是 --> C1["拒绝本次 commit<br/>返回 archive_id / last_error / retry_endpoint"]
C -- 否 --> D["获取 session path lock"]
D --> E{"messages <= keep_recent_count?"}
E -- 是 --> E1["只更新 meta<br/>返回 all_within_keep_window"]
E -- 否 --> F["分配 archive_NNN<br/>创建 archive_finalize task: preparing"]
F --> G["写 history/archive_NNN/messages.jsonl"]
G --> H["写回 live messages 尾部<br/>更新 session meta"]
H --> I["task 改为 pending<br/>释放 lock"]
I --> J["返回 accepted<br/>包含 task_id / archive_uri"]

J -. worker .-> K["SQLite 原子领取最早可运行 task"]
K --> L{"同 session 是否已有<br/>有效 running task?"}
L -- 是 --> L1["本轮不处理"]
L -- 否 --> M["设置 lease<br/>state=running"]
M --> N["严格读取 messages.jsonl"]
N --> O{"JSONL 是否可解析?"}
O -- 否 --> O1["记录失败<br/>最多重试 3 次"]
O -- 是 --> P["生成 archive summary"]
P --> Q{"summary 是否成功?"}
Q -- 否 --> O1
Q -- 是 --> R["写 .abstract.md / .overview.md / .meta.json"]
R --> S["写 .done"]
S --> T["task=completed"]
T --> U["best-effort memory side effects"]
```

## 真实 Case

服务重启导致任务丢失时,目录可能是这样:

```text
history/
archive_001/
messages.jsonl
.overview.md
.abstract.md
.meta.json
.done
archive_002/
messages.jsonl
```

旧行为的问题是 `archive_002` 没有 `.done`,但进程内后台任务已经没了。新的行为里,`archive_002` 创建时已经有 SQLite task。服务恢复后,worker 看到这个 task 仍是 `pending`,或者之前是 `running` 但 lease 已过期,就重新领取并补齐 `.overview.md`、`.abstract.md`、`.meta.json`、`.done`。这里不需要启动时扫所有 session 的 `history`,也不需要从目录结构反推任务。

如果外部模型持续失败,例如 summary 连续报:

```text
archive_summary_failed: provider timeout
```

worker 会把同一个 task 重试到第 3 次。第 3 次仍失败后,SQLite 状态变成 `terminal_failed`,并写诊断:

```text
archive_002/
messages.jsonl
.failed.json
```

后续再 commit 这个 session 时,系统不会静默跳过,也不会写一个假的 summary,而是返回明确错误:`archive_002` finalize 已失败 3 次,需要先调用 `/api/v1/sessions/{session_id}/archives/archive_002/retry`。手动 retry 会重置 attempts 和错误信息,创建新的 task tracker id,然后 worker 重新从 `messages.jsonl` finalize。成功后目录变成:

```text
archive_002/
messages.jsonl
.overview.md
.abstract.md
.meta.json
.done
.failed.json # 仅历史诊断,不参与常规判定
```

如果 `messages.jsonl` 本身损坏,例如 JSONL 半截中断,worker 严格解析时也会失败。它同样按 3 次失败进入 `terminal_failed`,后续 commit 被明确阻塞,直到人工修复这个文件后手动 retry。当前版本不做自动 quarantine,也不提供“跳过坏 archive 继续”的默认路径,因为这会制造上下文缺口,而且复杂度高于当前需要。

## 实现要点

SQLite 表 `session_archive_finalize_tasks` 存 archive finalize 状态,主键是 `(account_id, user_id, session_id, archive_id)`。关键字段包括 `state`、`attempt_count`、`task_tracker_id`、`archive_uri`、`lease_owner`、`lease_until`、`last_error` 和 `usage_records_json`。

状态流转保持简单:

```text
preparing -> pending -> running -> completed
|
+-> retry -> running
|
+-> terminal_failed
```

`preparing` 用来覆盖 commit 写 archive 过程中进程退出的情况。正常 commit 会在写完 `messages.jsonl`、live tail 和 meta 后把 task 改成 `pending`。worker 领取 task 时用 SQLite 原子更新设置 lease,同一个 session 同一时间只允许一个有效 `running` finalize。后续 archive 可以先被 commit 写入并入队,但 finalize 顺序由 worker 保证。

手动 retry 只对 `terminal_failed` task 生效。如果 task 已经 `completed`,接口返回 `already_completed`;如果 task 仍在有效 `running` lease 内,接口返回当前 running 状态;如果 task 不存在但 archive 的 `messages.jsonl` 存在,接口可以补一条 pending task,用于修复历史版本遗留的孤儿 archive。

## 非目标

当前版本不做启动全量扫描 `history`,不做自动 quarantine,不做静默 skip,不写 fallback summary。memory extraction 也不拆成独立持久化 task;它只是从 `.done` 语义里解耦,作为 archive 完成后的 best-effort 副作用执行。
94 changes: 15 additions & 79 deletions docs/en/api/05-sessions.md
Original file line number Diff line number Diff line change
Expand Up @@ -789,16 +789,17 @@ await client.session_used(

#### 1. API Implementation Introduction

Commit a session. Message archiving (Phase 1) completes immediately. Summary generation and memory extraction (Phase 2) run asynchronously in the background. Returns a `task_id` for polling progress.
Commit a session. Archive payload writing completes immediately. Archive finalization runs asynchronously and returns a `task_id` for polling progress. Memory extraction, relation linking, and active-count updates run best-effort after the archive writes `.done`.

**Two-Phase Commit Flow:**
- **Phase 1 (Synchronous)**: Snapshot current messages, clear live session, create archive directory, write original messages
- **Phase 2 (Asynchronous)**: Generate summaries (L0/L1), extract long-term memories, update relations and active_count
**Commit Flow:**
- **Inline path**: Snapshot current messages, write the archive payload and retained live tail, and persist the finalize task
- **Finalize task**: Generate summaries (L0/L1), write archive metadata, and write `.done`
- **Best-effort side effects**: Extract long-term memories, update relations, and update active_count after `.done`

**Notes:**
- Rapid consecutive commits on the same session are accepted; each request gets its own `task_id`.
- Background Phase 2 work is serialized by archive order: archive `N+1` waits until archive `N` writes `.done`.
- If an earlier archive failed and left no `.done`, later commit requests fail with `FAILED_PRECONDITION` until that failure is resolved.
- Finalize work is serialized by archive order: archive `N+1` waits until archive `N` writes `.done`.
- If an earlier archive finalization enters terminal failure, later commit requests fail with `FAILED_PRECONDITION` until that archive is retried.

**Code Entries:**
- `openviking/session/session.py:Session.commit_async()` - Core implementation
Expand Down Expand Up @@ -840,17 +841,15 @@ import openviking as ov

client = ov.Client(base_url="http://localhost:1933", api_key="your-key")

# Commit returns immediately with task_id; summary + memory extraction runs in background
# Commit returns immediately with task_id; archive finalization runs in background
result = await client.commit_session("a1b2c3d4")
print(f"Status: {result['status']}")
print(f"Task ID: {result['task_id']}")

# Poll background task progress
# Poll archive finalization progress
task = await client.get_task(result["task_id"])
if task["status"] == "completed":
memories = task["result"]["memories_extracted"]
total = sum(memories.values())
print(f"Memories extracted: {total}")
print(f"Archive finalized: {task['result']['archive_uri']}")
```

**CLI**
Expand Down Expand Up @@ -917,7 +916,9 @@ The endpoint returns the extracted memory write results as a JSON list. The exac

#### 1. API Implementation Introduction

Query background task status (e.g., commit summary generation and memory extraction progress).
Query background task status. For session commits, the task tracks archive finalization:
generating the archive summary files and writing `.done`. Memory extraction and
usage side effects run best-effort after `.done` and are not part of task success.

**Task Statuses:**
- `pending`: Task waiting to execute
Expand Down Expand Up @@ -985,13 +986,6 @@ print(f"Status: {task['status']}")
"result": {
"session_id": "a1b2c3d4",
"archive_uri": "viking://session/alice/a1b2c3d4/history/archive_001",
"memories_extracted": {
"profile": 1,
"preferences": 2,
"entities": 1,
"cases": 1
},
"active_count_updated": 2,
"token_usage": {
"llm": {
"prompt_tokens": 5200,
Expand All @@ -1010,8 +1004,6 @@ print(f"Status: {task['status']}")
}
```

`memories_extracted` in the completed task result reports per-category counts for this commit only. Sum its values when you want the total for this commit.

---

### list_tasks()
Expand Down Expand Up @@ -1099,65 +1091,11 @@ viking://session/{user_id}/{session_id}/
| +-- .abstract.md # Written in Phase 2 (background)
| +-- .overview.md # Written in Phase 2 (background)
| +-- .meta.json # Archive metadata
| +-- memory_diff.json # Written in Phase 2 (background, on memory changes)
| +-- .done # Phase 2 completion marker
| +-- .failed.json # Phase 2 failure marker
+-- archive_002/
```

### memory_diff.json Structure

Each commit writes a `memory_diff.json` to the archive directory, recording all memory changes for auditing and rollback:

```json
{
"archive_uri": "viking://session/{session_id}/history/archive_001",
"extracted_at": "2026-04-21T10:00:00Z",
"operations": {
"adds": [
{
"uri": "memory/user/xxx/identity.md",
"memory_type": "identity",
"after": "Newly created file content"
}
],
"updates": [
{
"uri": "memory/user/xxx/context/project.md",
"memory_type": "context",
"before": "Content before modification",
"after": "Content after modification"
}
],
"deletes": [
{
"uri": "memory/user/xxx/context/old.md",
"memory_type": "context",
"deleted_content": "Deleted file content"
}
]
},
"summary": {
"total_adds": 1,
"total_updates": 1,
"total_deletes": 1
}
}
```

| Field | Type | Description |
|-------|------|-------------|
| `archive_uri` | str | Archive directory URI for this commit |
| `extracted_at` | str | ISO 8601 timestamp of extraction |
| `operations.adds` | array | New memories created (`uri`, `memory_type`, `after`) |
| `operations.updates` | array | Modified memories (`uri`, `memory_type`, `before`, `after`) |
| `operations.deletes` | array | Deleted memories (`uri`, `memory_type`, `deleted_content`) |
| `summary.total_adds` | int | Number of new memories |
| `summary.total_updates` | int | Number of modified memories |
| `summary.total_deletes` | int | Number of deleted memories |

An empty `memory_diff.json` (all counts zero) is written even when no memory operations occurred.

---

## Memory Categories
Expand Down Expand Up @@ -1222,16 +1160,14 @@ if results.resources:
contexts=[results.resources[0].uri]
)

# Commit session (returns immediately; summary + memory extraction runs in background)
# Commit session (returns immediately; archive finalization runs in background)
commit_result = await client.commit_session(session_id)
print(f"Task ID: {commit_result['task_id']}")

# Optional: poll for completion
task = await client.get_task(commit_result["task_id"])
if task and task["status"] == "completed":
memories = task["result"]["memories_extracted"]
total = sum(memories.values())
print(f"Memories extracted: {total}")
print(f"Archive finalized: {task['result']['archive_uri']}")
```

**HTTP API**
Expand Down
4 changes: 2 additions & 2 deletions docs/en/concepts/02-context-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ Memories are divided into user memories and Agent memories, representing learned
# Memories are auto-extracted from sessions
session = client.session()
await session.add_message("user", [{"type": "text", "text": "I prefer dark mode"}])
commit = await session.commit() # Starts background memory extraction
task = await client.get_task(commit["task_id"]) # Poll until task["status"] == "completed"
commit = await session.commit() # Starts archive finalization
task = await client.get_task(commit["task_id"]) # Poll until archive finalize completes

# Search memories
results = await client.find(
Expand Down
43 changes: 23 additions & 20 deletions docs/en/concepts/08-session.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ session.commit()
|--------|-------------|
| `add_message(role, parts)` | Add message |
| `used(contexts, skill)` | Record used contexts/skills |
| `commit()` | Commit: archive (sync) + summary generation and memory extraction (async background) |
| `commit()` | Commit: archive payload write (inline) + archive finalization (async background) |
| `get_task(task_id)` | Query background task status |

### add_message
Expand Down Expand Up @@ -66,10 +66,10 @@ result = session.commit()
# "archived": True
# }

# Poll background task progress
# Poll archive finalization progress
task = client.get_task(result["task_id"])
# task["status"]: "pending" | "running" | "completed" | "failed"
# sum(task["result"]["memories_extracted"].values()): 3
# task["result"]["archive_uri"]: "viking://session/.../history/archive_001"
```

## Message Structure
Expand Down Expand Up @@ -97,20 +97,22 @@ class Message:

### Archive Flow

commit() executes in two phases:
commit() separates the durable archive path from best-effort side effects:

**Phase 1 (synchronous, returns immediately)**:
**Inline path (returns immediately)**:
1. Increment compression_index
2. Write messages to archive directory (`messages.jsonl`)
3. Clear current messages list
4. Return `task_id`
3. Persist the retained live messages and session metadata
4. Persist the archive finalize task and return `task_id`

**Phase 2 (asynchronous background)**:
5. Generate structured summary (LLM) → write `.abstract.md` and `.overview.md`
6. Extract long-term memories
7. Write `memory_diff.json` (memory change audit log) to archive directory
8. Update active_count
9. Write `.done` completion marker
**Archive finalize task (asynchronous background)**:
5. Generate structured summary (LLM) → write `.abstract.md`, `.overview.md`, and `.meta.json`
6. Write `.done` completion marker

**Best-effort side effects (after `.done`)**:
7. Extract long-term memories
8. Write `memory_diff.json` when memory operations are applied
9. Link usage records and update active_count

### Summary Format

Expand Down Expand Up @@ -171,7 +173,7 @@ Write to AGFS → Vectorize

## Memory Diff

Each `session.commit()` writes a `memory_diff.json` to the archive directory, recording all memory changes from that commit for auditing and rollback.
When commit memory extraction applies memory operations, it writes a `memory_diff.json` to the archive directory, recording those changes for auditing and rollback. This is a best-effort side effect after archive finalization; it is not part of the commit task result.

```json
{
Expand Down Expand Up @@ -218,7 +220,7 @@ Each `session.commit()` writes a `memory_diff.json` to the archive directory, re
| `operations.deletes` | Deleted memories (with `deleted_content`) |
| `summary` | Counts per operation type |

An empty `memory_diff.json` (all counts zero) is written even when no memory operations occurred.
An empty `memory_diff.json` may be written when extraction runs with no resulting operations, but commits without memory extraction do not create this file.

## Storage Structure

Expand All @@ -229,11 +231,12 @@ viking://session/{session_id}/
├── .overview.md # Current overview
├── history/
│ ├── archive_001/
│ │ ├── messages.jsonl # Written in Phase 1
│ │ ├── .abstract.md # Written in Phase 2 (background)
│ │ ├── .overview.md # Written in Phase 2 (background)
│ │ ├── memory_diff.json # Written in Phase 2 (background, memory change audit)
│ │ └── .done # Phase 2 completion marker
│ │ ├── messages.jsonl # Written by the inline commit path
│ │ ├── .abstract.md # Written by the finalize task
│ │ ├── .overview.md # Written by the finalize task
│ │ ├── .meta.json # Written by the finalize task
│ │ ├── memory_diff.json # Optional best-effort memory audit
│ │ └── .done # Finalize completion marker
│ └── archive_NNN/
└── tools/
└── {tool_id}/tool.json
Expand Down
2 changes: 1 addition & 1 deletion docs/en/faq/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ This strategy finds semantically matching fragments while understanding the comp

1. **Ensure `commit()` was called**
```python
await session.commit() # Triggers memory extraction
await session.commit() # Finalizes archive, then best-effort memory extraction
```

2. **Check VLM configuration**
Expand Down
Loading
Loading