Skip to content

Add Adaptive Routing MSE inflight reqs stats#18553

Open
timothy-e wants to merge 5 commits into
apache:masterfrom
timothy-e:timothy-e-mse-ar-inflght-reqs
Open

Add Adaptive Routing MSE inflight reqs stats#18553
timothy-e wants to merge 5 commits into
apache:masterfrom
timothy-e:timothy-e-mse-ar-inflght-reqs

Conversation

@timothy-e
Copy link
Copy Markdown
Contributor

@timothy-e timothy-e commented May 21, 2026

Add separate adaptive routing per-server stats for MSE and SSE, preventing one engine's infra issues from poisoning the other's adaptive routing decisions. (We saw a case on a Pinot cluster where MSE latency was impacted but SSE traffic was completely fine. We don't understand this scenario, but without splitting, the happy SSE queries would dilute the signal from MSE, and the MSE signals would cause unnecessry routing changes to SSE queries).

To start, we record per-server in-flight request counts for MSE queries. Latency (recordStatsUponResponseArrival with real values) and per-sender arrival granularity are left to a follow-up PR once per-sender EOS interception is in place (see TODO in submitAndReduce).

  • MultiStageBrokerRequestHandler: accept optional ServerRoutingStatsManager via new constructor overload; pass through to QueryDispatcher
  • BaseBrokerStarter: pass _serverRoutingStatsManager to the handler
  • QueryDispatcher.submitAndReduce: when statsManager is present, call recordStatsForQuerySubmission for each dispatched server after submit(), and decrement inflight for all servers in the finally block via recordStatsUponResponseArrival(..., -1)

Gaps

  1. Submit could fail part way through so we never get to the incrementing. To handle this, we either have to:
  • split up calculating the server list from the actual dispatch.
  • send the stats manager (or a hook) into the dispatch
  • be okay with sometimes under-reporting the number of inflight ops for a few servers. This approach is the simplest. Under-reporting is unlikely to result in a systemic bias towards a set of servers, and the stats are only used for adaptive routing.
  1. This isn't as accurate as MSE. Servers are marked as inflight from the beginning of the query until the end, which is not equal to the amount of time they were actually working. Resolving this would require passing live stats through the entire execution tree during query execution. Since this is only used for adaptive routing, perfect accuracy isn't required.
  • If a cluster generally handles queries that fan out to all/most servers, then most servers will have the same number of inflight requests, so this metric will have neglible impact.
  • If a cluster generally handles queries that target a smaller number of servers: every query that involves a degraded server will result in higher inflight requests for each server involved in the query. But every query that does not involve the degraded server will have shorter inflight requests. Eventually, this should average out to identifying the degraded server as the one with the most inflight requests.
  • Ultimately, we'll get more signal from the latency - it will just take a few seconds longer to appear.

Testing

With this deployed, we see MSE inflight_requests reported in the metrics.

When we ran MSE queries, then stopped, then ran SSE queries, we saw the inflight requests for each reflect the traffic.

timothy-e and others added 3 commits May 20, 2026 21:59
cc stripe-private-oss-forks/pinot-reviewers
r? dang
r?

Records per-server in-flight request counts for MSE queries so that NumInFlightReqSelector and HybridSelector can start to factor in MSE load.

- MultiStageBrokerRequestHandler: accept optional ServerRoutingStatsManager via new constructor overload; pass through to QueryDispatcher
- BaseBrokerStarter: pass _serverRoutingStatsManager to the handler
- QueryDispatcher.submitAndReduce: when statsManager is present, call recordStatsForQuerySubmission for each dispatched server after submit(), and decrement inflight for all servers in the finally block via recordStatsUponResponseArrival(..., -1)

Latency (recordStatsUponResponseArrival with real values) and per-sender arrival granularity are left to a follow-up PR once per-sender EOS interception is in place (see TODO in submitAndReduce). Will be addressed by https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/596.

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

Before this deploy, a 60 minute PerformanceTestWorkflow with 5 users had [~70QPS and about 50-90 inflight requests per 5 minutes. ](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%22ai4%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22expr%22:%22sum%28increase%28%7B__name__%3D~%5C%22pinot_broker_adaptive_server_num_in_flight_requests%5C%22,%20pinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B5m%5D%29%29%22,%22hide%22:false,%22instant%22:false,%22legendFormat%22:%22num_inflight_requests%22,%22range%22:true,%22refId%22:%22B%22,%22interval%22:%225m%22%7D,%7B%22refId%22:%22A%22,%22expr%22:%22topk%28200,%5Cn%20%20sum%28%20%5Cn%20%20%20%20rate%28pinot_server_queries%7Bpinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B$__rate_interval%5D%29%5Cn%20%20%29%5Cn%29%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22legendFormat%22:%22QPS%22%7D%5D,%22range%22:%7B%22from%22:%221776173296750%22,%22to%22:%221776177660423%22%7D%7D%7D&orgId=1)

With this deployed to rad-canary, the same workflow [shows ~70QPS but ~140 inflight requests. ](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%22ai4%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22expr%22:%22sum%28increase%28%7B__name__%3D~%5C%22pinot_broker_adaptive_server_num_in_flight_requests%5C%22,%20pinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B5m%5D%29%29%22,%22hide%22:false,%22instant%22:false,%22legendFormat%22:%22num_inflight_requests%22,%22range%22:true,%22refId%22:%22B%22,%22interval%22:%225m%22%7D,%7B%22refId%22:%22A%22,%22expr%22:%22topk%28200,%5Cn%20%20sum%28%5Cn%20%20%20%20rate%28pinot_server_queries%7Bpinot_cluster%3D~%5C%22rad-canary%5C%22,pinot_tenant%3D~%5C%22long-lived-a%5C%22,host_cluster%3D~%5C%22northwest%5C%22%7D%5B$__rate_interval%5D%29%5Cn%20%20%29%5Cn%29%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22legendFormat%22:%22QPS%22%7D%5D,%22range%22:%7B%22from%22:%221776266161170%22,%22to%22:%221776270844104%22%7D%7D%7D&orgId=1). Inflight requests decrease back to baseline after the perf test.

num_inflight_requests roughly doubled, which makes sense since the workflow issues half SSE and half MSE requests.

(num_inflight_ops is captured every 10s. Since these canary queries run quickly, often the value of num_inflight_ops can be 0 at the time of the capture).

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-04-15T21:05:34Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/597
cc stripe-private-oss-forks/pinot-reviewers
r? dang

MSE and SSE now maintain independent per-server stats in ServerRoutingStatsManager, preventing one engine's infra issues from poisoning the other's adaptive routing decisions.

There's a lot of changes here mostly because of interface-level changes to include QueryType in signatures.

- Add separate ConcurrentHashMap for MSE stats in ServerRoutingStatsManager
- Add QueryType-aware overloads for recording and fetching stats
- Thread QueryType through ServerSelectionContext -> PriorityPoolInstanceSelector -> AdaptiveServerSelector
- MSE queries record to and read from MSE-specific stats via QueryType.MSE
- SSE queries record to and read from MSE-specific stats via QueryType.SSE. (removed existing no-arg methods to avoid future devs accidentally invoking the SSE option)

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

Deployed to rad-canary and [ran a chaos test that induced latency on MSE and SSE seperately](https://amp.qa.corp.stripe.com/chaos-scenarios/config/rad-canary-server-latency). We saw the MSE and SSE latency EMAs spike at different times.

https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778699885144&to=1778700484021
<img width="1279" alt="Screenshot 2026-05-13 at 3 30 27 pm" src="https://git.corp.stripe.com/user-attachments/assets/71dea37a-582f-4290-9240-ec0f779017fa" />

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-15T12:48:15Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/643
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 21, 2026

Codecov Report

❌ Patch coverage is 98.03922% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 64.29%. Comparing base (d88887e) to head (780f0ea).
⚠️ Report is 44 commits behind head on master.

Files with missing lines Patch % Lines
...requesthandler/MultiStageBrokerRequestHandler.java 66.66% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18553      +/-   ##
============================================
+ Coverage     63.68%   64.29%   +0.60%     
+ Complexity     1684     1126     -558     
============================================
  Files          3262     3311      +49     
  Lines        199835   203856    +4021     
  Branches      31034    31726     +692     
============================================
+ Hits         127266   131064    +3798     
+ Misses        62416    62282     -134     
- Partials      10153    10510     +357     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.29% <98.03%> (+0.60%) ⬆️
temurin 64.29% <98.03%> (+0.60%) ⬆️
unittests 64.28% <98.03%> (+0.60%) ⬆️
unittests1 56.74% <100.00%> (+0.96%) ⬆️
unittests2 35.52% <39.21%> (+0.57%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@timothy-e timothy-e marked this pull request as ready for review May 21, 2026 15:45
@timothy-e
Copy link
Copy Markdown
Contributor Author

@yashmayya @Jackie-Jiang Can you TAL at this PR that adds initial support for adaptive routing stats? I have a follow up PR that will enable latency tracking. Both are working well on our QA clusters.

Copy link
Copy Markdown
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overall direction of separating MSE/SSE stats looks right. Six MAJOR concerns below, posted inline:

  • A silent fall-through in getStatsMap() lumps TSE queries into the SSE bucket and silently routes non-query-thread callers to SSE.
  • The /debug/serverRoutingStats endpoint and getServerRoutingStatsStr() still expose only the SSE map — operators triaging MSE adaptive routing via the debug surface are blind after this PR.
  • The async _executorService is a 2-thread pool with FIFO submission but unordered execution — the decrement task can land before the increment task, producing a transient negative inflight count and a wrong EMA sample. Preexisting for SSE, but amplified for MSE because the increment→decrement window shrinks to runReducer duration.
  • recordStatsUponResponseArrival(..., -1) skips the latency update — so for MSE, _latencyMsEMA stays at _avgInitializationVal forever, auto-decay never fires, and the new ADAPTIVE_SERVER_MSE_LATENCY_EMA / ADAPTIVE_SERVER_MSE_HYBRID_SCORE gauges export inert values that nonetheless feed HYBRID/LATENCY MSE routing decisions via the standard instance selectors.
  • BaseBrokerStarter.createMultiStageBrokerRequestHandler is a protected extension point whose signature changed — downstream broker starters that overrode it will silently stop being invoked. The constructor on MultiStageBrokerRequestHandler itself preserved its old form; this factory didn't.
  • QueryDispatcherTest.testStatsManagerRecordsSubmissionAndArrivalForDispatchedServers uses a Mockito.mock and only checks "method called at least once" — it doesn't verify ordering, doesn't exercise the partial-submit()-failure path the inline comments claim to defend, and doesn't cover the null-statsManager overload.

private final BrokerMetrics _brokerMetrics;
private volatile boolean _isEnabled;
private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap;
private ConcurrentHashMap<String, ServerRoutingStatsEntry> _mseServerQueryStatsMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR — debug surface silently omits the new MSE map.

The new _mseServerQueryStatsMap is not exposed by any of the public read APIs of this class:

  • getServerRoutingStats() (line 236) still returns only _serverQueryStatsMap, so the /debug/serverRoutingStats REST endpoint (PinotBrokerDebug.java:280) returns SSE-only data after this PR.
  • getServerRoutingStatsStr() (line 243) still iterates only _serverQueryStatsMap.

After this PR, the only programmatic view into MSE adaptive-routing state is the Prometheus gauges. Operators triaging routing decisions via the debug REST endpoint will be silently blind for MSE.

Suggest either growing the response payload to Map<QueryType, Map<String, ServerRoutingStatsEntry>> or adding a parallel debug endpoint.

}

private ConcurrentHashMap<String, ServerRoutingStatsEntry> getStatsMap(QueryType queryType) {
return queryType == QueryType.MSE ? _mseServerQueryStatsMap : _serverQueryStatsMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR — two silent fall-throughs in the new dispatch.

  1. QueryExecutionContext.QueryType has three values: SSE, MSE, TSE. This ternary lumps TSE into the SSE bucket. Today TimeSeriesRequestHandler doesn't call recordStats*, so this is dormant — but the moment anyone wires TSE adaptive routing, TSE traffic will silently poison SSE stats, which is the exact engine cross-talk the PR exists to prevent.
  2. The no-arg getStatsMap() below defaults to the SSE map when QueryThreadContext.getIfAvailable() is null. Any caller invoked from a non-query thread (debug REST handlers, periodic tasks, future SPI consumers) silently reads/writes the SSE map with no warning.

Suggest: switch exhaustively on the enum and either throw or log on an unknown/unset query type, or add a third TSE-map slot up front. At minimum, document on the method that the caller is expected to run with a QueryThreadContext set.

// implementation details of `submit`.
if (statsManager != null) {
for (QueryServerInstance server : servers) {
statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR — increment/decrement can be reordered on the stats-manager executor, causing a transient negative inflight count and a wrong EMA sample.

recordStatsForQuerySubmission (here) and recordStatsUponResponseArrival (line 219 in the finally) both enqueue tasks on ServerRoutingStatsManager._executorService, which is a newFixedThreadPool(2) by default. The FIFO submission queue does NOT guarantee execution order across the two worker threads — for any single server X, the decrement task can acquire the per-server write lock before the increment task. When that happens:

  • _numInFlightRequests momentarily becomes -1, then 0 after the increment lands.
  • The transient -1 is read by any concurrent fetchNumInFlightRequestsForServer because _numInFlightRequests is volatile.
  • The EMA is sampled only inside updateNumInFlightRequestsForQuerySubmission (ServerRoutingStatsEntry.java:99), so it gets sampled at 0 instead of the expected +1.

This race is preexisting for SSE (AsyncQueryResponse uses the same executor), but the increment→decrement window in MSE is just runReducer duration (often milliseconds) rather than a server-side roundtrip — meaningfully more likely to reorder.

Options: serialize per-server (per-server work channel keyed by serverInstanceId), single-thread the executor, or apply the decrement synchronously while submitting the EMA update asynchronously.

} finally {
if (statsManager != null) {
for (QueryServerInstance server : incrementedServers) {
statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR — latency EMA for MSE entries will be stuck at the init value forever, and the new MSE gauges will export that stuck value.

Passing latency = -1 is handled safely by updateStatsUponResponseArrival (it skips updateLatency), but the consequences go beyond "no latency yet":

  1. _latencyMsEMA._lastUpdatedTimeMs stays at 0 for every MSE entry, so the EMA's auto-decay scheduled task never fires.
  2. _latencyMsEMA.getAverage() returns _avgInitializationVal (default 1.0) for the lifetime of the broker.
  3. ADAPTIVE_SERVER_MSE_LATENCY_EMA therefore exports a flat constant — operators wiring dashboards will assume the gauge is broken and stop trusting it.
  4. ADAPTIVE_SERVER_MSE_HYBRID_SCORE is inflight^exp * latencyEma, which collapses to inflight^exp * initVal. If _avgInitializationVal == 0.0, the MSE hybrid score is identically 0 for all servers.

This isn't just a v0 inert-data concern: BalancedInstanceSelector and ReplicaGroupInstanceSelector (the common MSE leaf selectors via BaseInstanceSelector._priorityPoolInstanceSelector) DO invoke _adaptiveServerSelector — and via the new getStatsMap(), those reads now hit the MSE map, where these values are stuck. With HYBRID or LATENCY selector configured, MSE routing decisions will be derived from stuck data.

Until the TODO above lands, suggest either: gate ADAPTIVE_SERVER_MSE_LATENCY_EMA / ADAPTIVE_SERVER_MSE_HYBRID_SCORE from being exported (so dashboards don't show misleading flat lines), or fall back to the SSE entry's latency EMA when the MSE entry has never received a real latency sample, or document loudly in the metric Javadoc that these gauges are inert until follow-up.


for (String instanceId : expectedInstanceIds) {
Mockito.verify(statsManager).recordStatsForQuerySubmission(requestId, instanceId);
Mockito.verify(statsManager).recordStatsUponResponseArrival(requestId, instanceId, -1L);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR — this test does not actually verify the contract its production code claims to defend.

Mockito.verify(statsManager).recordStatsForQuerySubmission(...) only checks the method was called at least once per server. It does not catch any of the bugs this PR's production code comments are explicitly guarding against:

  1. Ordering: a future refactor that moves the decrement out of the finally (or breaks the incrementedServers set bookkeeping) still passes this test.
  2. Partial-failure path: the inline comment in submitAndReduce says "guarding against a partial failure in submit()". But the only error path exercised here is runReducer throwing NPE due to the mocked MailboxService — i.e., the path where submit() succeeded. The actual partial-submit()-failure case isn't covered.
  3. Legacy submitAndReduce(... null) overload: no test asserts the null-statsManager overload doesn't throw or change behavior.
  4. Real-executor behavior: the manager is a Mockito mock, so the async executor and per-server lock ordering aren't exercised at all (see related comment about the executor race on QueryDispatcher.java:201).

Suggested additions:

  • A test where submit() throws after partial dispatch and asserts incrementedServers is empty / recordStatsUponResponseArrival is NOT called for any server.
  • A test exercising the legacy 4-arg submitAndReduce overload.
  • An end-to-end test using a real ServerRoutingStatsManager (not a mock) that asserts numInFlight == 0 for every server after submitAndReduce returns successfully — which is what the inline comments actually claim.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants