Skip to content

Add optimistic concurrency control and dedup score guardrail for memory writes#2127

Open
Jay-ju wants to merge 4 commits into
volcengine:mainfrom
Jay-ju:fix/memory-occ-write-protection
Open

Add optimistic concurrency control and dedup score guardrail for memory writes#2127
Jay-ju wants to merge 4 commits into
volcengine:mainfrom
Jay-ju:fix/memory-occ-write-protection

Conversation

@Jay-ju
Copy link
Copy Markdown
Contributor

@Jay-ju Jay-ju commented May 19, 2026

Summary

Phase 2 of session commit (memory extraction) runs as a background asyncio.create_task without
distributed locks. When the same user has multiple sessions committing concurrently, this creates
read-modify-write races that can cause:

  1. Profile merge lost-updates — Session A merges into profile.md, then Session B overwrites
    with a stale base, losing A's changes.
  2. Tool/Skill statistics regression — Both sessions read total_calls=10, A writes 15, B writes
    13 — A's increment is lost.
  3. Unprotected DELETE — The deduplicator stores _dedup_score on similar memories with the
    comment "for later destructive-action guardrails" but never checks it. An LLM-hallucinated
    DELETE on a low-similarity match can silently erase user memories.

Changes

1. OCC (Compare-And-Swap) writes

Added VikingFS.cas_write_file(uri, content, expected_mod_time) which reads the file's modTime
before the expensive LLM call, then verifies it hasn't changed before writing. If a concurrent
modification is detected, the write is aborted (treated as skipped) and the next commit cycle will
re-attempt.

Applied to:

  • SessionCompressor._merge_into_existing (preferences / entities / patterns)
  • MemoryExtractor._append_to_profile (profile.md)
  • MemoryExtractor._merge_tool_memory (tools/{name}.md)
  • MemoryExtractor._merge_skill_memory (skills/{name}.md)

