Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bfde107
feat(memory): add exact-lock stale patch rewrite
huangruiteng May 30, 2026
89518bb
fix(memory): fail fast on exact-lock read and link errors
huangruiteng May 30, 2026
7866b7f
style(memory): format exact-lock changes
huangruiteng May 31, 2026
6c61bcf
feat(memory): add graph health validation hooks
huangruiteng May 31, 2026
8153b78
fix(memory): clean graph links on file delete
huangruiteng May 31, 2026
533cefc
feat(memory): gate exact apply by merge semantics
huangruiteng May 31, 2026
fa4f5e4
style(memory): format exact-lock test
huangruiteng May 31, 2026
8b3c04c
test(server): isolate embedded service config
huangruiteng May 31, 2026
aac25bd
feat(memory): synthesize stale string replacements
huangruiteng May 31, 2026
4c82f4a
fix(memory): require read base for exact replacement
huangruiteng May 31, 2026
7807e18
fix(memory): harden exact agent apply boundary
huangruiteng Jun 1, 2026
7ce2c1f
test(tau2): record corpus provenance in memory eval
huangruiteng Jun 1, 2026
26ca37b
test(memory): cover session skill extraction switch
huangruiteng Jun 1, 2026
d37c0df
feat(memory): write agent phase memory diffs
huangruiteng Jun 1, 2026
7c57fee
Merge remote-tracking branch 'origin/main' into feat/memory-file-lock…
huangruiteng Jun 1, 2026
fb5e856
style(session): sort imports after main merge
huangruiteng Jun 1, 2026
c66b5eb
test(memory): cover stale rewrite apply traces
huangruiteng Jun 1, 2026
eb35705
test(memory): keep session skill fake phase signature current
huangruiteng Jun 1, 2026
f1936b7
fix(memory): preserve subsecond trajectory timestamp keys
huangruiteng Jun 1, 2026
fe114b2
fix(memory): dedupe duplicate deletes in apply batch
huangruiteng Jun 1, 2026
afea3b5
fix(memory): skip stale exact upsert races
huangruiteng Jun 1, 2026
9ef7e46
fix(memory): treat missing exact deletes as stale
huangruiteng Jun 1, 2026
9c22aeb
style(memory): format exact stale delete telemetry
huangruiteng Jun 1, 2026
42da595
refactor(memory): share exact-lock text digest helper
huangruiteng Jun 1, 2026
217fba1
fix(memory): record stale rewrite telemetry
huangruiteng Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions benchmark/tau2/llm/scripts/analyze_memory_v2_corpus_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parents[4]
sys.path.insert(0, str(REPO_ROOT))


def _read_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))


def _memory_root_uri(search_uri: str) -> str:
prefix, separator, path = search_uri.partition("://")
if not separator:
raise ValueError(f"not a Viking URI: {search_uri}")
segments = path.split("/")
try:
memory_index = segments.index("memories")
except ValueError as exc:
raise ValueError(
f"search_uri does not contain a memories path segment: {search_uri}"
) from exc
return f"{prefix}{separator}{'/'.join(segments[: memory_index + 1])}"


def _run_plan_manifests(run_plan: Path) -> list[Path]:
data = _read_json(run_plan)
manifests: list[Path] = []
for cell in data.get("cells") or []:
artifacts = cell.get("artifacts") or {}
raw = artifacts.get("corpus_manifest")
if raw:
manifests.append(Path(raw))
continue
corpus_dir = cell.get("corpus_dir")
if corpus_dir:
manifests.append(Path(corpus_dir) / "corpus_manifest.json")
return manifests


def _existing_manifests(paths: list[Path], *, allow_missing: bool) -> list[Path]:
existing = [path for path in dict.fromkeys(paths) if path.is_file()]
missing = [path for path in dict.fromkeys(paths) if not path.is_file()]
if missing and not allow_missing:
rendered = "\n".join(f"- {path}" for path in missing[:20])
extra = "" if len(missing) <= 20 else f"\n... and {len(missing) - 20} more"
raise FileNotFoundError(
"missing corpus_manifest.json files; pass --allow-missing-manifests "
f"only for incomplete diagnostic reads:\n{rendered}{extra}"
)
return existing


