fix(pusher): prevent ghost WebSocket connections leaking memory#7283
fix(pusher): prevent ghost WebSocket connections leaking memory#7283beastoin wants to merge 48 commits into
Conversation
Greptile SummaryThis PR reduces pusher pod resource requests from
Confidence Score: 4/5Safe to merge with monitoring in place; the scheduling math is correct and the change unblocks HPA scaling as intended. The change is a single-file, well-documented resource request reduction backed by real utilization data. The capacity arithmetic checks out end-to-end. The one area to watch is the memory outlier pod (2964Mi observed vs 2.5Gi new request): under Burstable QoS that pod is an eviction candidate during node memory pressure, which is a present operational risk rather than a purely theoretical one. Keeping limits at 4.5Gi and deploying to a dedicated node pool mitigates the worst outcomes, but the gap between request and observed peak is worth monitoring closely during rollout. Only backend/charts/pusher/prod_omi_pusher_values.yaml changed; watch memory eviction metrics for pusher pods after rollout, particularly the outlier pod that has previously reached ~2964Mi. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
HPA["HPA\n(min: 20, max: 40)\nmetric: activeConnectionsPerPod=30"]
HPA -->|"wants >24 pods\n(was blocked)"| SCHED
subgraph SCHED["Kubernetes Scheduler"]
direction TB
OLD["Before\nCPU req: 700m\nMem req: 4.5Gi\n→ 2 pods/node\n→ 24 max schedulable"]
NEW["After\nCPU req: 350m\nMem req: 2.5Gi\n→ 4 pods/node\n→ 48 max schedulable"]
end
SCHED --> NODE["12 pusher-pool-v3 nodes\nAllocatable: 1930m CPU / 13485Mi Mem\nUsable after DaemonSet: 1571m / 12362Mi"]
NODE --> QOS["QoS: Burstable\nLimits: 700m CPU / 4.5Gi Mem\n(unchanged)"]
QOS --> RISK["Risk: pods using >\nrequest memory\nare eviction candidates\nunder node pressure"]
QOS --> OK["HPA can now scale\nup to 40 replicas\n(42 with maxSurge=2)\n< 48 capacity ✓"]
Reviews (1): Last reviewed commit: "fix(pusher): increase memory request to ..." | Re-trigger Greptile |
| cpu: 0.7 | ||
| memory: 4.5Gi | ||
| cpu: 0.35 | ||
| memory: 3Gi |
There was a problem hiding this comment.
Memory request below observed peak usage
The new 2.5Gi request (2560Mi) is below the single observed outlier pod at 2964Mi. Under Burstable QoS, kubelet evicts pods whose actual memory usage exceeds their request when a node hits memory pressure — meaning that outlier pod becomes an eviction candidate even without approaching the 4.5Gi limit. Since this is a dedicated node pool the risk is low during normal operation, but a coordinated spike (e.g., multiple pods hitting jemalloc fragmentation peaks simultaneously) could trigger cascading evictions. Consider adding an eviction alert on kube_pod_container_status_restarts_total for the pusher namespace as a rollout safeguard, and evaluate whether 2.75Gi or 3Gi is a safer floor given the observed outlier.
Changed-path coverage checklist
L1 evidenceYAML validation:
Capacity math (automated script):
beast omi dev doctor: status=ok, no CRITICAL failures L1 synthesisBoth changed paths (P1, P2) validated via YAML parsing, constraint checking (requests ≤ limits), and capacity math proving 48 pods fit on the existing 12-node pool. Non-happy-path testing is not applicable — these are static numeric values in a Helm chart with no branching logic. No paths remain untested at L1. by AI for @beastoin |
|
@beastoin The pusher code change looks correct for the ghost-connection leak, but this PR changes listen/pusher timeout and shutdown processing ( by AI for @beastoin |
Test Evidence — CP8 + CP9A + CP9B CompleteCP8: Test Coverage (TESTS_APPROVED)31 unit tests across 7 test classes, all passing:
CP9A: Level 1 — Standalone Pusher Test
CP9B: Level 2 — Backend + Pusher Integrated
Changed-Path Coverage
by AI for @beastoin |
f662146 to
55ca69f
Compare
Pusher pods request 700m CPU / 4.5Gi memory (Guaranteed QoS) but actual usage is 9-226m CPU (avg 97m) and 445-2964Mi memory (avg 564Mi). With 12 nodes at 1930m allocatable CPU each and ~359m DaemonSet overhead, only 2 pods fit per node = 24 max, exactly at the HPA ceiling. Lower requests to 350m CPU / 2.5Gi memory while keeping limits at 700m / 4.5Gi. This doubles capacity from 24 to 48 pods (4 per node), giving the HPA (max 40) room to scale plus headroom for maxSurge:2 rolling updates. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
GCP Monitoring p99-max memory usage: 3291 MiB post-jemalloc (May 13). Adjust memory request from 2.5Gi to 3Gi for safer margin while maintaining 4 pods/node capacity (both CPU and memory balanced). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…auge Root cause: when a background task (GCS upload, webhook, diarizer) hangs after WebSocket disconnect, asyncio.gather() for all 5 main tasks blocks forever. The finally block never executes, so the gauge is never decremented and ~15MB per connection is leaked. Evidence from prod: pod mhjxw shows gauge=216 but only 22 real TCP connections (194 ghosts), consuming 3.3GB vs 450MB baseline. Fix: - Add WS_RECEIVE_TIMEOUT (300s) to detect dead connections - Await receive_task separately instead of gathering all 5 tasks - Give background tasks BG_DRAIN_TIMEOUT (30s) to drain, then cancel - Ensure finally block always runs and gauge is always decremented Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests verify: - Receive timeout fires on dead connections - Background tasks are force-cancelled after drain timeout - Gauge is always decremented regardless of task state - Old gather-all pattern demonstrated to leak with hung tasks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d items Review feedback: speaker samples queued < 120s before disconnect were silently dropped by the 30s drain timeout. Now the age check is skipped on shutdown so pending samples get processed during drain. Also logs queue depths when the drain timeout fires for observability. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ructural tests Review feedback: constants now parsed from pusher.py via AST instead of copied. Added speaker sample shutdown drain test and structural tests verifying the source has expected patterns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… timing constants Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… verification Addresses tester feedback: adds exact value assertions for WS_RECEIVE_TIMEOUT (300s) and BG_DRAIN_TIMEOUT (30s), plus 9 AST-based structural tests that verify the actual production code flow in _websocket_util_trigger — gauge inc/dec placement, receive-then-drain ordering, force-cancel after timeout, bg_main_tasks separation from receive_task, and is_shutdown guard. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… connections Same asyncio.gather() antipattern as pusher — gathering up to 11 tasks means any hung bg task blocks cleanup and gauge dec forever. Separate receive task, drain bg with BG_DRAIN_TIMEOUT (30s), force-cancel stragglers. Add WS_RECEIVE_TIMEOUT (300s) to detect dead TCP connections. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codifies receive-then-drain pattern, receive timeouts, gauge placement, task tracking, executor bounds, and process-scoped dict cleanup as mandatory rules for long-lived WebSocket handlers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Both constants now exist in both pusher.py and transcribe.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replaces bare 'await receive_task' with asyncio.wait(FIRST_COMPLETED) so
bg task crashes are detected immediately during active sessions, not hours
later at drain. Adds task names (name=f"ws:{uid}:...") for production
debugging. Keeps bounded drain + force-cancel on disconnect.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…pattern Same upgrade as pusher — detects bg task crashes immediately during active WS sessions. Adds task names for all 11 possible tasks. Cancels receive task if a bg crash triggered the supervisor exit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
3 new tests: bg crash detected immediately, normal disconnect still works, task names assigned. Updated structural tests to verify asyncio.wait (FIRST_COMPLETED) pattern instead of bare await. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace receive-then-drain with asyncio.wait(FIRST_COMPLETED) supervisor as the canonical pattern. Add task naming requirement. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Finite bg tasks that complete normally during an active session no longer trigger supervisor exit. The loop re-waits for remaining tasks — only disconnect or crash exits the supervisor. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nversations process_pending_conversations completes after ~7s, speaker_id_task can return immediately when disabled. Supervisor loop re-waits for remaining tasks instead of tearing down a healthy session. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Verifies supervisor loop doesn't tear down a healthy session when a finite bg task (e.g. process_pending_conversations) completes normally. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…k naming rule Supervisor pattern now shows the loop that handles finite bg tasks. Task naming rule narrowed to WebSocket lifetime tasks (not all create_task). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Heartbeat completing normally (90s inactivity) should tear down the session immediately, not re-wait until WS_RECEIVE_TIMEOUT (300s). Only finite tasks (pending_conversations, speaker_id) skip teardown on normal completion. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tests Adds behavioral test for lifetime task teardown and structural tests verifying transcribe.py has finite_tasks set and supervisor pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Backend rules require all imports at module top level. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…k on early return Four early returns (bad uid, unsupported language, missing user, closed websocket) between the old gauge.inc() and the try/finally would leak the connection gauge. Moving inc() inside the try block pairs it with dec() in finally. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e and supervisor Adds AST-verified tests for transcribe.py: gauge in try/finally, supervisor before drain ordering, no gauge before try block. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…s.py) Replaces raw asyncio.gather() patterns with typed, bounded, observable utilities: - supervise_tasks(): FIRST_COMPLETED supervisor loop for WS handlers - drain_tasks(): timeout-bounded task cancellation with force-cancel - gather_with_logging(): bounded fan-out with exception logging - gather_chunked(): chunked fan-out for large coroutine lists - create_named_task(): tracked task creation with done-callback cleanup Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace inline supervisor loop and manual cancel-gather with supervise_tasks(), drain_tasks(), and create_named_task() from utils/async_tasks.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace inline supervisor loop, manual cancel-gather, and wait_for+gather drain patterns with supervise_tasks(), drain_tasks(), and create_named_task() from utils/async_tasks.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace raw asyncio.gather(return_exceptions=True) with gather_with_logging() — exceptions are now logged instead of silently swallowed, and concurrency is bounded via semaphore. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests cover all 5 utilities: create_named_task (3), drain_tasks (5), supervise_tasks (5), gather_with_logging (6), gather_chunked (3), plus 4 structural tests verifying routers use the utilities. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ities Update AST-based tests to verify supervise_tasks() and drain_tasks() usage instead of raw asyncio.wait/gather patterns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…finite hang Reviewer finding: after timeout, the force-cancel gather could block forever if a task suppresses CancelledError. Now uses asyncio.wait with 5s bound and logs stuck tasks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…osion
Reviewer finding: labels like pusher:{uid} create unbounded metric
cardinality at 1800 connections. Use static labels (pusher, pusher_bg,
pusher_cleanup) and log uid separately.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…explosion
Reviewer finding: labels like listen:{uid}:{session_id} create
unbounded metric cardinality. Use static labels (listen, listen_bg,
listen_cleanup, listen_speaker_rollover, listen_speaker_final).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds 7 new tests covering tester findings: - drain_tasks force-cancel count reporting - drain_tasks mixed done/running tasks - gather_with_logging all-fail case - gather_with_logging None return value (not confused with failure) - gather_chunked with failures and index tracking - no-raw-gather assertion in WS handlers - no-dynamic-uid-in-metric-labels structural check Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replaced verbose 70-line code example with concise bullet-point rules referencing utils/async_tasks.py utilities. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
55ca69f to
5673efb
Compare
…nnect Polling tasks mid-asyncio.sleep() can't observe shutdown, causing drain_tasks(cancel=False) to wait the full 30s timeout even for lightweight pollers. sleep_until_shutdown(event, seconds) wakes immediately when the shutdown event fires. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nnect Replaces raw asyncio.sleep() with sleep_until_shutdown() in: - _record_usage_periodically (60s -> instant wake) - send_heartbeat (10s -> instant wake) - _pusher_heartbeat (20s -> instant wake) - pusher reconnect backoff (variable -> instant wake) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replaces raw asyncio.sleep() with sleep_until_shutdown() in all queue processors: private_cloud, speaker_samples, transcripts, audio_bytes. Queues drain remaining items then exit cleanly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Check event.is_set() before wait_for to correctly return True when the event is already set, even with seconds=0. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…gaps) - drain_tasks with zero/negative timeout - gather_with_logging with max_concurrency=1 (serialization proof) - gather_chunked chunk ordering proof (chunk N finishes before N+1 starts) - sleep_until_shutdown with negative timeout - 88 total tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CP9 Changed-Path Coverage ChecklistChanged executable paths
L1 Live Test Evidence
by AI for @beastoin |
CP9B — Level 2 Live Test Evidence (Backend + Pusher Integrated)Services: Podcast 5m (integrated)
Disconnect (integrated)
Backend log tracesUnit tests
by AI for @beastoin |
Summary
Fixes ghost WebSocket connections in both pusher and transcribe that leak ~15MB each, and introduces production-grade structured concurrency utilities (
utils/async_tasks.py) following FastAPI best practices.Root cause
When a background task (GCS upload, webhook, diarizer call) hangs after the WebSocket disconnects, the old
asyncio.gather()pattern blocks forever — preventing cleanup. The connection gauge is never decremented and ~15MB per ghost connection is leaked.Fix: Structured concurrency utilities + supervisor pattern
New module:
utils/async_tasks.py— same pattern asutils/executors.py(thread pools) andutils/http_client.py(HTTP clients): named utilities with bounded resources, clean shutdown, and Prometheus observability.supervise_tasks()asyncio.wait(FIRST_COMPLETED)supervisor loopdrain_tasks()gather_with_logging()asyncio.gather(*coros, return_exceptions=True)(silent failures)gather_chunked()create_named_task()asyncio.create_task()sleep_until_shutdown()asyncio.sleep()in polling loopsInstant drain on disconnect (shutdown event pattern)
Problem:
drain_tasks(cancel=False)waits up to 30s for bg tasks, but polling tasks (e.g._record_usage_periodicallywith 60s sleep) are mid-asyncio.sleep()and can't observe shutdown — they sit idle until the sleep finishes or drain times out.Solution: Per-connection
asyncio.Event(shutdown_event) that fires on disconnect. All polling loops usesleep_until_shutdown(event, seconds)instead of rawasyncio.sleep(). Pollers wake instantly (<1ms) on disconnect while heavy tasks (LLM processing) still get their fullcancel=Falsegrace period.Affected loops:
_record_usage_periodically(60s),send_heartbeat(10s),_pusher_heartbeat(20s), pusher reconnect backoff (variable)process_private_cloud_queue(1s),process_speaker_sample_queue(15s),process_transcript_queue(1s),process_audio_bytes_queue(1s)Best practices applied (sourced from research)
BackgroundTasksonly for sub-second fire-and-forgetgather(return_exceptions=True)hides bugs — prefer explicit exception checkingGatherResult[T]type (codex recommendation): Return typed results instead ofNonefor failed itemsBug classes eliminated
gather(*coros, return_exceptions=True)with no inspectiongather_with_logging()logs every failure with labelgather()propagates exception but siblings keep runningsupervise_tasks()exits loop, thendrain_tasks()cleans upmax_concurrencyparameterasyncio.sleep()ignore shutdownsleep_until_shutdown()wakes pollers instantly via EventChanges
utils/async_tasks.pyrouters/pusher.pyrouters/transcribe.pyutils/app_integrations.pygather(return_exceptions=True)withgather_with_logging()tests/unit/test_async_tasks.pytests/unit/test_pusher_ghost_connections.pytest.shtest_async_tasks.pyCLAUDE.mdAGENTS.mddocs/.../listen_pusher_pipeline.mdxPrometheus metrics added
async_supervisor_exit_total{label,reason}async_drain_timeout_total{label}async_drain_duration_seconds{label}async_gather_failures_total{label}async_gather_duration_seconds{label}Test plan
Risks
drain_tasks(cancel=False)first waits, then force-cancels on timeout. The old inline pattern did the same but with explicitTimeoutErrorhandling. Risk is minimal since the utility implements the same logic..valueinstead. Onlyapp_integrations.pyis affected and its callers don't use return values from gather.finite_tasksset with only 2 members, plus structural tests.asyncio.Event. At 1800 concurrent connections this is ~1800 Event objects — negligible memory overhead (<1KB each).Closes #7280
🤖 Generated with Claude Code