When modTime is unavailable (e.g. stat failed or file doesn't exist yet), the code falls back to
the original write_file — no behavior change for the non-concurrent path.

2. Dedup score guardrail for DELETE

_delete_existing_memory now refuses to delete memories with _dedup_score < 0.5. This implements
the guardrail that was documented but never enforced — the _dedup_score field was stored with the
comment "for later destructive-action guardrails" but was never checked before destructive
operations.

Testing

Design Notes

  • Why OCC instead of distributed locks? Phase 2 involves multiple LLM calls (2–10s each).
    Holding a distributed lock for that duration would block all other sessions for the same user.
    OCC only checks at the write instant — zero blocking.
  • Why abort instead of retry? The next commit cycle will re-dedup and re-attempt the merge
    automatically. Retrying inside the same commit would add complexity without meaningful benefit.
  • Why 0.5 as the DELETE floor? The existing SIMILARITY_THRESHOLD is 0.0, meaning any vector
    search result is sent to the LLM. A score below 0.5 indicates the memories are only loosely
    related, making a DELETE decision unreliable. This threshold can be tuned via a constant if
    needed.

Trae AI added 2 commits May 19, 2026 17:35
…ry writes

Phase 2 of session commit (memory extraction) runs without distributed
locks, which creates read-modify-write races when the same user has
multiple sessions committing concurrently. This can cause:

1. Profile merge lost-updates: Session A merges into profile.md,
   then Session B overwrites with a stale base, losing A's changes.
2. Tool/Skill statistics regression: Both sessions read total_calls=10,
   A writes 15, B writes 13 — A's increment is lost.
3. Unprotected DELETE: LLM hallucinates a DELETE decision for a
   low-similarity memory, causing user data loss.

Fix 1 - OCC (Compare-And-Swap) writes:
Add VikingFS.cas_write_file() which reads the file's modTime before
the expensive LLM call, then verifies it hasn't changed before
writing. If a concurrent modification is detected, the write is
aborted (treated as skipped) and the next commit cycle will re-attempt.

Applied to:
- SessionCompressor._merge_into_existing (preferences/entities/patterns)
- MemoryExtractor._append_to_profile (profile.md)
- MemoryExtractor._merge_tool_memory (tools/{name}.md)
- MemoryExtractor._merge_skill_memory (skills/{name}.md)

Fix 2 - Dedup score guardrail for DELETE:
The deduplicator stores _dedup_score on each similar memory with the
comment "for later destructive-action guardrails" but never checks it.
Now _delete_existing_memory refuses to delete memories with
_dedup_score < 0.5, preventing LLM-hallucinated deletes on
low-similarity matches.
TestDeleteGuardrail:
- delete blocked when _dedup_score < 0.5
- delete allowed when _dedup_score >= 0.5
- delete allowed when no _dedup_score (backward compatible)
- delete blocked for zero and negative scores

TestMergeOCC:
- merge succeeds when modTime unchanged (CAS passes)
- merge aborted when modTime changed (CAS fails)
- merge falls back to write_file when modTime unavailable

TestCasWriteFile:
- cas_write_file succeeds when modTime matches
- cas_write_file fails when modTime differs
- cas_write_file fails when stat raises exception
@github-actions
Copy link
Copy Markdown

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 75
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add dedup score guardrail for memory deletion

Relevant files:

  • openviking/session/compressor.py
  • tests/session/test_memory_occ.py

Sub-PR theme: Implement OCC write protection for memory merge operations

Relevant files:

  • openviking/storage/viking_fs.py
  • openviking/session/compressor.py
  • openviking/session/memory_extractor.py
  • tests/session/test_memory_occ.py

⚡ Recommended focus areas for review

Race Condition in CAS Write

The cas_write_file method uses separate stat and write operations, which are not atomic. This creates a race condition where a concurrent modification between the stat check and write can still result in lost updates, defeating the purpose of the compare-and-swap mechanism.

async def cas_write_file(
    self,
    uri: str,
    content: Union[str, bytes],
    expected_mod_time: str,
    ctx: Optional[RequestContext] = None,
) -> bool:
    """Compare-and-swap write: only succeeds if the file's modTime matches.

    Returns True if the write succeeded, False if a concurrent modification
    was detected (modTime changed since the read).
    """
    self._ensure_access(uri, ctx)
    path = self._uri_to_path(uri, ctx=ctx)

    try:
        stat = await self._run_in_threadpool(self.agfs.stat, path)
    except Exception:
        return False

    current_mod_time = ""
    if isinstance(stat, dict):
        current_mod_time = str(stat.get("modTime", stat.get("mtime", "")))

    if current_mod_time != expected_mod_time:
        logger.warning(
            "OCC conflict on %s: expected modTime=%s, current=%s",
            uri,
            expected_mod_time,
            current_mod_time,
        )
        return False

    await self._ensure_parent_dirs(path)

    if isinstance(content, str):
        content = content.encode("utf-8")

    content = await self._encrypt_content(content, ctx=ctx)
    await self._run_in_threadpool(self.agfs.write, path, content)
    return True
Mismatched Docstring for Delete

The _delete_existing_memory docstring claims to use optimistic concurrency control (OCC), but the implementation does not include any OCC checks for concurrent modifications before deletion.

async def _delete_existing_memory(
    self, memory: Context, viking_fs, ctx: RequestContext
) -> bool:
    """Hard delete an existing memory file and clean up its vector record.

    Uses optimistic concurrency control (OCC): verifies the file hasn't
    been modified since the dedup search read it. If a concurrent
    modification is detected, the delete is aborted to avoid clobbering
    another session's merge.
    """
    dedup_score = (memory.meta or {}).get("_dedup_score", 0)
    if isinstance(dedup_score, (int, float)) and dedup_score < 0.5:
        logger.warning(
            "Refusing to delete memory %s with low dedup_score=%.4f "
            "(floor=0.50). LLM may have hallucinated the DELETE decision.",
            memory.uri,
            float(dedup_score),
        )
        return False

    try:
        await viking_fs.rm(memory.uri, recursive=False, ctx=ctx)
    except Exception as e:
        logger.error(f"Failed to delete memory file {memory.uri}: {e}")
        return False

@Jay-ju Jay-ju changed the title Fix/memory occ write protection Add optimistic concurrency control and dedup score guardrail for memory writes May 19, 2026
@github-actions
Copy link
Copy Markdown

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Extract modTime parsing helper

Extract the duplicated modTime parsing logic into a helper method to reduce code
repetition across compressor.py, memory_extractor.py, and viking_fs.py. This ensures
consistent handling of modTime/mtime keys.

openviking/storage/viking_fs.py [2109-2149]

+def _get_mod_time(self, stat: Union[dict, Any]) -> str:
+    if not isinstance(stat, dict):
+        return ""
+    return str(stat.get("modTime", stat.get("mtime", "")))
+
 async def cas_write_file(
     self,
     uri: str,
     content: Union[str, bytes],
     expected_mod_time: str,
     ctx: Optional[RequestContext] = None,
 ) -> bool:
     """Compare-and-swap write: only succeeds if the file's modTime matches.
 
     Returns True if the write succeeded, False if a concurrent modification
     was detected (modTime changed since the read).
     """
     self._ensure_access(uri, ctx)
     path = self._uri_to_path(uri, ctx=ctx)
 
     try:
         stat = await self._run_in_threadpool(self.agfs.stat, path)
     except Exception:
         return False
 
-    current_mod_time = ""
-    if isinstance(stat, dict):
-        current_mod_time = str(stat.get("modTime", stat.get("mtime", "")))
+    current_mod_time = self._get_mod_time(stat)
 
     if current_mod_time != expected_mod_time:
         logger.warning(
             "OCC conflict on %s: expected modTime=%s, current=%s",
             uri,
             expected_mod_time,
             current_mod_time,
         )
         return False
 
     await self._ensure_parent_dirs(path)
 
     if isinstance(content, str):
         content = content.encode("utf-8")
 
     content = await self._encrypt_content(content, ctx=ctx)
     await self._run_in_threadpool(self.agfs.write, path, content)
     return True
Suggestion importance[1-10]: 5

__

Why: Valid refactor to reduce code duplication and ensure consistent modTime/mtime parsing across the codebase. The existing code snippet matches the PR's new cas_write_file method, improving maintainability.

Low

Trae AI added 2 commits May 19, 2026 17:55
R7 (Error Handling): Replace bare 'except Exception: pass' and
'except Exception: return False' with specific exception types
and logging, per .pr_agent.toml rules.

- cas_write_file: split into FileNotFoundError (debug) + general
  Exception (warning) with descriptive messages
- _merge_into_existing: log warning on vector cleanup failure
  instead of silently swallowing
- _delete_existing_memory: fix docstring to match implementation
  (dedup score guardrail, not OCC)
Replace 'except Exception: pass' with specific FileNotFoundError
(debug-level) and general Exception (warning-level) with
descriptive messages, per .pr_agent.toml R7 rule.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

1 participant