def _summarize_health(manifest_path: Path, health: dict[str, Any]) -> dict[str, Any]:
manifest = _read_json(manifest_path)
counts = health.get("memory_type_counts") or {}
quality = health.get("experience_quality") or {}
source_links = quality.get("source_links_per_experience") or {}
duplicate_examples = quality.get("duplicate_exact_source_set_examples") or []
samples = health.get("samples") or health.get("violation_samples") or []
issue_keys = (
"parse_error_count",
"malformed_link_count",
"owner_mismatch_count",
"duplicate_link_count",
"broken_endpoint_count",
"missing_backlink_count",
"missing_forward_link_count",
)
issue_total = sum(int(health.get(key) or 0) for key in issue_keys)
issue_breakdown = {key: int(health.get(key) or 0) for key in issue_keys}
return {
"manifest": str(manifest_path),
"domain": manifest.get("domain"),
"committed_sessions": manifest.get("committed_session_count"),
"skipped_failed_sessions": manifest.get("skipped_failed_session_count"),
"commit_concurrency": manifest.get("corpus_session_commit_worker_count"),
"root_uri": health.get("root_uri"),
"healthy": health.get("healthy", issue_total == 0),
"issue_total": issue_total,
"issue_breakdown": issue_breakdown,
"memory_files": health.get("memory_file_count"),
"experiences": counts.get("experiences", 0),
"trajectories": counts.get("trajectories", 0),
"tools": counts.get("tools", 0),
"skills": counts.get("skills", 0),
"exp_per_session": _ratio(
counts.get("experiences", 0), manifest.get("committed_session_count")
),
"traj_per_session": _ratio(
counts.get("trajectories", 0), manifest.get("committed_session_count")
),
"source_links_avg": source_links.get("avg"),
"source_links_p50": source_links.get("p50"),
"source_links_p90": source_links.get("p90"),
"source_linkless": source_links.get("linkless"),
"single_source_rate": source_links.get("single_source_rate"),
"pair_scan_skipped": quality.get("pair_scan_skipped"),
"duplicate_exact_source_set_count": quality.get("duplicate_exact_source_set_count"),
"name_similar_pair_count": quality.get("name_similar_pair_count"),
"content_similar_pair_count": quality.get("content_similar_pair_count"),
"source_overlap_pair_count": quality.get("source_overlap_pair_count"),
"duplicate_exact_source_set_examples": duplicate_examples[:3],
"samples": samples[:3],
}


def _ratio(numerator: Any, denominator: Any) -> float | None:
try:
den = float(denominator)
if den == 0:
return None
return round(float(numerator or 0) / den, 4)
except (TypeError, ValueError):
return None


def _collect(
manifest_paths: list[Path], *, node_limit: int, sample_limit: int
) -> list[dict[str, Any]]:
from openviking_cli.client.sync_http import SyncHTTPClient

rows: list[dict[str, Any]] = []
for manifest_path in manifest_paths:
manifest = _read_json(manifest_path)
openviking = manifest.get("openviking") or {}
search_uri = str(openviking.get("search_uri") or "")
if not search_uri:
raise ValueError(f"manifest missing openviking.search_uri: {manifest_path}")
client = SyncHTTPClient(
url=openviking.get("url"),
account=openviking.get("account"),
user=openviking.get("user"),
agent_id=openviking.get("agent_id"),
timeout=120.0,
)
client.initialize()
try:
health = client.memory_graph_health(
_memory_root_uri(search_uri),
node_limit=node_limit,
sample_limit=sample_limit,
)
finally:
client.close()
rows.append(_summarize_health(manifest_path, health))
return rows


def _print_markdown(rows: list[dict[str, Any]]) -> None:
headers = [
"domain",
"commit_concurrency",
"committed_sessions",
"experiences",
"trajectories",
"exp_per_session",
"source_links_avg",
"source_linkless",
"single_source_rate",
"issue_total",
"duplicate_exact_source_set_count",
"name_similar_pair_count",
"content_similar_pair_count",
"source_overlap_pair_count",
"pair_scan_skipped",
"healthy",
]
print("| " + " | ".join(headers) + " |")
print("| " + " | ".join(["---"] * len(headers)) + " |")
for row in rows:
print("| " + " | ".join(str(row.get(header, "")) for header in headers) + " |")

for row in rows:
breakdown = row.get("issue_breakdown") or {}
nonzero = {key: value for key, value in breakdown.items() if value}
if nonzero:
print(
f"\n### Graph issue breakdown: {row.get('domain')} c{row.get('commit_concurrency')}"
)
for key, value in nonzero.items():
print(f"- {key}: {value}")

for row in rows:
examples = row.get("duplicate_exact_source_set_examples") or []
if not examples:
continue
print(
f"\n### Duplicate source-set examples: {row.get('domain')} c{row.get('commit_concurrency')}"
)
for example in examples:
uris = example.get("uris") or []
print(
f"- source_count={example.get('source_count')}: "
+ ", ".join(_basename(uri) for uri in uris)
)


def _basename(uri: str) -> str:
return uri.rsplit("/", 1)[-1]


