feat(coordination): add cross-instance Coordinator for multi-instance consistency#2363
Open
magiccao wants to merge 1 commit into
Open
feat(coordination): add cross-instance Coordinator for multi-instance consistency#2363magiccao wants to merge 1 commit into
magiccao wants to merge 1 commit into
Conversation
… consistency
Several process-local singletons (EmbeddingTaskTracker, RequestWaitTracker,
SemanticQueue coalesce versions, request stats) were backed by plain
dict + threading.Lock. On a single machine this is fine; across multiple
load-balanced instances the state silently diverges.
This PR introduces a Coordinator abstraction that unifies these behind a
small set of generic KV primitives:
- InProcessCoordinator (default): zero new dependencies, behaviour identical
to the existing singletons. Single-machine deployments are unaffected.
- RedisCoordinator (opt-in): maps each primitive onto an atomic Redis command
(INCRBY, SET NX EX, SADD, RPUSH, etc.) for cross-instance consistency.
Requires `pip install 'openviking[coordination]'`.
Configuration (ov.conf):
storage:
coordination:
backend: redis # or "memory" (default)
dsn: redis://host:6379 # or env OPENVIKING_COORD_DSN
key_prefix: "ov:coord:"
ttl_sec: 3600
Bug fix: atomic memory semantic dedupe (volcengine#769)
The previous get_int → check → set_int pattern in SemanticQueue was a
TOCTOU race: two instances could both read 0 and both enqueue the same
memory semantic task. Replaced with a single atomic set_if_absent()
(Redis SET NX EX; in-process monotonic-deadline check under lock).
Memory safety: amortized claim pruning
_claim_deadlines in InProcessCoordinator is swept in bulk once the map
exceeds a threshold, so one-shot mem_dedupe:* keys that are never
revisited cannot grow the map unboundedly.
Refactor: extract RequestQueueStats / RequestStatsAccumulator
The inline stats dict+lock duplicated across SemanticProcessor and
collection_schemas is unified into openviking/telemetry/request_queue_stats.py.
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨No code suggestions found for the PR. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Several process-local singletons (
EmbeddingTaskTracker,RequestWaitTracker,SemanticQueuecoalesce versions, request stats) are backed by plaindict + threading.Lock. On a single machine this is fine; across multipleload-balanced instances the state silently diverges — coalesce versions reset
per instance, dedupe windows only protect within one process, and
request-wait tracking sees a partial view.
Changes
New:
Coordinatorabstraction (openviking/service/coordinator.py)A small KV protocol with two concrete backends:
InProcessCoordinatorRedisCoordinatorredispackageActivated via
storage.coordination.backend = "redis"inov.conf:Install the optional extra:
pip install 'openviking[coordination]'Bug fix: atomic memory semantic dedupe (#769)
The previous
get_int → check → set_intinSemanticQueuewas a TOCTOUrace: two instances could both read 0 and both enqueue the same memory
semantic task. Replaced with a single atomic
set_if_absent()call(Redis
SET NX EX; in-process monotonic-deadline check under lock).Memory safety: amortized claim pruning
_claim_deadlinesinInProcessCoordinatoris swept in bulk once itexceeds a threshold, preventing unbounded growth from one-shot
mem_dedupe:*keys that are never revisited.Refactor: extract
RequestQueueStats/RequestStatsAccumulatorThe inline stats dict+lock duplicated in
SemanticProcessorandcollection_schemasis unified intoopenviking/telemetry/request_queue_stats.py.Compatibility
Single-machine deployments: no behavior change, no new dependencies.
The
InProcessCoordinatoris the default and behaves identically to theprior singletons.
Test plan
tests/misc/test_coordinator.py— InProcess full coverage + Redis parity viafakeredis(skipped when absent)tests/misc/test_semantic_coalesce.py— staleness checktests/misc/test_request_queue_stats.py— RequestStatsAccumulatortests/storage/test_embedding_tracker.py— Coordinator-backed trackertests/storage/test_semantic_queue_memory_dedupe.py— atomic dedupe regressionruff check / ruff format— all clean🤖 Generated with Claude Code