Add Adaptive Routing MSE inflight reqs stats #18553
cc stripe-private-oss-forks/pinot-reviewers r? dang r?

Records per-server in-flight request counts for MSE queries so that NumInFlightReqSelector and HybridSelector can start to factor in MSE load.

- MultiStageBrokerRequestHandler: accept optional ServerRoutingStatsManager via new constructor overload; pass through to QueryDispatcher
- BaseBrokerStarter: pass _serverRoutingStatsManager to the handler
- QueryDispatcher.submitAndReduce: when statsManager is present, call recordStatsForQuerySubmission for each dispatched server after submit(), and decrement inflight for all servers in the finally block via recordStatsUponResponseArrival(..., -1)

Latency (recordStatsUponResponseArrival with real values) and per-sender arrival granularity are left to a follow-up PR once per-sender EOS interception is in place (see TODO in submitAndReduce). Will be addressed by https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/596.

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

Before this deploy, a 60 minute PerformanceTestWorkflow with 5 users had [~70QPS and about 50-90 inflight requests per 5 minutes. 
](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%22ai4%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22expr%22:%22sum%28increase%28%7B__name__%3D~%5C%22pinot_broker_adaptive_server_num_in_flight_requests%5C%22,%20pinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B5m%5D%29%29%22,%22hide%22:false,%22instant%22:false,%22legendFormat%22:%22num_inflight_requests%22,%22range%22:true,%22refId%22:%22B%22,%22interval%22:%225m%22%7D,%7B%22refId%22:%22A%22,%22expr%22:%22topk%28200,%5Cn%20%20sum%28%20%5Cn%20%20%20%20rate%28pinot_server_queries%7Bpinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B$__rate_interval%5D%29%5Cn%20%20%29%5Cn%29%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22legendFormat%22:%22QPS%22%7D%5D,%22range%22:%7B%22from%22:%221776173296750%22,%22to%22:%221776177660423%22%7D%7D%7D&orgId=1) With this deployed to rad-canary, the same workflow [shows ~70QPS but ~140 inflight requests. 
](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%22ai4%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22expr%22:%22sum%28increase%28%7B__name__%3D~%5C%22pinot_broker_adaptive_server_num_in_flight_requests%5C%22,%20pinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B5m%5D%29%29%22,%22hide%22:false,%22instant%22:false,%22legendFormat%22:%22num_inflight_requests%22,%22range%22:true,%22refId%22:%22B%22,%22interval%22:%225m%22%7D,%7B%22refId%22:%22A%22,%22expr%22:%22topk%28200,%5Cn%20%20sum%28%5Cn%20%20%20%20rate%28pinot_server_queries%7Bpinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B$__rate_interval%5D%29%5Cn%20%20%29%5Cn%29%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22legendFormat%22:%22QPS%22%7D%5D,%22range%22:%7B%22from%22:%221776266161170%22,%22to%22:%221776270844104%22%7D%7D%7D&orgId=1). Inflight requests decrease back to baseline after the perf test. num_inflight_requests roughly doubled, which makes sense since the workflow issues half SSE and half MSE requests. (num_inflight_ops is captured every 10s. Since these canary queries run quickly, often the value of num_inflight_ops can be 0 at the time of the capture). Stripe-Original-Repo: stripe-private-oss-forks/pinot Stripe-Monotonic-Timestamp: v2/2026-04-15T21:05:34Z/0 Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/597
cc stripe-private-oss-forks/pinot-reviewers r? dang

MSE and SSE now maintain independent per-server stats in ServerRoutingStatsManager, preventing one engine's infra issues from poisoning the other's adaptive routing decisions. There are many changes here, mostly because of interface-level changes to include QueryType in signatures.

- Add separate ConcurrentHashMap for MSE stats in ServerRoutingStatsManager
- Add QueryType-aware overloads for recording and fetching stats
- Thread QueryType through ServerSelectionContext -> PriorityPoolInstanceSelector -> AdaptiveServerSelector
- MSE queries record to and read from MSE-specific stats via QueryType.MSE
- SSE queries record to and read from SSE-specific stats via QueryType.SSE (removed existing no-arg methods to avoid future devs accidentally invoking the SSE option)

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

Deployed to rad-canary and [ran a chaos test that induced latency on MSE and SSE separately](https://amp.qa.corp.stripe.com/chaos-scenarios/config/rad-canary-server-latency). We saw the MSE and SSE latency EMAs spike at different times.

https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778699885144&to=1778700484021

<img width="1279" alt="Screenshot 2026-05-13 at 3 30 27 pm" src="https://git.corp.stripe.com/user-attachments/assets/71dea37a-582f-4290-9240-ec0f779017fa" />

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-15T12:48:15Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/643
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18553 +/- ##
============================================
+ Coverage 63.68% 64.29% +0.60%
+ Complexity 1684 1126 -558
============================================
Files 3262 3311 +49
Lines 199835 203856 +4021
Branches 31034 31726 +692
============================================
+ Hits 127266 131064 +3798
+ Misses 62416 62282 -134
- Partials 10153 10510 +357
@yashmayya @Jackie-Jiang Can you take a look at this PR, which adds initial support for adaptive routing stats? I have a follow-up PR that will enable latency tracking. Both are working well on our QA clusters.
The overall direction of separating MSE/SSE stats looks right. Six MAJOR concerns below, posted inline:

- A silent fall-through in `getStatsMap()` lumps `TSE` queries into the SSE bucket and silently routes non-query-thread callers to SSE.
- The `/debug/serverRoutingStats` endpoint and `getServerRoutingStatsStr()` still expose only the SSE map, so operators triaging MSE adaptive routing via the debug surface are blind after this PR.
- The async `_executorService` is a 2-thread pool with FIFO submission but unordered execution: the decrement task can land before the increment task, producing a transient negative inflight count and a wrong EMA sample. Preexisting for SSE, but amplified for MSE because the increment→decrement window shrinks to `runReducer` duration.
- `recordStatsUponResponseArrival(..., -1)` skips the latency update, so for MSE, `_latencyMsEMA` stays at `_avgInitializationVal` forever, auto-decay never fires, and the new `ADAPTIVE_SERVER_MSE_LATENCY_EMA` / `ADAPTIVE_SERVER_MSE_HYBRID_SCORE` gauges export inert values that nonetheless feed `HYBRID` / `LATENCY` MSE routing decisions via the standard instance selectors.
- `BaseBrokerStarter.createMultiStageBrokerRequestHandler` is a `protected` extension point whose signature changed, so downstream broker starters that overrode it will silently stop being invoked. The constructor on `MultiStageBrokerRequestHandler` itself preserved its old form; this factory didn't.
- `QueryDispatcherTest.testStatsManagerRecordsSubmissionAndArrivalForDispatchedServers` uses a `Mockito.mock` and only checks "method called at least once". It doesn't verify ordering, doesn't exercise the partial-`submit()`-failure path the inline comments claim to defend, and doesn't cover the null-`statsManager` overload.
    private final BrokerMetrics _brokerMetrics;
    private volatile boolean _isEnabled;
    private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap;
    private ConcurrentHashMap<String, ServerRoutingStatsEntry> _mseServerQueryStatsMap;
MAJOR — debug surface silently omits the new MSE map.
The new _mseServerQueryStatsMap is not exposed by any of the public read APIs of this class:
- `getServerRoutingStats()` (line 236) still returns only `_serverQueryStatsMap`, so the `/debug/serverRoutingStats` REST endpoint (`PinotBrokerDebug.java:280`) returns SSE-only data after this PR.
- `getServerRoutingStatsStr()` (line 243) still iterates only `_serverQueryStatsMap`.
After this PR, the only programmatic view into MSE adaptive-routing state is the Prometheus gauges. Operators triaging routing decisions via the debug REST endpoint will be silently blind for MSE.
Suggest either growing the response payload to Map<QueryType, Map<String, ServerRoutingStatsEntry>> or adding a parallel debug endpoint.
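A minimal sketch of the first option, assuming a payload keyed by engine and then by server. `EngineType`, `StatsEntrySketch`, and `RoutingStatsDebugView` are hypothetical stand-ins for illustration, not the actual Pinot classes or the real endpoint wiring.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the query-type enum.
enum EngineType { SSE, MSE }

// Hypothetical stand-in for ServerRoutingStatsEntry; the real entry carries more state.
final class StatsEntrySketch {
  final int numInFlightRequests;
  final double latencyMsEma;

  StatsEntrySketch(int numInFlightRequests, double latencyMsEma) {
    this.numInFlightRequests = numInFlightRequests;
    this.latencyMsEma = latencyMsEma;
  }
}

final class RoutingStatsDebugView {
  // One per-server map per engine, so no engine can be silently omitted from the view.
  private final Map<EngineType, Map<String, StatsEntrySketch>> statsByEngine =
      new EnumMap<>(EngineType.class);

  RoutingStatsDebugView() {
    for (EngineType type : EngineType.values()) {
      statsByEngine.put(type, new ConcurrentHashMap<>());
    }
  }

  void put(EngineType type, String serverId, StatsEntrySketch entry) {
    statsByEngine.get(type).put(serverId, entry);
  }

  // Copy keyed by engine, then by server (sorted for stable debug output).
  Map<EngineType, Map<String, StatsEntrySketch>> snapshot() {
    Map<EngineType, Map<String, StatsEntrySketch>> out = new EnumMap<>(EngineType.class);
    statsByEngine.forEach((type, byServer) -> out.put(type, new TreeMap<>(byServer)));
    return out;
  }
}
```

Keying the outer map by the enum means a newly added engine shows up in the debug payload as an empty map rather than being dropped.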
    }

    private ConcurrentHashMap<String, ServerRoutingStatsEntry> getStatsMap(QueryType queryType) {
      return queryType == QueryType.MSE ? _mseServerQueryStatsMap : _serverQueryStatsMap;
MAJOR — two silent fall-throughs in the new dispatch.
- `QueryExecutionContext.QueryType` has three values: `SSE`, `MSE`, `TSE`. This ternary lumps `TSE` into the SSE bucket. Today `TimeSeriesRequestHandler` doesn't call `recordStats*`, so this is dormant, but the moment anyone wires TSE adaptive routing, TSE traffic will silently poison SSE stats, which is the exact engine cross-talk the PR exists to prevent.
- The no-arg `getStatsMap()` below defaults to the SSE map when `QueryThreadContext.getIfAvailable()` is `null`. Any caller invoked from a non-query thread (debug REST handlers, periodic tasks, future SPI consumers) silently reads/writes the SSE map with no warning.
Suggest: switch exhaustively on the enum and either throw or log on an unknown/unset query type, or add a third TSE-map slot up front. At minimum, document on the method that the caller is expected to run with a QueryThreadContext set.
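One way the exhaustive-switch suggestion could look, as a sketch only. `QueryTypeSketch` and `StatsMapResolver` are hypothetical names, and the maps hold plain integers instead of real stats entries to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mirror of the three-valued query-type enum named in the comment above.
enum QueryTypeSketch { SSE, MSE, TSE }

final class StatsMapResolver {
  private final Map<String, Integer> sseStats = new ConcurrentHashMap<>();
  private final Map<String, Integer> mseStats = new ConcurrentHashMap<>();

  Map<String, Integer> getStatsMap(QueryTypeSketch queryType) {
    if (queryType == null) {
      // Fail loudly instead of defaulting to SSE when no query context is set.
      throw new IllegalStateException("No query type set; caller must run with a query context");
    }
    switch (queryType) {
      case SSE:
        return sseStats;
      case MSE:
        return mseStats;
      default:
        // TSE (or any future engine) has no stats map yet, so refuse rather than fall through.
        throw new UnsupportedOperationException("No adaptive routing stats map for " + queryType);
    }
  }
}
```

Throwing is the strictest choice; logging a warning and returning a dedicated map would be a softer variant of the same idea.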
    // implementation details of `submit`.
    if (statsManager != null) {
      for (QueryServerInstance server : servers) {
        statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());
MAJOR — increment/decrement can be reordered on the stats-manager executor, causing a transient negative inflight count and a wrong EMA sample.
recordStatsForQuerySubmission (here) and recordStatsUponResponseArrival (line 219 in the finally) both enqueue tasks on ServerRoutingStatsManager._executorService, which is a newFixedThreadPool(2) by default. The FIFO submission queue does NOT guarantee execution order across the two worker threads — for any single server X, the decrement task can acquire the per-server write lock before the increment task. When that happens:
- `_numInFlightRequests` momentarily becomes `-1`, then `0` after the increment lands.
- The transient `-1` is read by any concurrent `fetchNumInFlightRequestsForServer` because `_numInFlightRequests` is `volatile`.
- The EMA is sampled only inside `updateNumInFlightRequestsForQuerySubmission` (`ServerRoutingStatsEntry.java:99`), so it gets sampled at `0` instead of the expected `+1`.
This race is preexisting for SSE (AsyncQueryResponse uses the same executor), but the increment→decrement window in MSE is just runReducer duration (often milliseconds) rather than a server-side roundtrip — meaningfully more likely to reorder.
Options: serialize per-server (per-server work channel keyed by serverInstanceId), single-thread the executor, or apply the decrement synchronously while submitting the EMA update asynchronously.
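The per-server serialization option could be sketched roughly as follows: tasks are routed to one of several single-threaded lanes by hashing the server ID, so updates for the same server run in submission order while different servers can still proceed in parallel. `KeyedSerialExecutor` is a hypothetical helper, not an existing Pinot class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class KeyedSerialExecutor {
  // Each lane is single-threaded, so tasks within a lane execute in FIFO order.
  private final ExecutorService[] lanes;

  KeyedSerialExecutor(int numLanes) {
    lanes = new ExecutorService[numLanes];
    for (int i = 0; i < numLanes; i++) {
      lanes[i] = Executors.newSingleThreadExecutor();
    }
  }

  // All tasks for a given server land on the same lane, preserving their order.
  void execute(String serverInstanceId, Runnable task) {
    int lane = Math.floorMod(serverInstanceId.hashCode(), lanes.length);
    lanes[lane].execute(task);
  }

  // Stops accepting work and waits for queued tasks; returns false on timeout or interrupt.
  boolean shutdownAndAwait(long timeoutMs) {
    for (ExecutorService lane : lanes) {
      lane.shutdown();
    }
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      for (ExecutorService lane : lanes) {
        long remaining = Math.max(deadline - System.nanoTime(), 0L);
        if (!lane.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
          return false;
        }
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With two lanes this keeps the same thread count as the default pool described above, at the cost of servers that hash to the same lane sharing a queue.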
    } finally {
      if (statsManager != null) {
        for (QueryServerInstance server : incrementedServers) {
          statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);
MAJOR — latency EMA for MSE entries will be stuck at the init value forever, and the new MSE gauges will export that stuck value.
Passing latency = -1 is handled safely by updateStatsUponResponseArrival (it skips updateLatency), but the consequences go beyond "no latency yet":
- `_latencyMsEMA._lastUpdatedTimeMs` stays at `0` for every MSE entry, so the EMA's auto-decay scheduled task never fires.
- `_latencyMsEMA.getAverage()` returns `_avgInitializationVal` (default `1.0`) for the lifetime of the broker.
- `ADAPTIVE_SERVER_MSE_LATENCY_EMA` therefore exports a flat constant, and operators wiring dashboards will assume the gauge is broken and stop trusting it.
- `ADAPTIVE_SERVER_MSE_HYBRID_SCORE` is `inflight^exp * latencyEma`, which collapses to `inflight^exp * initVal`. If `_avgInitializationVal == 0.0`, the MSE hybrid score is identically `0` for all servers.
This isn't just a v0 inert-data concern: BalancedInstanceSelector and ReplicaGroupInstanceSelector (the common MSE leaf selectors via BaseInstanceSelector._priorityPoolInstanceSelector) DO invoke _adaptiveServerSelector — and via the new getStatsMap(), those reads now hit the MSE map, where these values are stuck. With HYBRID or LATENCY selector configured, MSE routing decisions will be derived from stuck data.
Until the TODO above lands, suggest either: gate ADAPTIVE_SERVER_MSE_LATENCY_EMA / ADAPTIVE_SERVER_MSE_HYBRID_SCORE from being exported (so dashboards don't show misleading flat lines), or fall back to the SSE entry's latency EMA when the MSE entry has never received a real latency sample, or document loudly in the metric Javadoc that these gauges are inert until follow-up.
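To make the fallback option concrete, here is a small sketch that computes the hybrid score as `inflight^exp * latencyEma` (the formula quoted above) and uses the SSE latency EMA while the MSE EMA has no real sample. `LatencyEmaSketch` and `HybridScoreSketch` are hypothetical; the update rule shown is a plain exponential moving average and is an assumption, not Pinot's actual EMA implementation.

```java
// Hypothetical, simplified EMA that remembers whether it ever received a real sample.
final class LatencyEmaSketch {
  private final double alpha;
  private double average;
  private boolean hasSample;

  LatencyEmaSketch(double initVal, double alpha) {
    this.average = initVal;
    this.alpha = alpha;
  }

  void record(double latencyMs) {
    // Plain exponential moving average update (assumed, for illustration).
    average = alpha * latencyMs + (1 - alpha) * average;
    hasSample = true;
  }

  double getAverage() {
    return average;
  }

  boolean hasSample() {
    return hasSample;
  }
}

final class HybridScoreSketch {
  // Use the MSE latency only once it has seen a real sample; otherwise borrow the SSE value.
  static double effectiveLatency(LatencyEmaSketch mse, LatencyEmaSketch sse) {
    return mse.hasSample() ? mse.getAverage() : sse.getAverage();
  }

  // Hybrid score as described in the comment above: inflight^exp * latencyEma.
  static double hybridScore(int numInFlight, double exponent, double latencyEma) {
    return Math.pow(numInFlight, exponent) * latencyEma;
  }
}
```

Note that borrowing the SSE value reintroduces some cross-engine coupling, so this only makes sense as a stopgap until MSE latency is actually recorded.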
    for (String instanceId : expectedInstanceIds) {
      Mockito.verify(statsManager).recordStatsForQuerySubmission(requestId, instanceId);
      Mockito.verify(statsManager).recordStatsUponResponseArrival(requestId, instanceId, -1L);
MAJOR — this test does not actually verify the contract its production code claims to defend.
Mockito.verify(statsManager).recordStatsForQuerySubmission(...) only checks the method was called at least once per server. It does not catch any of the bugs this PR's production code comments are explicitly guarding against:
- Ordering: a future refactor that moves the decrement out of the `finally` (or breaks the `incrementedServers` set bookkeeping) still passes this test.
- Partial-failure path: the inline comment in `submitAndReduce` says "guarding against a partial failure in submit()". But the only error path exercised here is `runReducer` throwing NPE due to the mocked `MailboxService`, i.e., the path where `submit()` succeeded. The actual partial-`submit()`-failure case isn't covered.
- Legacy `submitAndReduce(... null)` overload: no test asserts the null-`statsManager` overload doesn't throw or change behavior.
- Real-executor behavior: the manager is a Mockito mock, so the async executor and per-server lock ordering aren't exercised at all (see related comment about the executor race on `QueryDispatcher.java:201`).
Suggested additions:
- A test where `submit()` throws after partial dispatch and asserts `incrementedServers` is empty / `recordStatsUponResponseArrival` is NOT called for any server.
- A test exercising the legacy 4-arg `submitAndReduce` overload.
- An end-to-end test using a real `ServerRoutingStatsManager` (not a mock) that asserts `numInFlight == 0` for every server after `submitAndReduce` returns successfully, which is what the inline comments actually claim.
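The shape of those tests can be sketched without Mockito by using a synchronous fake that records call order and net in-flight counts. Everything here is hypothetical scaffolding: `FakeStatsManager` and `DispatchSketch.submitAndReduce` only model the increment-after-submit and decrement-in-finally flow described in the PR, not the real `QueryDispatcher`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical synchronous fake that tracks net in-flight counts and call order.
final class FakeStatsManager {
  final Map<String, Integer> inFlight = new HashMap<>();
  final List<String> calls = new ArrayList<>();

  void recordSubmission(String serverId) {
    inFlight.merge(serverId, 1, Integer::sum);
    calls.add("submit:" + serverId);
  }

  void recordArrival(String serverId) {
    inFlight.merge(serverId, -1, Integer::sum);
    calls.add("arrive:" + serverId);
  }
}

final class DispatchSketch {
  // Simplified model of the flow: increment after submit(), decrement in finally.
  static void submitAndReduce(List<String> servers, FakeStatsManager stats,
      Runnable submit, Runnable reduce) {
    List<String> incremented = new ArrayList<>();
    try {
      submit.run();
      if (stats != null) {
        for (String server : servers) {
          stats.recordSubmission(server);
          incremented.add(server);
        }
      }
      reduce.run();
    } finally {
      if (stats != null) {
        // Only servers that were actually incremented get a matching decrement.
        for (String server : incremented) {
          stats.recordArrival(server);
        }
      }
    }
  }
}
```

A test against this model can then assert the exact call sequence on success, a net count of zero when the reduce step throws, no calls at all when `submit` throws, and no exception when the stats manager is `null`.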
Add separate adaptive routing per-server stats for MSE and SSE, preventing one engine's infra issues from poisoning the other's adaptive routing decisions. (We saw a case on a Pinot cluster where MSE latency was impacted but SSE traffic was completely fine. We don't understand this scenario, but without splitting, the happy SSE queries would dilute the signal from MSE, and the MSE signals would cause unnecessary routing changes to SSE queries.)
To start, we record per-server in-flight request counts for MSE queries. Latency (recordStatsUponResponseArrival with real values) and per-sender arrival granularity are left to a follow-up PR once per-sender EOS interception is in place (see TODO in submitAndReduce).
Gaps
Testing
With this deployed, we see MSE inflight_requests reported in the metrics.
When we ran MSE queries, then stopped, then ran SSE queries, we saw the inflight requests for each reflect the traffic.