Export Adaptive Routing Stats as broker metrics #18134
Conversation
cc stripe-private-oss-forks/pinot-reviewers r?

When multistage adaptive server selector stats collection is enabled (`pinot.broker.adaptive.server.selector.enable.stats.collection=true`), the broker tracks per-server routing stats (in-flight request count, latency EMA, hybrid score) in memory but has no way to expose them externally without hitting the broker's debug API. This makes it difficult to monitor the health and behavior of adaptive routing in production.

This PR adds `setValueOfGaugeWithTag(gauge, tag, value)` to `AbstractMetrics`, which composes the metric name as `gaugeName.tag` without going through `getTableName()`. This ensures the server instance ID is always present in the metric name regardless of the `enableTableLevelMetrics` broker config. It also adds `BrokerMetrics.getTagForServer(serverInstanceId)`, which returns `server.<instanceId>`, following the same pattern as `getTagForPreferredPool`.

The resulting Pinot metric names are of the form `pinot.broker.adaptiveServerLatencyEma.server.Server_pinotdb1_8098`.

**PluginConfig.yaml + statsd reporter**

Following https://trailhead.corp.stripe.com/docs/stream-analytics-internal/pinot/pinot-observability/pinot-metrics-guide, this adds a labeled metric pattern that extracts the server instance ID as a server tag, so the metrics arrive in Veneur/Prometheus as:

- Name: `pinot_broker_adaptive_server_latency_ema`
- Tags: `server=Server_pinotdb1_8098` (plus standard host tags)

[STREAMANALYTICS-4390](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4390)

**Testing**

Unit tests verify that we:

* don't export when adaptive routing stats are disabled
* don't export when adaptive routing stats metric export is disabled
* transform the metric name properly
[Deployed to QA rad-canary](https://amp.qa.corp.stripe.com/deploy/qa-deploy1.pdx.deploy.stripe.net%2Fdeploy_qBdrdOKZQ7iya8JplJbdaw), and [metrics appeared in grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%223yy%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22refId%22:%22B%22,%22expr%22:%22%7B__name__%3D~%5C%22pinot_broker_adaptive_server_%28latency_ema%7Cnum_in_flight_requests%7Chybrid_score%29%5C%22,%5Cn%20%20pinot_cluster%3D%5C%22rad-canary%5C%22,host%3D%5C%22qa-pinotdbbroker--06621e6804f2c92db.northwest.stripe.io%5C%22%7D%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22legendFormat%22:%22__auto%22%7D%5D,%22range%22:%7B%22from%22:%221775151966533%22,%22to%22:%221775152978766%22%7D%7D%7D&orgId=1) as `{__name__=~"pinot_broker_adaptive_server_(latency_ema|num_in_flight_requests|hybrid_score)", pinot_cluster="rad-canary"}`.

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-04-07T17:09:47Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/581
(cherry picked from commit 8d84c8980f2122e5290b15d3889282eee8f5286f)
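The name composition described above can be sketched as follows. This is a minimal illustration, not the actual `AbstractMetrics`/`BrokerMetrics` code; the class `AdaptiveRoutingMetricNames` and its static methods are hypothetical stand-ins that mirror the behavior the PR description states:

```java
// Hypothetical sketch of the metric-name composition described in this PR.
// Not the real Pinot implementation.
public class AdaptiveRoutingMetricNames {

    // Mirrors BrokerMetrics.getTagForServer(serverInstanceId) -> "server.<instanceId>"
    public static String getTagForServer(String serverInstanceId) {
        return "server." + serverInstanceId;
    }

    // Mirrors setValueOfGaugeWithTag composing "<gaugeName>.<tag>" directly,
    // bypassing getTableName() so the name is independent of enableTableLevelMetrics.
    public static String composeGaugeName(String gaugeName, String tag) {
        return gaugeName + "." + tag;
    }

    public static void main(String[] args) {
        String name = composeGaugeName("adaptiveServerLatencyEma",
                getTagForServer("Server_pinotdb1_8098"));
        // The registered metric carries the standard "pinot.broker." prefix:
        System.out.println("pinot.broker." + name);
        // -> pinot.broker.adaptiveServerLatencyEma.server.Server_pinotdb1_8098
    }
}
```

The statsd/Prometheus labeled-metric pattern then splits the `server.<instanceId>` suffix back out into a `server` tag.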
@Jackie-Jiang @vvivekiyer can you please take a look at this when you have a chance?
Codecov Report ❌ Patch coverage is
Additional details and impacted files

```
@@             Coverage Diff              @@
##             master    #18134      +/-  ##
============================================
+ Coverage     63.22%    63.99%    +0.77%
- Complexity     1454      1617      +163
============================================
  Files          3176      3192       +16
  Lines        191002    193960     +2958
  Branches      29204     29938      +734
============================================
+ Hits         120763    124131     +3368
+ Misses        60818     60026      -792
- Partials       9421      9803      +382
============================================
```
Flags with carried forward coverage won't be shown.
xiangfu0 left a comment:
I found a potential thread-safety issue in the `exportStatsAsMetrics()` method:

The iteration over `_serverQueryStatsMap.entrySet()` at line 408 is not protected. While `ConcurrentHashMap` is used, iterating over `entrySet()` while the map is being concurrently modified (via `recordStatsForQuerySubmission()`) can result in:

- `ConcurrentModificationException`
- Missed entries in the metrics export
- Inconsistent snapshots across iterations

Since this runs on a periodic scheduled task that could coincide with ongoing request processing, this race condition is likely to manifest under load.

Consider wrapping the iteration in explicit synchronization or taking a snapshot of the entries:
```java
private void exportStatsAsMetrics() {
  try {
    // Option 1: Snapshot the entries
    List<Map.Entry<String, ServerRoutingStatsEntry>> entries =
        new ArrayList<>(_serverQueryStatsMap.entrySet());
    for (Map.Entry<String, ServerRoutingStatsEntry> entry : entries) {
      // ... export entry ...
    }
  } catch (Exception e) {
    // ... log and continue ...
  }
}
```

Or use an iterator with exception handling:

```java
Iterator<Map.Entry<String, ServerRoutingStatsEntry>> it =
    _serverQueryStatsMap.entrySet().iterator();
while (it.hasNext()) {
  try {
    Map.Entry<String, ServerRoutingStatsEntry> entry = it.next();
    // ... export entry ...
  } catch (Exception e) {
    // ... skip problematic entry ...
  }
}
```
I don't think this is actually a problem. The `ConcurrentHashMap` javadoc says its iterators are weakly consistent: they reflect the state of the map at some point at or since the creation of the iterator, and they do not throw `ConcurrentModificationException`.

There are already 4 patterns like this in the file.
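The rebuttal above is easy to verify with a small demo: a `ConcurrentHashMap` can be mutated while it is being iterated without throwing. This is an illustrative sketch (the class `WeaklyConsistentIterationDemo` is not code from the PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Demonstrates the ConcurrentHashMap javadoc guarantee: iterators are weakly
// consistent and never throw ConcurrentModificationException, even when the
// map is modified mid-iteration (as in the broker's periodic metrics export).
public class WeaklyConsistentIterationDemo {

    // Iterates the map while overwriting each visited value and inserting a
    // new key. A plain HashMap would throw ConcurrentModificationException.
    public static int iterateWhileMutating(ConcurrentHashMap<String, Integer> map) {
        int visited = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            map.put(entry.getKey(), entry.getValue() + 1); // overwrite in place
            map.putIfAbsent("extra", 0);                   // add a key mid-iteration
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("server1", 1);
        map.put("server2", 2);
        int visited = iterateWhileMutating(map);
        // No exception; every key present at iterator creation is visited
        // exactly once, while "extra" may or may not be seen by the iterator.
        System.out.println("visited=" + visited + ", server1=" + map.get("server1"));
    }
}
```

The trade-off, as the javadoc also notes, is that the export sees a weakly consistent snapshot rather than a point-in-time one, which is acceptable for periodic gauge values.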
We add new metrics for the stats:

* `adaptiveServerNumInFlightRequests` (in-flight request count)
* `adaptiveServerLatencyEma` (latency EMA)
* `adaptiveServerHybridScore` (hybrid score)
This introduces 3 new metrics for each broker × server combination on a tenant. Not all users or use cases will want or warrant this, so we disable it by default.
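For reference, the underlying stats collection is controlled by the broker config key quoted earlier in this PR description (the separate toggle this PR adds for exporting the stats as metrics is not named here, so it is omitted):

```properties
# Broker config: enable in-memory adaptive server selector stats collection.
# Off by default; the metric export added by this PR has its own toggle,
# also off by default.
pinot.broker.adaptive.server.selector.enable.stats.collection=true
```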
The resulting Pinot metric names are of the form `pinot.broker.adaptiveServerLatencyEma.server.Server_pinotdb1_8098`.

**Hybrid Score Range**

`ServerRoutingStatsEntry.java::computeHybridScore` defines the hybrid score; `_hybridScoreExponent` defaults to 3.
The hybrid score can be as low as 0 if there are no in-flight requests, but as soon as there is one in-flight request, it jumps to at least `_latencyMsEMA.getAverage()`. An unhealthy server scores much higher because it has more in-flight requests: with the multiplier `(inflight_reqs + inflight_reqs_ema) ^ 3`, a server with 5 in-flight requests (and an in-flight EMA of ~5) has its latency EMA multiplied by `(5 + 5) ^ 3 = 1000`. Using a decimal data type would only help differentiate scores among servers with very few in-flight requests, which isn't very helpful.
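The back-of-the-envelope arithmetic above can be sketched as follows. This only reproduces the multiplier from the comment; the exact `computeHybridScore` formula lives in `ServerRoutingStatsEntry.java` and is not reproduced here, so `HybridScoreSketch` and `latencyMultiplier` are illustrative names:

```java
// Illustrative sketch of the latency multiplier discussed above;
// not the actual ServerRoutingStatsEntry.computeHybridScore implementation.
public class HybridScoreSketch {
    static final int HYBRID_SCORE_EXPONENT = 3; // _hybridScoreExponent default

    // Multiplier applied to the latency EMA for a given in-flight load.
    static double latencyMultiplier(double inFlightRequests, double inFlightRequestsEma) {
        return Math.pow(inFlightRequests + inFlightRequestsEma, HYBRID_SCORE_EXPONENT);
    }

    public static void main(String[] args) {
        // A server with 5 in-flight requests and an in-flight EMA of ~5:
        System.out.println(latencyMultiplier(5, 5)); // (5 + 5)^3 = 1000.0
        // A healthy server with 1 in-flight request and a near-zero EMA stays
        // close to its raw latency EMA:
        System.out.println(latencyMultiplier(1, 0)); // (1 + 0)^3 = 1.0
    }
}
```

This cubing is why integer resolution is fine: unhealthy servers separate by orders of magnitude, and fractional precision would only matter at very low in-flight counts.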
**Testing**

Unit tests verify that we:

* don't export when adaptive routing stats are disabled
* don't export when adaptive routing stats metric export is disabled
* transform the metric name properly
We deployed this to an internal stripe cluster with adaptive routing stats enabled, and saw metrics appearing in Grafana.
**Setting the value**
Currently this requires restarting the brokers. #18135 will allow these values to be toggled without a restart.