Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/ov_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ enum Commands {
AddResource {
/// Local path or URL to import
path: String,
/// Exact target URI (must not exist yet) (cannot be used with --parent)
/// Exact target URI (existing targets are updated incrementally) (cannot be used with --parent)
#[arg(long)]
to: Option<String>,
/// Target parent URI (must already exist and be a directory) (cannot be used with --to)
Expand Down
1 change: 1 addition & 0 deletions openviking/session/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ async def _flush_semantic_operations(self, ctx: RequestContext) -> None:
user_id=ctx.user.user_id,
agent_id=ctx.user.agent_id,
role=ctx.role.value,
update_mode="incremental",
changes=changes_dict,
telemetry_id=telemetry.telemetry_id,
)
Expand Down
2 changes: 2 additions & 0 deletions openviking/storage/content_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ async def _enqueue_semantic_refresh(
role=ctx.role.value,
skip_vectorization=False,
telemetry_id=telemetry.telemetry_id,
update_mode="incremental",
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
coalesce_key=(
build_semantic_coalesce_key(
Expand Down Expand Up @@ -446,6 +447,7 @@ async def _enqueue_memory_refresh(
role=ctx.role.value,
skip_vectorization=False,
telemetry_id=telemetry.telemetry_id,
update_mode="incremental",
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
coalesce_key=build_semantic_coalesce_key(
context_type="memory",
Expand Down
15 changes: 8 additions & 7 deletions openviking/storage/queuefs/semantic_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Awaitable, Callable, Dict, List, Optional

from openviking.server.identity import RequestContext
from openviking.server.error_mapping import is_not_found_error
from openviking.storage.queuefs.semantic_sidecar import write_semantic_sidecars
from openviking.storage.viking_fs import get_viking_fs
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
Expand Down Expand Up @@ -122,7 +123,7 @@ def __init__(
self._refresh_task: Optional[asyncio.Task] = None

def _create_on_complete_callback(self) -> Callable[[], Awaitable[None]]:
"""Create on_complete callback for incremental update or full update."""
"""Create on_complete callback for syncing temp content to target."""

async def noop_callback() -> None:
return
Expand All @@ -133,12 +134,10 @@ async def noop_callback() -> None:
if self._target_uri == self._root_uri:
return noop_callback

# If full update, move temp uri to target uri has been handled in the processor
if not self._incremental_update:
return noop_callback

async def sync_diff_callback() -> None:
try:
# When semantic generation runs on a temp tree, we still need to sync
# the finalized content into target_uri after DAG completion, even for full imports.
diff = await self._processor._sync_topdown_recursive(
self._root_uri,
self._target_uri,
Expand Down Expand Up @@ -294,11 +293,13 @@ async def _dispatch_dir(self, dir_uri: str, parent_uri: Optional[str]) -> None:
elif self._root_done:
self._root_done.set()

async def _list_dir(self, uri: str) -> tuple[list[str], list[str]]:
async def _list_dir(self, uri: str, *, missing_ok: bool = False) -> tuple[list[str], list[str]]:
"""List directory entries and return (child_dirs, file_paths)."""
try:
entries = await self._viking_fs.ls(uri, ctx=self._ctx)
except Exception as e:
if missing_ok and is_not_found_error(e):
return [], []
logger.warning(f"Failed to list directory {uri}: {e}")
return [], []

Expand Down Expand Up @@ -438,7 +439,7 @@ async def _check_dir_children_changed(
if not target_path:
return True
try:
target_dirs, target_files = await self._list_dir(target_path)
target_dirs, target_files = await self._list_dir(target_path, missing_ok=True)
current_file_names = {f.split("/")[-1] for f in current_files}
target_file_names = {f.split("/")[-1] for f in target_files}
if current_file_names != target_file_names:
Expand Down
14 changes: 13 additions & 1 deletion openviking/storage/queuefs/semantic_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional
from uuid import uuid4


Expand All @@ -20,6 +20,9 @@ def build_semantic_coalesce_key(
return "|".join([context_type, account_id, user_id, agent_id, uri.rstrip("/")])


SemanticUpdateMode = Literal["full", "incremental"]


@dataclass
class SemanticMsg:
"""Semantic extraction queue message.
Expand Down Expand Up @@ -50,6 +53,7 @@ class SemanticMsg:
skip_vectorization: bool = False
telemetry_id: str = ""
target_uri: str = ""
update_mode: SemanticUpdateMode = "full"
lifecycle_lock_handle_id: str = ""
is_code_repo: bool = False
coalesce_key: str = ""
Expand All @@ -70,6 +74,7 @@ def __init__(
skip_vectorization: bool = False,
telemetry_id: str = "",
target_uri: str = "",
update_mode: SemanticUpdateMode = "full",
lifecycle_lock_handle_id: str = "",
is_code_repo: bool = False,
coalesce_key: str = "",
Expand All @@ -87,6 +92,7 @@ def __init__(
self.skip_vectorization = skip_vectorization
self.telemetry_id = telemetry_id
self.target_uri = target_uri
self.update_mode = update_mode
self.lifecycle_lock_handle_id = lifecycle_lock_handle_id
self.is_code_repo = is_code_repo
self.coalesce_key = coalesce_key
Expand Down Expand Up @@ -118,6 +124,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg":
missing.append("context_type")
raise ValueError(f"Missing required fields: {missing}")

raw_update_mode = data.get("update_mode", "full")
if raw_update_mode not in {"full", "incremental"}:
raise ValueError(f"Invalid update_mode: {raw_update_mode}")
update_mode: SemanticUpdateMode = raw_update_mode

obj = cls(
uri=uri,
context_type=context_type,
Expand All @@ -129,6 +140,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg":
skip_vectorization=data.get("skip_vectorization", False),
telemetry_id=data.get("telemetry_id", ""),
target_uri=data.get("target_uri", ""),
update_mode=update_mode,
lifecycle_lock_handle_id=data.get("lifecycle_lock_handle_id", ""),
is_code_repo=data.get("is_code_repo", False),
coalesce_key=data.get("coalesce_key", ""),
Expand Down
25 changes: 7 additions & 18 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ async def _enqueue_parent_refresh(self, msg: SemanticMsg, uri: str) -> None:
agent_id=msg.agent_id,
role=msg.role,
skip_vectorization=msg.skip_vectorization,
update_mode="incremental",
changes={"modified": [uri]},
coalesce_key=build_semantic_coalesce_key(
context_type=msg.context_type,
Expand Down Expand Up @@ -337,25 +338,13 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
is_incremental = False
target_uri = msg.target_uri
viking_fs = get_viking_fs()
if msg.target_uri:
target_exists = await viking_fs.exists(
msg.target_uri, ctx=self._current_ctx
)
# Check if target URI exists and is not the same as the source URI (to avoid duplicate processing)
if target_exists and msg.uri != msg.target_uri:
is_incremental = True
logger.info(
f"Target URI exists, using incremental update: {msg.target_uri}"
)
elif target_exists and msg.changes and msg.uri == msg.target_uri:
is_incremental = True
logger.info(
f"Using direct incremental semantic update for: {msg.uri}"
)
elif msg.changes:
if msg.update_mode == "incremental":
is_incremental = True
target_uri = msg.uri
logger.info(f"Using direct incremental semantic update for: {msg.uri}")
if not target_uri:
target_uri = msg.uri
logger.info(
f"Using enqueued incremental semantic update for: {target_uri}"
)

# Re-acquire lifecycle lock if handle was lost (e.g. server restart)
if msg.lifecycle_lock_handle_id:
Expand Down
6 changes: 6 additions & 0 deletions openviking/utils/resource_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ async def process_resource(
original_temp_uri = temp_uri # 保存原始 temp_uri 用于最终输出
candidate_uri = getattr(context_tree, "_candidate_uri", None) if context_tree else None
lifecycle_lock_handle_id = ""
update_mode_map: Dict[str, str] = {}

if root_uri and temp_uri:
from openviking.storage.transaction import get_lock_manager
Expand All @@ -262,12 +263,16 @@ async def process_resource(
viking_fs = get_viking_fs()
lock_manager = get_lock_manager()
try:
update_mode_map[root_uri] = (
"incremental" if await viking_fs.exists(root_uri, ctx=ctx) else "full"
)
if candidate_uri:
root_uri, lifecycle_lock_handle_id = await self._commit_unique_candidate(
candidate_uri=candidate_uri,
ctx=ctx,
)
result["root_uri"] = root_uri
update_mode_map = {root_uri: "full"}
else:
dst_path = viking_fs._uri_to_path(root_uri, ctx=ctx)
handle = lock_manager.create_handle()
Expand Down Expand Up @@ -310,6 +315,7 @@ async def process_resource(
skip_vectorization=skip_vec,
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
temp_uris=[temp_uri_for_summarize],
update_mode_map=update_mode_map,
is_code_repo=is_code_repo,
**kwargs,
)
Expand Down
18 changes: 18 additions & 0 deletions openviking/utils/summarizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ async def list_top_children(temp_uri: str) -> List[Tuple[str, str]]:
children.append((name, child_temp_uri))
return children

async def resolve_update_mode(target_uri: str, source_uri: str) -> str:
    """Choose the semantic update mode for one (target, source) enqueue unit.

    Args:
        target_uri: Destination URI the content is meant to end up at.
        source_uri: URI the semantic job reads from.

    Returns:
        "incremental" when a distinct target URI is reported as existing by
        the viking filesystem; "full" in every other case, including when
        the existence probe itself raises.
    """
    # An empty target, or a target identical to the source, is always "full".
    has_distinct_target = bool(target_uri) and target_uri != source_uri
    if not has_distinct_target:
        return "full"

    fs = get_viking_fs()
    try:
        mode = "incremental" if await fs.exists(target_uri, ctx=ctx) else "full"
    except Exception:
        # Best-effort preflight: a failed probe degrades to a full update
        # instead of failing the enqueue.
        logger.debug(
            "Failed to preflight target existence for %s", target_uri, exc_info=True
        )
        mode = "full"
    return mode

update_mode_map = kwargs.get("update_mode_map") or {}

for uri, temp_uri in zip(resource_uris, temp_uris, strict=True):
# Determine context_type based on URI
context_type = context_type_for_uri(uri)
Expand All @@ -94,6 +108,9 @@ async def list_top_children(temp_uri: str) -> List[Tuple[str, str]]:
enqueue_units.append((uri, temp_uri))

for target_uri, source_uri in enqueue_units:
update_mode = update_mode_map.get(target_uri)
if update_mode not in {"full", "incremental"}:
update_mode = await resolve_update_mode(target_uri, source_uri)
msg = SemanticMsg(
uri=source_uri,
context_type=context_type,
Expand All @@ -104,6 +121,7 @@ async def list_top_children(temp_uri: str) -> List[Tuple[str, str]]:
skip_vectorization=skip_vectorization,
telemetry_id=telemetry.telemetry_id,
target_uri=target_uri if target_uri != source_uri else None,
update_mode=update_mode,
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
is_code_repo=kwargs.get("is_code_repo", False),
)
Expand Down
59 changes: 59 additions & 0 deletions tests/misc/test_resource_processor_mv.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,62 @@ async def test_resource_processor_auto_candidate_skips_existing_and_busy(monkeyp
assert fake_lock_manager.acquired_exact_paths == []
assert fake_lock_manager.acquired_tree_paths == ["/mock/resources/root_2"]
assert summarize_calls[0]["temp_uris"] == ["viking://temp/root_tmp"]
assert summarize_calls[0]["resource_uris"] == ["viking://resources/root_2"]
assert summarize_calls[0]["update_mode_map"] == {"viking://resources/root_2": "full"}


@pytest.mark.asyncio
async def test_resource_processor_explicit_to_keeps_incremental_mode(monkeypatch):
    """An explicit ``to`` whose URI is seeded in the fake FS is summarized as incremental.

    The fake filesystem is created with the target URI in ``existing_uris``;
    the test then checks the ``update_mode_map`` handed to the summarizer and
    the lock paths recorded by the fake lock manager.
    """
    from openviking.utils.resource_processor import ResourceProcessor

    target_uri = "viking://resources/root"
    staged_uri = "viking://temp/root_tmp"

    fs_stub = _FakeVikingFS(existing_uris={target_uri})
    lock_stub = _FakeLockManager()
    recorded_kwargs = []

    # Swap the module-level accessors for stubs before the processor is built.
    patches = (
        (
            "openviking.utils.resource_processor.get_current_telemetry",
            lambda: _DummyTelemetry(),
        ),
        ("openviking.utils.resource_processor.get_viking_fs", lambda: fs_stub),
        ("openviking.storage.transaction.get_lock_manager", lambda: lock_stub),
    )
    for dotted_name, replacement in patches:
        monkeypatch.setattr(dotted_name, replacement)

    processor = ResourceProcessor(vikingdb=_DummyVikingDB(), media_storage=None)

    # Media processing is stubbed to return a canned parse result.
    parse_result = SimpleNamespace(
        temp_dir_path="viking://temp/tmpdir",
        source_path="x",
        source_format="text",
        meta={},
        warnings=[],
    )
    media_stub = MagicMock()
    media_stub.process = AsyncMock(return_value=parse_result)
    processor._get_media_processor = MagicMock(return_value=media_stub)

    # The tree builder reports the explicit target as root, staged under a temp URI.
    tree_root = SimpleNamespace(uri=target_uri, temp_uri=staged_uri)
    processor.tree_builder.finalize_from_temp = AsyncMock(
        return_value=SimpleNamespace(root=tree_root)
    )

    def record_summarize(*args, **kwargs):
        # Capture keyword arguments so the assertions below can inspect them.
        recorded_kwargs.append(kwargs)
        return {"status": "success"}

    processor._summarizer = SimpleNamespace(
        summarize=AsyncMock(side_effect=record_summarize)
    )

    result = await processor.process_resource(
        path="x",
        ctx=object(),
        build_index=True,
        to=target_uri,
    )

    assert result["status"] == "success"
    assert result["root_uri"] == target_uri
    first_call = recorded_kwargs[0]
    assert first_call["resource_uris"] == [target_uri]
    assert first_call["update_mode_map"] == {target_uri: "incremental"}
    assert lock_stub.acquired_exact_paths == []
    assert lock_stub.acquired_tree_paths == ["/mock/resources/root"]
Loading