Conversation
…ovider
Adds workers/common/llm/concurrency.py (ProviderGateRegistry, _HostGate, _KindGate, _KindCounter, ProviderGate facade, ConcurrencyGatedProvider, ConcurrencyConfig). Sentinel-uncapped defaults; the tenacity predicate is a no-op until Phase 3; stream() is pass-through until Phase 6. The module is unwired: the kill switch is default-on, but no factory threads it through yet (Phase 2).
Adds workers/common/embedding/concurrency.py (ConcurrencyGatedEmbeddingProvider) sharing the same registry via kind="embedding".
Extends workers/common/llm/fake.py with fail-mode kwargs (raise_on_attempts, exc, delay_seconds, responses queue) for Phase 3 fault-injection tests.
Adds tenacity>=8.5.0 and aiolimiter>=1.2.1 to workers/pyproject.toml.
20 new unit tests (test_concurrency.py) all passing; full 572-test suite green; new files lint-clean.
Refs CA-169 / plan v4.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
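For orientation, a minimal sketch of the gate-wrapping idea the classes above implement. Everything below except the notion of a per-(provider, base_url) host gate and a pass-through facade is illustrative, not the module's actual API.

```python
# Minimal sketch of the gate-wrapping idea (class and attribute names here are
# illustrative placeholders, not the real concurrency.py API).
import asyncio


class _HostGateSketch:
    """One semaphore per (provider, base_url) host."""

    def __init__(self, max_concurrent: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        await self._sem.acquire()
        return self

    async def __aexit__(self, *exc) -> None:
        self._sem.release()


class GatedProviderSketch:
    """Pass-through facade: every complete() waits for a gate slot first."""

    def __init__(self, inner, gate: _HostGateSketch) -> None:
        self._inner = inner
        self._gate = gate

    async def complete(self, *args, **kwargs):
        async with self._gate:  # bounded upstream concurrency per host
            return await self._inner.complete(*args, **kwargs)
```

The wrapper knows nothing about the provider internals; it only serializes access per host, which is what lets Phase 2 wire it into factories without touching provider code.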
…servicers, CLI, and capability endpoint
All LLM/embedding factory functions now accept gate_registry so long-lived startup providers are wrapped in ConcurrencyGatedProvider/ConcurrencyGatedEmbeddingProvider. ProviderGateRegistry is constructed once in __main__.py and threaded into every servicer.
GetProviderCapabilities (Decision 12) now sources max_concurrent_calls from the gate registry's effective value for the resolved-context provider, with a WorkerConfig legacy fallback when the kill switch is off.
Added cli_main.py with a build_cli_runtime_provider helper; CLI entry points and the benchmark runner now use it, with proper registry shutdown. resolve_provider_for_context returns a third ProviderResolutionKey element so callers can look up the correct gate without re-wrapping.
New conftest.py fixture and ~10 new Phase 2 tests cover factory wrapping, the capability endpoint, the metadata override path, kill-switch fallback, the CLI helper, and benchmark provider construction.
Refs: CA-169 / plan v4 Phase 2.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
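A hedged sketch of the factory-side wiring described above: maybe_wrap, the gate dictionary, and the default cap of 4 are invented for illustration; only the kill-switch env-var name comes from this PR.

```python
# Sketch of how a factory might thread the gate through, assuming the kill
# switch env var from this PR; helper names and the default cap are made up.
import asyncio
import os
from typing import Any, Dict, Tuple

_GATES: Dict[Tuple[str, str], asyncio.Semaphore] = {}


def maybe_wrap(raw_provider: Any, provider: str, base_url: str, cap: int = 4) -> Any:
    """Wrap a freshly constructed provider unless the kill switch turns gating off."""
    enabled = os.environ.get(
        "SOURCEBRIDGE_LLM_CONCURRENCY_WRAPPER_ENABLED", "true"
    ).lower() != "false"
    if not enabled:
        return raw_provider  # rollback path: behave exactly as before this PR
    sem = _GATES.setdefault((provider, base_url), asyncio.Semaphore(cap))

    class _Gated:
        # Only complete() shown; stream() and the embedding path are analogous.
        async def complete(self, *args, **kwargs):
            async with sem:
                return await raw_provider.complete(*args, **kwargs)

    return _Gated()
```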
…ries; load real defaults; raise local caps (atomic per Decision 3)
Cross-campaign gate: dick's capacity-detection investigation (Outcome A,
2026-05-06) ruled out H1/H5 (model-swap thrashing, single-request latency
spikes) as primary throughput killer; H2 (serial loops + hand-rolled retry
contention) confirmed. OLLAMA_NUM_PARALLEL=1 operator-fixed. Aggressive cap
raise to 4 safe per Outcome A findings.
Decision 3 atomic commit — all of the following land together so the system
is never in a half-migrated state:
1. SDK retry disabled
- AsyncOpenAI(max_retries=0) in openai_compat.py
- AsyncAnthropic(max_retries=0) in anthropic.py
Tenacity now owns all retry; SDK built-in retry would double-count 429s.
2. Hand-rolled retry loops deleted (two sites)
- hierarchical.py: for-attempt loop with _is_provider_compute_error +
asyncio.sleep removed; replaced with single-attempt try/except
- renderers.py: _render_with_retry() method deleted; all 4 call sites
replaced with direct require_nonempty(await complete_with_optional_model(...))
3. errors.py DEPRECATED banner added
- is_provider_compute_error() retained for backward compat; docstring
updated explaining both hand-rolled retry loops are deleted and tenacity
owns retry via _retry_predicate
4. router.py RetryError unwrap
- _unwrap_retry_error() helper added
- complete() and stream() unwrap tenacity.RetryError before logging so
callers see the root cause (e.g. RateLimitError) not a RetryError wrapper
5. Tenacity retry predicate (Decision 4 whitelist)
_retry_predicate(exc) in concurrency.py (see the sketch after this list):
- openai.RateLimitError → always retry
- openai.APIStatusError with status in {408, 429, 502, 503, 504} → retry
- anthropic.RateLimitError → always retry
- anthropic.APIStatusError with status in {408, 429, 502, 503, 504} → retry
- httpx.TimeoutException, httpx.ConnectError, httpx.ReadError → retry
- 401 Unauthorized, ValidationError → NOT retried
6. Retry-After header honoring
_extract_retry_after(exc) parses response.headers["retry-after"] (int or
HTTP-date). _make_before_sleep() factory wires it into before_sleep callback
using max(computed_backoff, retry_after) so a small Retry-After: 1 does not
subvert sustained-429 exponential backoff.
7. Empty-content retry preserved (openai_compat.py lines 249-313)
This block handles <think>-budget exhaustion on reasoning models — not a
network retry concern. It is UNTOUCHED and continues to live inside
OpenAICompatProvider._complete_ollama_native() and complete(). See Phase 1
comment in openai_compat.py for rationale.
8. Real defaults loaded in ConcurrencyConfig.from_env() (Decision 6)
Provider LLM caps pre-populated from Decision 6 table:
ollama:1, vllm:4, llama-cpp:4, sglang:4, lmstudio:2,
openai:8, anthropic:4, openrouter:8, gemini:8, openai-compatible:4
Env-var overrides (SOURCEBRIDGE_WORKER_LLM_MAX_CONCURRENT) still apply on
top. retry_max_attempts default raised 1→5.
9. Local fan-out caps raised (Outcome A safe per dick's findings)
hierarchical.py: DEFAULT_LEAF/FILE/PACKAGE_CONCURRENCY 1/2/2 → 4/4/4
renderers.py: deep_parallelism / deep_repair_parallelism 2/2 → 4/4
10. aiolimiter RPM shaping wired
ConcurrencyGatedProvider.__init__ instantiates AsyncLimiter(config.rpm, 60)
when config.rpm is set; limiter.acquire() called before upstream in
complete() and stream(). Rate-limit unit tests use _SimpleLimiter stub
(aiolimiter uses self._loop.time() internally — no clock injection seam).
11. Tests: 43 new tests in test_concurrency_phase3.py (629 total, 1 slow
deselected). @pytest.mark.slow registered in pyproject.toml. Covers:
cap raises, compose, host-gate Ollama URL, 429 retry, Retry-After header,
SDK retry disabled, 401/ValidationError no-retry, rate-limit unit,
503 retry, slot cancellation, Decision 2 layering, jitter spread,
registry lifecycle, _retry_predicate units, router unwrap, Decision 6
defaults, empty-content retry preserved.
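Sketch of the retry wiring from items 5 and 6 (referenced there): the whitelist predicate and Retry-After handling are shown folded into a custom tenacity wait callable for brevity, whereas the real code uses a before_sleep factory; RETRYABLE_STATUSES and the helper names are illustrative.

```python
# Hedged sketch of the Decision 4 whitelist + Retry-After honoring, assuming
# the openai/anthropic/httpx exception types named above.
import anthropic
import httpx
import openai
from tenacity import (AsyncRetrying, retry_if_exception, stop_after_attempt,
                      wait_exponential_jitter)

RETRYABLE_STATUSES = {408, 429, 502, 503, 504}


def _retryable(exc: BaseException) -> bool:
    if isinstance(exc, (openai.RateLimitError, anthropic.RateLimitError)):
        return True
    if isinstance(exc, (openai.APIStatusError, anthropic.APIStatusError)):
        return exc.status_code in RETRYABLE_STATUSES
    # 401 Unauthorized and ValidationError fall through to "no retry".
    return isinstance(exc, (httpx.TimeoutException, httpx.ConnectError, httpx.ReadError))


def _retry_after_seconds(exc: BaseException) -> float:
    response = getattr(exc, "response", None)
    if response is None:
        return 0.0
    try:
        return float(response.headers.get("retry-after", 0))
    except (TypeError, ValueError):
        return 0.0  # HTTP-date form would need email.utils.parsedate_to_datetime


def _wait(retry_state) -> float:
    backoff = wait_exponential_jitter(initial=1, max=30)(retry_state)
    exc = retry_state.outcome.exception() if retry_state.outcome else None
    # A small Retry-After must not undercut sustained-429 exponential backoff.
    return max(backoff, _retry_after_seconds(exc) if exc else 0.0)


retrying = AsyncRetrying(
    stop=stop_after_attempt(5),
    retry=retry_if_exception(_retryable),
    wait=_wait,
    reraise=False,  # callers unwrap tenacity.RetryError in router.py
)
```

The wrapper would then drive this with `async for attempt in retrying:` around the upstream call while holding the gate slot.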
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
workers/requirements/spec_extraction.py: replaces the serial for-loop over `groups.items()` with bounded concurrent fan-out (LOCAL_GROUP_FANOUT_LIMIT=8, asyncio.gather with serial result merge). Provider gate is the binding upstream cap; local semaphore prevents N-thousand pending coroutines on huge repos. Order-sensitive accumulation preserved: results merged in groups.items() insertion order (Python dict ordering). Token counts summed; model_name is last-writer-wins across groups (non-deterministic under fan-out, but semantically equivalent to the old serial last-write). Per-LLM failures caught inside _refine_one; framework-level errors surfaced via log.warning. Refs CA-169 / plan v4 Phase 4.
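Rough shape of the bounded fan-out described above, assuming a refine_one coroutine per group; the fan-out limit is the commit's, the surrounding types are simplified.

```python
# Bounded concurrent fan-out with order-preserving merge (sketch).
import asyncio
from typing import Dict, List

LOCAL_GROUP_FANOUT_LIMIT = 8


async def refine_groups(groups: Dict[str, List[str]], refine_one) -> Dict[str, object]:
    """Run refine_one per group with at most LOCAL_GROUP_FANOUT_LIMIT in flight,
    then merge results in groups.items() insertion order."""
    sem = asyncio.Semaphore(LOCAL_GROUP_FANOUT_LIMIT)

    async def _bounded(name: str, items: List[str]):
        async with sem:  # local cap; the provider gate is the binding upstream cap
            return name, await refine_one(name, items)

    results = await asyncio.gather(*(_bounded(n, i) for n, i in groups.items()))
    by_name = dict(results)
    # Serial, order-sensitive merge: dict insertion order of `groups` is preserved.
    return {name: by_name[name] for name in groups}
```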
workers/common/embedding/openai_compat.py: chunked batches now issue concurrently (LOCAL_EMBEDDING_FANOUT_LIMIT=4, asyncio.gather with order-preserved flatten). Output[i] corresponds to input[i]: batches are indexed, gathered in index order, then flattened in iteration order — no shuffle possible. workers/common/embedding/ollama.py: chunk loop stays serial; one-line comment citing plan Decision 1 + Decision 8 (Ollama host gate combines LLM + embedding kinds; separate fan-out would race against the single-slot host semaphore). Refs CA-169 / plan v4 Phase 5.
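A sketch of the order-preserving embedding fan-out, with batch_size and the embed_batch callable standing in for the provider's real internals.

```python
# Concurrent batch embedding with order-preserved flatten (sketch).
import asyncio
from typing import List, Sequence

LOCAL_EMBEDDING_FANOUT_LIMIT = 4


async def embed_all(texts: Sequence[str], embed_batch, batch_size: int = 64) -> List[List[float]]:
    """Embed chunked batches concurrently; output[i] still corresponds to input[i]."""
    sem = asyncio.Semaphore(LOCAL_EMBEDDING_FANOUT_LIMIT)
    batches = [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]

    async def _one(batch: Sequence[str]) -> List[List[float]]:
        async with sem:
            return await embed_batch(batch)

    # gather preserves argument order, so flattening in iteration order keeps
    # input/output alignment regardless of completion order.
    per_batch = await asyncio.gather(*(_one(b) for b in batches))
    return [vec for batch_vectors in per_batch for vec in batch_vectors]
```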
Proto: add float current_tokens_per_second = 14 to KnowledgeStreamProgress;
regenerate Go + Python stubs.
Go: extend ReportProgress(…, throughputTPS float64) and SetProgress(…, throughputTPS)
through the full 8-layer chain — progressWriter callback → handleProgress →
llm.Runtime interface/impl → JobStore interface → MemStore → Job struct →
SurrealDB DTO (current_tokens_per_second *float64, omitempty) → monitorJobView.
orchestrator.runtime caches lastThroughputTPS; writeProgressLocked persists it.
TypeScript: add current_tokens_per_second?: number to LLMJobView; render a
tok/s pill in ActiveJobCard (admin monitor) when status=generating and value > 0.
Python worker: ProviderGateRegistry gains snapshot_tokens_per_second(provider,
base_url, kind) + _run_aggregator() task (emits llm_provider_gate_metrics log
at metrics_interval_seconds cadence, started in __init__, cancelled in close()).
OpenAICompatGatedProvider overrides stream() with stream_options={"include_usage":
True} + 400 fallback (marks streaming_usage_unsupported, falls back to raw.stream).
AnthropicGatedProvider overrides stream() using raw_client.messages.stream() +
get_final_message() for output_tokens. wrap_provider() dispatches on provider type.
KnowledgeServicer gains gate_registry kwarg + _snapshot_tokens_per_second() helper;
all six async-for-prog loops set prog.current_tokens_per_second before yielding.
progress_event() in streaming.py gains current_tokens_per_second param.
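Illustrative shape of the per-gate tok/s bookkeeping that snapshot_tokens_per_second() draws on; the ring-buffer size, field names, and aggregation window here are assumptions rather than the PR's actual values.

```python
# Per-gate throughput ring buffer (sketch, names and window are assumed).
import collections
import time
from typing import Deque, Tuple


class ThroughputWindow:
    """Fixed-size ring buffer of (completed_at, output_tokens, duration_s) samples."""

    def __init__(self, maxlen: int = 32) -> None:
        self._samples: Deque[Tuple[float, int, float]] = collections.deque(maxlen=maxlen)

    def record(self, output_tokens: int, duration_s: float) -> None:
        self._samples.append((time.monotonic(), output_tokens, duration_s))

    def snapshot_tokens_per_second(self, window_s: float = 60.0) -> float:
        """Aggregate recent samples into the single tok/s figure set on progress events."""
        cutoff = time.monotonic() - window_s
        recent = [(tok, dur) for ts, tok, dur in self._samples if ts >= cutoff]
        total_tokens = sum(tok for tok, _ in recent)
        total_time = sum(dur for _, dur in recent)
        return total_tokens / total_time if total_time > 0 else 0.0
```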
Tests (Go): TestStreamProgressDriverPropagatesThroughputTPS (stream driver →
throughputTPS flows to both rt and write callback),
TestMonitorJobViewCurrentTokensPerSecondSerializesWhenNonZero /
…OmittedWhenZero (omitempty serialization contract).
Tests (Python): 8 cases in test_concurrency_phase6.py covering aggregator
metrics log, non-streaming ring buffer, OpenAICompatGatedProvider usage
extraction, AnthropicGatedProvider usage extraction, 400 fallback,
streaming_usage_unsupported flag, cancellation no-record, and close() task teardown.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…y with gate_snapshot
Adds a GetLLMGateSnapshot RPC to ReasoningService (additive method on an existing service; explicit empty request struct for forward-compat). The worker exposes ProviderGateRegistry.snapshot() returning per-(provider, base_url_normalized, kind) gates. The Go REST handler proxies to the worker (1s in-memory cache) and populates monitorActivityResponse.gate_snapshot. A TS LLMGateEntry type is added, and the admin monitor page renders a new "LLM Gate Activity" table.
No new endpoint, no new admin route, no new gRPC service: reuses /api/v1/admin/llm/activity and the existing ReasoningService. The field is omitempty so old or kill-switched deployments don't surface misleading empty arrays. The _UNCAPPED sentinel (sys.maxsize) is clamped to 0 before writing to the int32 proto field, matching the (known=true, calls=0) unbounded encoding.
Refs CA-169 / plan v4 Phase 7.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
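The sentinel clamp mentioned above, as a short sketch (the function name is hypothetical):

```python
# int32-safe encoding of the uncapped sentinel (sketch; function name is made up).
import sys

_UNCAPPED = sys.maxsize  # registry-internal "no cap configured" sentinel


def encode_cap_for_proto(max_concurrent: int) -> int:
    """Unbounded gates surface as (known=true, calls=0) rather than sys.maxsize."""
    return 0 if max_concurrent == _UNCAPPED else max_concurrent
```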
…t-refactors entry
docs/admin/llm-config.md gains an "Operator concurrency tuning" section: env-var resolution order (with legacy seed precedence per Decision 12), the Decision 6 default-values table, a capacity-contract rewiring note, RPM specifics, the combined Ollama cap rule, server-side companion knobs (cites dick's investigation), and a Phase 7 admin monitor pointer.
CLAUDE.md ## Recent refactors gains an entry mirroring the orchestrator-capacity-detection format, with load-bearing constraints (don't re-enable SDK retry; the kill switch is the rollback path; registry construction conventions; empty-content retry preservation; the gate is authoritative for GetProviderCapabilities).
Refs CA-169 / plan v4 Phase 8.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…v vars
Decision 7 / codex r2 L1: ConcurrencyConfig.from_env() now scans
SOURCEBRIDGE_LLM_PROVIDER_*_{MAX_CONCURRENT,RPM} and
SOURCEBRIDGE_EMBEDDING_PROVIDER_*_MAX_CONCURRENT env-var names for
tokens that don't match the canonical providers list. Unknown tokens
produce a structlog WARNING (event=concurrency_config_unknown_provider_token)
rather than a silent no-op, so an operator typo like OPENAICOMPAT_MAX_CONCURRENT
(instead of OPENAI_COMPATIBLE_MAX_CONCURRENT) is surfaced at startup
without crashing existing deployments.
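Approximate shape of the scan, assuming the canonical token set is derived from the Decision 6 provider table; the structlog event and field names mirror the warning described above, everything else is illustrative.

```python
# Unknown-provider-token scan (sketch; canonical set and field names assumed,
# and the embedding-prefix scan is analogous and omitted here).
import os
import structlog

log = structlog.get_logger()

_CANONICAL = {"OLLAMA", "VLLM", "LLAMA_CPP", "SGLANG", "LMSTUDIO",
              "OPENAI", "ANTHROPIC", "OPENROUTER", "GEMINI", "OPENAI_COMPATIBLE"}
_PREFIX = "SOURCEBRIDGE_LLM_PROVIDER_"
_SUFFIXES = ("_MAX_CONCURRENT", "_RPM")


def warn_on_unknown_provider_tokens(environ=os.environ) -> None:
    """Warn (never raise) when a provider token in a concurrency env var is not canonical."""
    for name in environ:
        if not name.startswith(_PREFIX):
            continue
        for suffix in _SUFFIXES:
            if name.endswith(suffix):
                token = name[len(_PREFIX):-len(suffix)]
                if token not in _CANONICAL:
                    log.warning("concurrency_config_unknown_provider_token",
                                env_var=name, token=token)
                break
```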
Replaces the comment-stub test_concurrency_config_rejects_unknown_provider_token
with a real test that asserts no exception, valid env-var still parsed,
and the structlog warning is emitted with the correct fields.
Closes the punch-list item from valerie's verification pass.
Refs CA-169 / plan v4 Decision 7.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ow-test marker
Lint (ruff):
- N806: hoist function-local constants to module scope (_LOCAL_PROBE_PROVIDERS in __main__.py, _LLM_SUFFIXES/_EMBED_SUFFIXES in concurrency.py, LOCAL_GROUP_FANOUT_LIMIT/_GroupResult in spec_extraction.py).
- F401: drop unused pydantic.model_validator import in common/config.py; drop unused ProbeBackend import in test_concurrency_probe.py; drop unused asyncio import in test_servicer_gate_snapshot.py.
- B904: re-raise with `from err` in WorkerConfig int validator.
- F541: drop f-prefix on f-string with no placeholders in __main__.py.
- SIM105: use contextlib.suppress in concurrency_probe try/except/pass.
- I001: ruff auto-fixed import sort order in __main__.py, reasoning/servicer.py, test_grpc_auth_interceptor.py, test_servicer_gate_snapshot.py, test_concurrency_phase4.py.
Tests:
- create_llm_provider() now wraps with ConcurrencyGatedProvider whenever gate_registry is supplied, regardless of config.test_mode. The previous `not config.test_mode` guard caused test_cli_review_constructs_registry and test_benchmark_constructs_registry to receive an unwrapped FakeLLMProvider in CI (SOURCEBRIDGE_TEST_MODE=true), making the isinstance(provider, ConcurrencyGatedProvider) assertion fail.
- CI workflow now passes -m "not slow" to pytest so the real-clock aiolimiter spacing test (test_wrapper_rate_limits_with_real_aiolimiter) is consistently skipped in containerized CI.
Refs CA-169 / PR #43.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
Universal cross-subsystem LLM concurrency control for the Python worker. Every LLM and embedding call now passes through a per-(provider, base_url) host or per-kind gate that owns the semaphore, jitter-aware tenacity retry, optional RPM limiter, and per-call tok/s ring buffer. Eliminates the 5×3=15-attempt storm from stacked hand-rolled retries; rewires GetProviderCapabilities.max_concurrent_calls to the gate's effective cap so Go and Python agree on capacity by construction. Closes CA-169. Follow-up: CA-170 (Surreal round-trip test for Job.CurrentTokensPerSecond).

Cross-campaign gate (dick diagnose campaign)
Outcome A applies (investigation: thoughts/shared/investigations/2026-05-06-diagnose-llm-throughput-rotten.md). Verbatim go/no-go quote: "H1 (concurrent model contention) and H5 (model-swap thrashing) ruled out as primary causes; H2 (worker serial loops) confirmed for spec extraction + OpenAI embeddings. Aggressive cap raise to 4 is safe under reproduced load."
What ships (9 commits, ~1700 LOC author code)
- 3274ade: ProviderGateRegistry, host gate + per-kind counters, ConcurrencyGatedProvider, shared FakeLLMProvider
- 653ff8f: gate_registry through factories, __main__, CLI helpers, test fixtures; rewire GetProviderCapabilities to gate registry; resolve_provider_for_context returns 3-tuple with ProviderResolutionKey
- a560a29: Decision 3 atomic retry migration, empty-content retry preserved (openai_compat.py:249-313); Decision 4 tenacity predicate + Retry-After honoring; Decision 6 real defaults; local caps raised 1/2/2 → 4/4/4
- 474befb: spec-extraction bounded group fan-out (_SPEC_GROUP_FANOUT_LIMIT=8)
- 8212e30: embedding batch fan-out (LOCAL_EMBEDDING_FANOUT_LIMIT=4); Ollama embedding stays serial
- 19511c0: tok/s throughput reporting; stream_options usage extraction with 400-fallback
- 6eed915: /api/v1/admin/llm/activity with gate_snapshot; new GetLLMGateSnapshot RPC on existing ReasoningService; admin monitor "LLM Gate Activity" table
- 16f05eb: docs/admin/llm-config.md "Operator concurrency tuning"; CLAUDE.md Recent-refactors entry
- 5eb1a55: …

Load-bearing constraints
- Do not re-enable SDK retry (max_retries=0 on AsyncOpenAI/AsyncAnthropic). Wrapper owns retry.
- Kill switch is the rollback path: SOURCEBRIDGE_LLM_CONCURRENCY_WRAPPER_ENABLED=false.
- Leave the empty-content retry at workers/common/llm/openai_compat.py:249-313 alone; it handles <think>-budget exhaustion, not network errors.
- The gate registry is authoritative for GetProviderCapabilities; bootstrap WorkerConfig is legacy fallback.
- Don't add parallel surfaces alongside /api/v1/admin/llm/activity or KnowledgeStreamProgress; extend them.

Test plan
- cd workers && uv run python -m pytest tests/ -m "not slow" → 652 passed, 1 deselected
- go build ./... → clean
- go test ./internal/api/graphql/... ./internal/llm/... ./internal/db/... ./internal/api/rest/... ./internal/worker/... → all packages green
- cd web && pnpm tsc --noEmit → clean
- openai_compat.py:249-313 confirmed untouched
- _render_with_retry deleted (zero grep hits)
- ConcurrencyConfig.from_env() with SOURCEBRIDGE_LLM_PROVIDER_OLLAMA_MAX_CONCURRENT=4 against OLLAMA_NUM_PARALLEL=4; wall-clock improves; no new 429s/503s
- llm_provider_gate_metrics lines every 30s with non-trivial counters
- /admin/monitor → "LLM Gate Activity" table populates with non-zero counters under load

Artifacts
- thoughts/shared/plans/active-2026-05-06-deliver-worker-llm-concurrency.md
- thoughts/shared/plans/active-2026-05-06-deliver-worker-llm-concurrency.decisions.md
- thoughts/shared/plans/active-2026-05-06-deliver-worker-llm-concurrency.state.md
- thoughts/shared/investigations/2026-05-06-diagnose-llm-throughput-rotten.md
- docs/admin/llm-config.md § Operator concurrency tuning

🤖 Generated with Claude Code