def main() -> int:
parser = argparse.ArgumentParser(
description="Read completed TAU-2 Memory V2 corpus manifests and summarize graph health."
)
parser.add_argument("--manifest", action="append", type=Path, default=[])
parser.add_argument("--run-plan", action="append", type=Path, default=[])
parser.add_argument("--node-limit", type=int, default=200000)
parser.add_argument("--sample-limit", type=int, default=20)
parser.add_argument(
"--allow-missing-manifests",
action="store_true",
help="Allow incomplete diagnostic reads when --run-plan expands unfinished cells.",
)
parser.add_argument("--format", choices=("markdown", "json"), default="markdown")
args = parser.parse_args()

manifests = list(args.manifest)
for run_plan in args.run_plan:
manifests.extend(_run_plan_manifests(run_plan))
manifests = _existing_manifests(manifests, allow_missing=args.allow_missing_manifests)
if not manifests:
parser.error("no completed corpus_manifest.json files found")

rows = _collect(manifests, node_limit=args.node_limit, sample_limit=args.sample_limit)
if args.format == "json":
print(json.dumps(rows, ensure_ascii=False, indent=2, sort_keys=True))
else:
_print_markdown(rows)
return 0


if __name__ == "__main__":
raise SystemExit(main())
41 changes: 41 additions & 0 deletions benchmark/tau2/llm/scripts/run_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ def _enabled(value: Any) -> bool:
return str(value).strip().lower() in {"1", "true", "yes", "on"}


def _positive_int(value: Any, *, name: str) -> int:
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{name} must be a positive integer, got {value!r}") from exc
if parsed < 1:
raise ValueError(f"{name} must be a positive integer, got {parsed!r}")
return parsed


def _corpus_session_commit_concurrency(config: dict[str, Any]) -> int:
openviking = config.get("openviking", {})
benchmark = config.get("benchmark", {})
value = openviking.get("corpus_session_commit_concurrency")
if value is None:
value = benchmark.get("corpus_session_commit_concurrency", 1)
return _positive_int(value, name="corpus_session_commit_concurrency")


def _require_fixed_first_user(config: dict[str, Any]) -> bool:
return _enabled(config.get("eval", {}).get("require_fixed_first_user"))

Expand Down Expand Up @@ -278,6 +297,7 @@ def _tau2_command(
and strategy.get("train_memory_mode") == "experience_only"
):
openviking = config["openviking"]
corpus_session_commit_concurrency = _corpus_session_commit_concurrency(config)
corpus_id = str(strategy.get("corpus_id") or strategy["id"])
resolved_train_num_tasks = (
train_num_tasks if train_num_tasks is not None else strategy.get("train_num_tasks")
Expand Down Expand Up @@ -351,6 +371,12 @@ def _tau2_command(
user,
"--openviking-agent-id",
agent_id,
"--openviking-timeout",
str(openviking.get("timeout_seconds", 600.0)),
"--openviking-wait-timeout",
str(openviking.get("wait_timeout_seconds", 600)),
"--corpus-session-commit-concurrency",
str(corpus_session_commit_concurrency),
"--search-uri",
search_uri,
"--retrieval-top-k",
Expand Down Expand Up @@ -606,6 +632,9 @@ def _build_plan(
"train_skip_failed_sessions": _train_skip_failed_sessions(strategy),
"train_tool_output_max_chars": _train_tool_output_max_chars(strategy),
"retrieval_budget": _retrieval_budget(config, strategy),
"corpus_session_commit_concurrency": _corpus_session_commit_concurrency(
config
),
"search_memory_type": strategy.get("search_memory_type", "experiences"),
"adapter_status": strategy.get("adapter_status", "ready"),
"executable": command is not None,
Expand Down Expand Up @@ -730,13 +759,25 @@ def _prepare_memory_corpus(cell: dict[str, Any], repo: Path, out: Path) -> dict[
f"{key}: {cached_skip_failed!r} != {requested_skip_failed!r}; "
"use a distinct corpus_id or rebuild the corpus"
)
cached_commit_concurrency = int(manifest.get("corpus_session_commit_concurrency") or 1)
requested_commit_concurrency = int(cell.get("corpus_session_commit_concurrency") or 1)
if cached_commit_concurrency != requested_commit_concurrency:
raise RuntimeError(
"cached corpus_session_commit_concurrency mismatch for "
f"{key}: {cached_commit_concurrency!r} != {requested_commit_concurrency!r}; "
"use a distinct corpus_id or rebuild the corpus"
)
row = {
"domain": cell["domain"],
"strategy_id": cell["strategy_id"],
"corpus_id": str(cell.get("corpus_id") or cell["strategy_id"]),
"corpus_key": key,
"returncode": 0,
"reused": True,
"corpus_session_commit_concurrency": cached_commit_concurrency,
"corpus_session_commit_worker_count": manifest.get(
"corpus_session_commit_worker_count"
),
"artifacts": {"corpus_manifest": str(manifest_path)},
}
write_json(out / "corpus_prepare_results" / f"{key}.json", row)
Expand Down
Loading
Loading