
Worker LLM concurrency refactor: universal gate + retry + tok/s observability (CA-169)#43

Merged
jstuart0 merged 10 commits into main from feat/worker-llm-concurrency
May 7, 2026
Conversation


jstuart0 commented May 6, 2026

Summary

Universal cross-subsystem LLM concurrency control for the Python worker. Every LLM and embedding call now passes through a per-(provider, base_url) host gate or per-kind gate that owns the semaphore, the jitter-aware tenacity retry, an optional RPM limiter, and a per-call tok/s ring buffer. This eliminates the 5×3=15-attempt storm from stacked hand-rolled retries, and rewires GetProviderCapabilities.max_concurrent_calls to the gate's effective cap so Go and Python agree on capacity by construction.

Closes CA-169. Follow-up: CA-170 (Surreal round-trip test for Job.CurrentTokensPerSecond).

Cross-campaign gate (dick's diagnose campaign)

Outcome A applies — investigation: thoughts/shared/investigations/2026-05-06-diagnose-llm-throughput-rotten.md. Verbatim go/no-go quote:

"All 5 queue and execute serially. Effective throughput: 1 page LLM call at a time. ... With num_parallel=8 (128GB RAM allows): ~8 calls/60s"

H1 (concurrent model contention) and H5 (model-swap thrashing) ruled out as primary causes; H2 (worker serial loops) confirmed for spec extraction + OpenAI embeddings. Aggressive cap raise to 4 is safe under reproduced load.

What ships (9 commits, ~1700 LOC author code)

| Phase | Commit | Scope |
| --- | --- | --- |
| 1 | 3274ade | Foundations: ProviderGateRegistry, host gate + per-kind counters, ConcurrencyGatedProvider, shared FakeLLMProvider |
| 2 | 653ff8f | Wire gate_registry through factories, __main__, CLI helpers, test fixtures; rewire GetProviderCapabilities to gate registry; resolve_provider_for_context returns 3-tuple with ProviderResolutionKey |
| 3 | a560a29 | Atomic activation: SDK retry off; hand-rolled retries deleted (preserves empty-content retry at openai_compat.py:249-313); Decision 4 tenacity predicate + Retry-After honoring; Decision 6 real defaults; local caps raised 1/2/2 → 4/4/4 |
| 4 | 474befb | Parallelize spec extraction group loop (_SPEC_GROUP_FANOUT_LIMIT=8) |
| 5 | 8212e30 | OpenAI-compat embedding fan-out (LOCAL_EMBEDDING_FANOUT_LIMIT=4); Ollama embedding stays serial |
| 6 | 19511c0 | Per-call tok/s + 8-layer Go persistence chain (proto → progressWriter → streamProgressDriver → Runtime → JobStore → MemStore → Job → Surreal DTO+SQL → monitorJobView); provider-specific gated streaming adapters with stream_options 400-fallback |
| 7 | 6eed915 | Extend /api/v1/admin/llm/activity with gate_snapshot; new GetLLMGateSnapshot RPC on existing ReasoningService; admin monitor "LLM Gate Activity" table |
| 8 | 16f05eb | Docs: docs/admin/llm-config.md "Operator concurrency tuning"; CLAUDE.md Recent-refactors entry |
| punch | 5eb1a55 | Validator: warn on unknown provider token in concurrency env vars (Decision 7 / codex r2 L1) |

Load-bearing constraints

  • Don't re-enable SDK retry (max_retries=0 on AsyncOpenAI/AsyncAnthropic). Wrapper owns retry.
  • Kill switch is the rollback path: SOURCEBRIDGE_LLM_CONCURRENCY_WRAPPER_ENABLED=false.
  • Don't delete the empty-content retry at workers/common/llm/openai_compat.py:249-313 — it handles `<think>`-budget exhaustion, not network errors.
  • Registry is constructed-once-passed-by-reference (no module-level singletons).
  • Host gate vs. per-kind gate is per-provider classification, not per-call.
  • Gate is authoritative for GetProviderCapabilities; bootstrap WorkerConfig is legacy fallback.
  • Don't fork /api/v1/admin/llm/activity or KnowledgeStreamProgress — extend them.
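The kill-switch constraint above amounts to a single decision point at provider construction. The sketch below is illustrative only: `GatedProvider` and `maybe_wrap` stand in for the PR's `ConcurrencyGatedProvider` and factory wiring, and only the env-var name is taken from the source.

```python
import os

# Illustrative sketch of the kill-switch rollback path named above.
# GatedProvider / maybe_wrap are stand-ins, not the PR's real signatures.
class GatedProvider:
    def __init__(self, inner, registry):
        self.inner = inner
        self.registry = registry

def wrapper_enabled() -> bool:
    # Rollback path: flip this env var to "false" to bypass the gate entirely
    # and fall back to the legacy WorkerConfig capacity values.
    raw = os.environ.get("SOURCEBRIDGE_LLM_CONCURRENCY_WRAPPER_ENABLED", "true")
    return raw.strip().lower() != "false"

def maybe_wrap(provider, registry):
    if not wrapper_enabled():
        return provider                 # legacy path: raw provider, no gate
    return GatedProvider(provider, registry)
```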

Test plan

  • cd workers && uv run python -m pytest tests/ -m "not slow" → 652 passed, 1 deselected
  • go build ./... → clean
  • go test ./internal/api/graphql/... ./internal/llm/... ./internal/db/... ./internal/api/rest/... ./internal/worker/... → all packages green
  • cd web && pnpm tsc --noEmit → clean
  • Empty-content retry at openai_compat.py:249-313 confirmed untouched
  • _render_with_retry deleted (zero grep hits)
  • Decision 6 default-values table verified against ConcurrencyConfig.from_env()
  • 8-layer Go chain compiles end-to-end
  • Manual smoke: hot run against Ollama, default config; wall-clock no worse than pre-refactor
  • Manual smoke: SOURCEBRIDGE_LLM_PROVIDER_OLLAMA_MAX_CONCURRENT=4 against OLLAMA_NUM_PARALLEL=4; wall-clock improves; no new 429s/503s
  • Manual smoke: kill switch off → bit-identical structured-log shape vs. pre-refactor
  • Manual smoke: tail worker logs → llm_provider_gate_metrics lines every 30s with non-trivial counters
  • Manual smoke: hit /admin/monitor → "LLM Gate Activity" table populates with non-zero counters under load
  • Manual smoke: per-job tok/s pill renders during Living Wiki generation

Artifacts

  • Plan: thoughts/shared/plans/active-2026-05-06-deliver-worker-llm-concurrency.md
  • Decisions log: thoughts/shared/plans/active-2026-05-06-deliver-worker-llm-concurrency.decisions.md
  • State: thoughts/shared/plans/active-2026-05-06-deliver-worker-llm-concurrency.state.md
  • Investigation (cross-campaign): thoughts/shared/investigations/2026-05-06-diagnose-llm-throughput-rotten.md
  • Runbook: docs/admin/llm-config.md § Operator concurrency tuning

🤖 Generated with Claude Code

jstuart0 and others added 10 commits May 6, 2026 16:20
…ovider

Adds workers/common/llm/concurrency.py (ProviderGateRegistry, _HostGate,
_KindGate, _KindCounter, ProviderGate facade, ConcurrencyGatedProvider,
ConcurrencyConfig). Sentinel-uncapped defaults; tenacity predicate is a
no-op until Phase 3; stream() is pass-through until Phase 6. Module is
unwired — kill switch default-on but no factory threads it through yet
(Phase 2).

Adds workers/common/embedding/concurrency.py (ConcurrencyGatedEmbeddingProvider)
sharing the same registry via kind="embedding".

Extends workers/common/llm/fake.py with fail-mode kwargs (raise_on_attempts,
exc, delay_seconds, responses queue) for Phase 3 fault-injection tests.

Adds tenacity>=8.5.0 and aiolimiter>=1.2.1 to workers/pyproject.toml.

20 new unit tests (test_concurrency.py) all passing; full 572-test suite
green; new files lint-clean.

Refs CA-169 / plan v4.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…servicers, CLI, and capability endpoint

All LLM/embedding factory functions now accept gate_registry so long-lived
startup providers are wrapped in ConcurrencyGatedProvider/ConcurrencyGatedEmbeddingProvider.
ProviderGateRegistry is constructed once in __main__.py and threaded into every
servicer. GetProviderCapabilities (Decision 12) now sources max_concurrent_calls
from the gate registry's effective value for the resolved-context provider, with
a WorkerConfig legacy fallback when the kill switch is off. Added cli_main.py
with build_cli_runtime_provider helper; CLI entry points and benchmark runner
updated to use it with proper registry shutdown. resolve_provider_for_context
returns a third ProviderResolutionKey element so callers can look up the correct
gate without re-wrapping. New conftest.py fixture and ~10 new Phase 2 tests cover
factory wrapping, capability endpoint, metadata override path, kill-switch fallback,
CLI helper, and benchmark provider construction. Refs: CA-169 / plan v4 Phase 2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ries; load real defaults; raise local caps (atomic per Decision 3)

Cross-campaign gate: dick's capacity-detection investigation (Outcome A,
2026-05-06) ruled out H1/H5 (model-swap thrashing, single-request latency
spikes) as primary throughput killer; H2 (serial loops + hand-rolled retry
contention) confirmed. OLLAMA_NUM_PARALLEL=1 operator-fixed. Aggressive cap
raise to 4 safe per Outcome A findings.

Decision 3 atomic commit — all of the following land together so the system
is never in a half-migrated state:

1. SDK retry disabled
   - AsyncOpenAI(max_retries=0) in openai_compat.py
   - AsyncAnthropic(max_retries=0) in anthropic.py
   Tenacity now owns all retry; SDK built-in retry would double-count 429s.

2. Hand-rolled retry loops deleted (two sites)
   - hierarchical.py: for-attempt loop with _is_provider_compute_error +
     asyncio.sleep removed; replaced with single-attempt try/except
   - renderers.py: _render_with_retry() method deleted; all 4 call sites
     replaced with direct require_nonempty(await complete_with_optional_model(...))

3. errors.py DEPRECATED banner added
   - is_provider_compute_error() retained for backward compat; docstring
     updated explaining both hand-rolled retry loops are deleted and tenacity
     owns retry via _retry_predicate

4. router.py RetryError unwrap
   - _unwrap_retry_error() helper added
   - complete() and stream() unwrap tenacity.RetryError before logging so
     callers see the root cause (e.g. RateLimitError) not a RetryError wrapper
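The unwrap in item 4 can be sketched with a stand-in for tenacity's `RetryError` (the real class exposes the final exception via `last_attempt.exception()`; the attribute here is simplified for illustration):

```python
# Sketch of the RetryError unwrap described above; RetryError is a stand-in
# for tenacity.RetryError, with the last exception attached directly.
class RetryError(Exception):
    def __init__(self, last_exc: BaseException):
        super().__init__("retry exhausted")
        self.last_exc = last_exc

def unwrap_retry_error(exc: BaseException) -> BaseException:
    # Callers should see the root cause (e.g. RateLimitError), not the wrapper.
    if isinstance(exc, RetryError) and exc.last_exc is not None:
        return exc.last_exc
    return exc
```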

5. Tenacity retry predicate (Decision 4 whitelist)
   _retry_predicate(exc) in concurrency.py:
   - openai.RateLimitError → always retry
   - openai.APIStatusError with status in {408, 429, 502, 503, 504} → retry
   - anthropic.RateLimitError → always retry
   - anthropic.APIStatusError with status in {408, 429, 502, 503, 504} → retry
   - httpx.TimeoutException, httpx.ConnectError, httpx.ReadError → retry
   - 401 Unauthorized, ValidationError → NOT retried
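The whitelist in item 5 can be sketched as a plain predicate. This version uses local exception stand-ins instead of the real openai/anthropic/httpx types, so it shows the classification logic only, not the PR's actual `_retry_predicate`:

```python
# Sketch of the Decision 4 whitelist, with stand-in exception types.
RETRYABLE_STATUSES = {408, 429, 502, 503, 504}

class APIStatusError(Exception):
    """Stand-in for openai/anthropic APIStatusError."""
    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code

class RateLimitError(APIStatusError):
    def __init__(self):
        super().__init__(429)

class TransportError(Exception):
    """Stand-in for httpx.TimeoutException / ConnectError / ReadError."""

def retry_predicate(exc: BaseException) -> bool:
    if isinstance(exc, RateLimitError):
        return True                       # always retry rate limits
    if isinstance(exc, APIStatusError):
        return exc.status_code in RETRYABLE_STATUSES
    if isinstance(exc, TransportError):
        return True                       # transient network failures
    return False                          # 401s, validation errors, bugs: fail fast
```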

6. Retry-After header honoring
   _extract_retry_after(exc) parses response.headers["retry-after"] (int or
   HTTP-date). _make_before_sleep() factory wires it into before_sleep callback
   using max(computed_backoff, retry_after) so a small Retry-After: 1 does not
   subvert sustained-429 exponential backoff.

7. Empty-content retry preserved (openai_compat.py lines 249-313)
   This block handles <think>-budget exhaustion on reasoning models — not a
   network retry concern. It is UNTOUCHED and continues to live inside
   OpenAICompatProvider._complete_ollama_native() and complete(). See Phase 1
   comment in openai_compat.py for rationale.

8. Real defaults loaded in ConcurrencyConfig.from_env() (Decision 6)
   Provider LLM caps pre-populated from Decision 6 table:
     ollama:1, vllm:4, llama-cpp:4, sglang:4, lmstudio:2,
     openai:8, anthropic:4, openrouter:8, gemini:8, openai-compatible:4
   Env-var overrides (SOURCEBRIDGE_WORKER_LLM_MAX_CONCURRENT) still apply on
   top. retry_max_attempts default raised 1→5.

9. Local fan-out caps raised (Outcome A safe per dick's findings)
   hierarchical.py: DEFAULT_LEAF/FILE/PACKAGE_CONCURRENCY 1/2/2 → 4/4/4
   renderers.py: deep_parallelism / deep_repair_parallelism 2/2 → 4/4

10. aiolimiter RPM shaping wired
    ConcurrencyGatedProvider.__init__ instantiates AsyncLimiter(config.rpm, 60)
    when config.rpm is set; limiter.acquire() called before upstream in
    complete() and stream(). Rate-limit unit tests use _SimpleLimiter stub
    (aiolimiter uses self._loop.time() internally — no clock injection seam).
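The RPM-shaping contract in item 10 can be illustrated with a minimal limiter in the spirit of the `_SimpleLimiter` test stub mentioned above; the PR itself uses `aiolimiter.AsyncLimiter(config.rpm, 60)`, so this is a behavioral sketch, not the shipped code:

```python
import asyncio
import time

# Minimal RPM limiter sketch: enforce a minimum spacing of period/rpm
# seconds between acquisitions (a simpler policy than aiolimiter's
# leaky bucket, shown only to illustrate the acquire-before-call contract).
class SimpleRpmLimiter:
    def __init__(self, rpm: int, period: float = 60.0):
        self._interval = period / rpm
        self._lock = asyncio.Lock()
        self._next_free = 0.0

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            wait = max(0.0, self._next_free - now)
            # Reserve the next slot before sleeping so concurrent callers queue.
            self._next_free = max(now, self._next_free) + self._interval
        if wait:
            await asyncio.sleep(wait)
```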

11. Tests: 43 new tests in test_concurrency_phase3.py (629 total, 1 slow
    deselected). @pytest.mark.slow registered in pyproject.toml. Covers:
    cap raises, compose, host-gate Ollama URL, 429 retry, Retry-After header,
    SDK retry disabled, 401/ValidationError no-retry, rate-limit unit,
    503 retry, slot cancellation, Decision 2 layering, jitter spread,
    registry lifecycle, _retry_predicate units, router unwrap, Decision 6
    defaults, empty-content retry preserved.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
workers/requirements/spec_extraction.py: replaces the serial
for-loop over `groups.items()` with bounded concurrent fan-out
(LOCAL_GROUP_FANOUT_LIMIT=8, asyncio.gather with serial result
merge). Provider gate is the binding upstream cap; local
semaphore prevents N-thousand pending coroutines on huge repos.

Order-sensitive accumulation preserved: results merged in
groups.items() insertion order (Python dict ordering). Token
counts summed; model_name is last-writer-wins across groups
(non-deterministic under fan-out, but semantically equivalent
to the old serial last-write). Per-LLM failures caught inside
_refine_one; framework-level errors surfaced via log.warning.

Refs CA-169 / plan v4 Phase 4.
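The Phase 4 pattern above — bounded fan-out with order-preserving merge — can be sketched like this. The limit constant and `refine` callable are placeholders; the key property is that `asyncio.gather` returns results in argument order, so the merged dict keeps `groups.items()` insertion order regardless of completion order:

```python
import asyncio

# Sketch of bounded concurrent fan-out over a group dict, merging results
# back in input order. FANOUT_LIMIT and refine() are illustrative.
FANOUT_LIMIT = 8

async def fan_out(groups: dict[str, str], refine) -> dict[str, str]:
    sem = asyncio.Semaphore(FANOUT_LIMIT)   # local cap; gate binds upstream

    async def one(name: str, payload: str) -> tuple[str, str]:
        async with sem:
            return name, await refine(payload)

    # gather preserves argument order, so insertion order survives
    # even though execution interleaves.
    results = await asyncio.gather(*(one(n, p) for n, p in groups.items()))
    return {name: out for name, out in results}
```

The local semaphore exists only to avoid materializing thousands of pending coroutines on huge repos; actual call concurrency is still bounded by the provider gate.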
workers/common/embedding/openai_compat.py: chunked batches now
issue concurrently (LOCAL_EMBEDDING_FANOUT_LIMIT=4, asyncio.gather
with order-preserved flatten). Output[i] corresponds to input[i]:
batches are indexed, gathered in index order, then flattened in
iteration order — no shuffle possible.
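The no-shuffle guarantee above can be sketched as: chunk, gather concurrently, flatten in index order. Parameter names are illustrative and `embed_batch` is a placeholder for the provider call:

```python
import asyncio

# Sketch of order-preserving embedding fan-out: output[i] matches input[i]
# because batches are flattened in index order, not completion order.
async def embed_all(texts, embed_batch, batch_size=2, fanout=4):
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    sem = asyncio.Semaphore(fanout)

    async def one(batch):
        async with sem:
            return await embed_batch(batch)

    # gather returns per-batch results in the order coroutines were passed.
    per_batch = await asyncio.gather(*(one(b) for b in batches))
    return [vec for batch in per_batch for vec in batch]
```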

workers/common/embedding/ollama.py: chunk loop stays serial;
one-line comment citing plan Decision 1 + Decision 8 (Ollama host
gate combines LLM + embedding kinds; separate fan-out would race
against the single-slot host semaphore).

Refs CA-169 / plan v4 Phase 5.
Proto: add float current_tokens_per_second = 14 to KnowledgeStreamProgress;
regenerate Go + Python stubs.

Go: extend ReportProgress(…, throughputTPS float64) and SetProgress(…, throughputTPS)
through the full 8-layer chain — progressWriter callback → handleProgress →
llm.Runtime interface/impl → JobStore interface → MemStore → Job struct →
SurrealDB DTO (current_tokens_per_second *float64, omitempty) → monitorJobView.
orchestrator.runtime caches lastThroughputTPS; writeProgressLocked persists it.

TypeScript: add current_tokens_per_second?: number to LLMJobView; render a
tok/s pill in ActiveJobCard (admin monitor) when status=generating and value > 0.

Python worker: ProviderGateRegistry gains snapshot_tokens_per_second(provider,
base_url, kind) + _run_aggregator() task (emits llm_provider_gate_metrics log
at metrics_interval_seconds cadence, started in __init__, cancelled in close()).
OpenAICompatGatedProvider overrides stream() with stream_options={"include_usage":
True} + 400 fallback (marks streaming_usage_unsupported, falls back to raw.stream).
AnthropicGatedProvider overrides stream() using raw_client.messages.stream() +
get_final_message() for output_tokens. wrap_provider() dispatches on provider type.
KnowledgeServicer gains gate_registry kwarg + _snapshot_tokens_per_second() helper;
all six async-for-prog loops set prog.current_tokens_per_second before yielding.
progress_event() in streaming.py gains current_tokens_per_second param.
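The per-call tok/s ring buffer behind `snapshot_tokens_per_second` can be sketched with a bounded deque; the class name and window size here are assumptions, not the PR's implementation:

```python
from collections import deque

# Illustrative tok/s ring buffer: record one sample per completed call,
# report the mean over a bounded recent window. Window size is assumed.
class TokensPerSecondBuffer:
    def __init__(self, maxlen: int = 32):
        self._samples: deque[float] = deque(maxlen=maxlen)  # old samples drop off

    def record(self, output_tokens: int, duration_s: float) -> None:
        if duration_s > 0:                  # ignore zero-duration/cancelled calls
            self._samples.append(output_tokens / duration_s)

    def snapshot(self) -> float:
        # 0.0 until the first completed call, so consumers can treat
        # "value > 0" as "has real data" (matching the UI's render guard).
        return sum(self._samples) / len(self._samples) if self._samples else 0.0
```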

Tests (Go): TestStreamProgressDriverPropagatesThroughputTPS (stream driver →
throughputTPS flows to both rt and write callback),
TestMonitorJobViewCurrentTokensPerSecondSerializesWhenNonZero /
…OmittedWhenZero (omitempty serialization contract).

Tests (Python): 8 cases in test_concurrency_phase6.py covering aggregator
metrics log, non-streaming ring buffer, OpenAICompatGatedProvider usage
extraction, AnthropicGatedProvider usage extraction, 400 fallback,
streaming_usage_unsupported flag, cancellation no-record, and close() task teardown.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…y with gate_snapshot

Adds GetLLMGateSnapshot RPC to ReasoningService (additive method, existing
service; explicit empty request struct for forward-compat). Worker exposes
ProviderGateRegistry.snapshot() returning per-(provider, base_url_normalized,
kind) gates. Go REST handler proxies to the worker (1s in-memory cache),
populates monitorActivityResponse.gate_snapshot. TS LLMGateEntry type +
admin monitor page renders a new "LLM Gate Activity" table.

No new endpoint, no new admin route, no new gRPC service — reuses
/api/v1/admin/llm/activity and existing ReasoningService. Field is omitempty
so old/kill-switched deployments don't surface misleading empty arrays.
_UNCAPPED sentinel (sys.maxsize) is clamped to 0 before writing to int32
proto field, matching the (known=true, calls=0) unbounded encoding.
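The sentinel clamp described above is small enough to show directly; the function name is illustrative:

```python
import sys

# Sketch of the _UNCAPPED clamp: an uncapped gate (sys.maxsize sentinel)
# is encoded as 0 in the int32 proto field, per the (known=true, calls=0)
# unbounded convention.
UNCAPPED = sys.maxsize

def encode_cap_for_proto(cap: int) -> int:
    return 0 if cap >= UNCAPPED else cap
```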

Refs CA-169 / plan v4 Phase 7.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…t-refactors entry

docs/admin/llm-config.md gains "Operator concurrency tuning" section: env-var
resolution order (with legacy seed precedence per Decision 12), Decision 6
default-values table, capacity-contract rewiring note, RPM specifics, combined
Ollama cap rule, server-side companion knobs (cites dick's investigation),
Phase 7 admin monitor pointer.

CLAUDE.md ## Recent refactors gains an entry mirroring the orchestrator-capacity-
detection format, with load-bearing constraints (don't re-enable SDK retry;
kill switch is the rollback path; registry construction conventions; empty-content
retry preservation; gate authoritative for GetProviderCapabilities).

Refs CA-169 / plan v4 Phase 8.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…v vars

Decision 7 / codex r2 L1: ConcurrencyConfig.from_env() now scans
SOURCEBRIDGE_LLM_PROVIDER_*_{MAX_CONCURRENT,RPM} and
SOURCEBRIDGE_EMBEDDING_PROVIDER_*_MAX_CONCURRENT env-var names for
tokens that don't match the canonical providers list. Unknown tokens
produce a structlog WARNING (event=concurrency_config_unknown_provider_token)
rather than a silent no-op, so an operator typo like OPENAICOMPAT_MAX_CONCURRENT
(instead of OPENAI_COMPATIBLE_MAX_CONCURRENT) is surfaced at startup
without crashing existing deployments.
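The Decision 7 scan above can be sketched as a regex over env-var names. This simplified version covers only the LLM-provider prefix (the real validator also scans the embedding-provider names) and uses `logging` where the PR uses structlog; the canonical provider list is taken from the Decision 6 table:

```python
import logging
import re

# Sketch of the unknown-provider-token validator; simplified to the LLM
# prefix only. Canonical tokens follow the Decision 6 defaults table.
KNOWN = {"OLLAMA", "VLLM", "LLAMA_CPP", "SGLANG", "LMSTUDIO",
         "OPENAI", "ANTHROPIC", "OPENROUTER", "GEMINI", "OPENAI_COMPATIBLE"}
PATTERN = re.compile(
    r"^SOURCEBRIDGE_LLM_PROVIDER_(?P<token>.+)_(MAX_CONCURRENT|RPM)$")

def unknown_provider_tokens(environ) -> list[str]:
    unknown = []
    for name in environ:
        m = PATTERN.match(name)
        if m and m.group("token") not in KNOWN:
            # Warn rather than raise, so an operator typo never crashes startup.
            logging.warning(
                "concurrency_config_unknown_provider_token name=%s", name)
            unknown.append(m.group("token"))
    return unknown
```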

Replaces the comment-stub test_concurrency_config_rejects_unknown_provider_token
with a real test that asserts no exception, valid env-var still parsed,
and the structlog warning is emitted with the correct fields.

Closes the punch-list item from valerie's verification pass.

Refs CA-169 / plan v4 Decision 7.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ow-test marker

Lint (ruff):
- N806: hoist function-local constants to module scope (_LOCAL_PROBE_PROVIDERS
  in __main__.py, _LLM_SUFFIXES/_EMBED_SUFFIXES in concurrency.py,
  LOCAL_GROUP_FANOUT_LIMIT/_GroupResult in spec_extraction.py).
- F401: drop unused pydantic.model_validator import in common/config.py;
  drop unused ProbeBackend import in test_concurrency_probe.py;
  drop unused asyncio import in test_servicer_gate_snapshot.py.
- B904: re-raise with `from err` in WorkerConfig int validator.
- F541: drop f-prefix on f-string with no placeholders in __main__.py.
- SIM105: use contextlib.suppress in concurrency_probe try/except/pass.
- I001: ruff auto-fixed import sort order in __main__.py,
  reasoning/servicer.py, test_grpc_auth_interceptor.py,
  test_servicer_gate_snapshot.py, test_concurrency_phase4.py.

Tests:
- create_llm_provider() now wraps with ConcurrencyGatedProvider whenever
  gate_registry is supplied, regardless of config.test_mode. The previous
  `not config.test_mode` guard caused test_cli_review_constructs_registry
  and test_benchmark_constructs_registry to receive an unwrapped
  FakeLLMProvider in CI (SOURCEBRIDGE_TEST_MODE=true), making the
  isinstance(provider, ConcurrencyGatedProvider) assertion fail.
- CI workflow now passes -m "not slow" to pytest so the real-clock
  aiolimiter spacing test (test_wrapper_rate_limits_with_real_aiolimiter)
  is consistently skipped in containerized CI.

Refs CA-169 / PR #43.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@jstuart0 jstuart0 merged commit 3093791 into main May 7, 2026
13 checks passed
@jstuart0 jstuart0 deleted the feat/worker-llm-concurrency branch May 7, 2026 00:47