Skip to content

Export Adaptive Routing Stats as broker metrics#18134

Open
timothy-e wants to merge 3 commits intoapache:masterfrom
timothy-e:timothye-enable-adaptive-routing-metrics
Open

Export Adaptive Routing Stats as broker metrics#18134
timothy-e wants to merge 3 commits intoapache:masterfrom
timothy-e:timothye-enable-adaptive-routing-metrics

Conversation

@timothy-e
Copy link
Copy Markdown
Contributor

@timothy-e timothy-e commented Apr 8, 2026

When multistage adaptive server selector stats collection is enabled (pinot.broker.adaptive.server.selector.enable.stats.collection=true), the broker tracks per-server routing stats (in-flight request count, latency EMA, hybrid score) in memory but has no way to observe them externally without hitting the broker's debug API. This makes it difficult to monitor the health and behavior of adaptive routing in production.

We add new metrics for the stats:

  • adaptiveServerNumInFlightRequests
  • adaptiveServerLatencyEma
  • adaptiveServerHybridScore

This introduces 3 new metrics for each broker x server combination on a tenant. Not all users / usecases will want or warrant this, so we disable it by default.

The resulting Pinot metric names are of the form: pinot.broker.adaptiveServerLatencyEma.server.Server_pinotdb1_8098

Hybrid Score Range

ServerRoutingStatsEntry.java::computeHybridScore defines a hybrid score as:

  public double computeHybridScore() {
    double estimatedQSize = _numInFlightRequests + _inFlighRequestsEMA.getAverage();
    return Math.pow(estimatedQSize, _hybridScoreExponent) * _latencyMsEMA.getAverage();
  }

_hybridScoreExponent defaults to 3.

The hybrid score could be as low as 0 if there are no inflight requests, but as soon as there is an inflight request, we jump to at least _latencyMsEMA.getAverage(). An unhealthy server would be much higher because it would have an increased number of inflight requests (~ (inflight_reqs * infight_reqs_ema) ^ 3 => a server with 5 inflight requests would have it's latency_ema multiplied by ~ (5 + 5) ^ 3 = 1000.

Using a decimal data type would only help differentiate scores among servers with very low inflight requests, which isn't too helpful.

Testing

Unit tests verify that we:

  • don't export when adaptive routing stats are disabled
  • don't export when adaptive routing stats metric export is disabled

We deployed this to an internal stripe cluster with adaptive routing stats enabled, and saw metrics appearing in Grafana.

Setting the value

Currently this requires restarting the brokers. #18135 will allow these values to be toggled without a restart.

cc stripe-private-oss-forks/pinot-reviewers
r?

When multistage adaptive server selector stats collection is enabled (pinot.broker.adaptive.server.selector.enable.stats.collection=true), the broker tracks per-server routing stats (in-flight request count, latency EMA, hybrid score) in memory but has no way to observe them externally without hitting the broker's debug API. This makes it difficult to monitor the health and behavior of adaptive routing in production.

This PR adds `setValueOfGaugeWithTag(gauge, tag, value)` to `AbstractMetrics`, which composes the metric name as `gaugeName.tag` without going through `getTableName()`. This ensures the server instance ID is always present in the metric name regardless of the `enableTableLevelMetrics` broker config. Also adds `BrokerMetrics.getTagForServer(serverInstanceId)` returning "server.<instanceId>", following the same pattern as `getTagForPreferredPool`.

 The resulting Pinot metric names are of the form: `pinot.broker.adaptiveServerLatencyEma.server.Server_pinotdb1_8098`

**PluginConfig.yaml + statsd reporter**

Following https://trailhead.corp.stripe.com/docs/stream-analytics-internal/pinot/pinot-observability/pinot-metrics-guide, adds a labeled metric pattern that extracts the server instance ID as a server tag, so the metrics arrive in Veneur/Prometheus as:
  - Name: pinot_broker_adaptive_server_latency_ema
  - Tags: server=Server_pinotdb1_8098 (plus standard host tags)

[STREAMANALYTICS-4390](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4390)
Unit tests to verify that we:
* don't export when adaptive routing stats are disabled
* don't export when adaptive routing stats metric export is disabled
* transform the metric name properly.

[Deployed to QA rad-canary](https://amp.qa.corp.stripe.com/deploy/qa-deploy1.pdx.deploy.stripe.net%2Fdeploy_qBdrdOKZQ7iya8JplJbdaw), and [metrics appeared in grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%223yy%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22refId%22:%22B%22,%22expr%22:%22%7B__name__%3D~%5C%22pinot_broker_adaptive_server_%28latency_ema%7Cnum_in_flight_requests%7Chybrid_score%29%5C%22,%5Cn%20%20pinot_cluster%3D%5C%22rad-canary%5C%22,host%3D%5C%22qa-pinotdbbroker--06621e6804f2c92db.northwest.stripe.io%5C%22%7D%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheus%22,%22uid%22:%22zb219lV4k%22%7D,%22editorMode%22:%22code%22,%22legendFormat%22:%22__auto%22%7D%5D,%22range%22:%7B%22from%22:%221775151966533%22,%22to%22:%221775152978766%22%7D%7D%7D&orgId=1) as `{__name__=~"pinot_broker_adaptive_server_(latency_ema|num_in_flight_requests|hybrid_score)",
  pinot_cluster="rad-canary"}`.

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-04-07T17:09:47Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/581
(cherry picked from commit 8d84c8980f2122e5290b15d3889282eee8f5286f)
@timothy-e timothy-e marked this pull request as ready for review April 8, 2026 19:48
@timothy-e
Copy link
Copy Markdown
Contributor Author

@Jackie-Jiang @vvivekiyer can you please take a look at this when you have a chance?

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 8, 2026

Codecov Report

❌ Patch coverage is 84.61538% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.99%. Comparing base (739043c) to head (dd68062).
⚠️ Report is 306 commits behind head on master.

Files with missing lines Patch % Lines
...erver/routing/stats/ServerRoutingStatsManager.java 90.47% 2 Missing ⚠️
...va/org/apache/pinot/spi/utils/CommonConstants.java 0.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18134      +/-   ##
============================================
+ Coverage     63.22%   63.99%   +0.77%     
- Complexity     1454     1617     +163     
============================================
  Files          3176     3192      +16     
  Lines        191002   193960    +2958     
  Branches      29204    29938     +734     
============================================
+ Hits         120763   124131    +3368     
+ Misses        60818    60026     -792     
- Partials       9421     9803     +382     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.97% <84.61%> (+0.79%) ⬆️
java-21 63.96% <84.61%> (+0.77%) ⬆️
temurin 63.99% <84.61%> (+0.77%) ⬆️
unittests 63.99% <84.61%> (+0.77%) ⬆️
unittests1 55.90% <84.61%> (+0.35%) ⬆️
unittests2 34.35% <15.38%> (+0.25%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a potential thread-safety issue in the exportStatsAsMetrics() method:

The iteration over _serverQueryStatsMap.entrySet() at line 408 is not protected. While ConcurrentHashMap is used, iterating over entrySet() while the map is being concurrently modified (via recordStatsForQuerySubmission()) can result in:

  1. ConcurrentModificationException
  2. Missed entries in the metrics export
  3. Inconsistent snapshots across iterations

Since this runs on a periodic scheduled task that could coincide with ongoing request processing, this race condition is likely to manifest under load.

Consider wrapping the iteration with explicit synchronization or taking a snapshot of the entries:

private void exportStatsAsMetrics() {
  try {
    // Option 1: Snapshot the entries
    List<Map.Entry<String, ServerRoutingStatsEntry>> entries = 
      new ArrayList<>(_serverQueryStatsMap.entrySet());
    for (Map.Entry<String, ServerRoutingStatsEntry> entry : entries) {

Or use an iterator with exception handling:

Iterator<Map.Entry<String, ServerRoutingStatsEntry>> it = 
  _serverQueryStatsMap.entrySet().iterator();
while (it.hasNext()) {
  try {
    Map.Entry<String, ServerRoutingStatsEntry> entry = it.next();

@timothy-e
Copy link
Copy Markdown
Contributor Author

I found a potential thread-safety issue in the exportStatsAsMetrics() method:

The iteration over _serverQueryStatsMap.entrySet() at line 408 is not protected. While ConcurrentHashMap is used, iterating over entrySet() while the map is being concurrently modified (via recordStatsForQuerySubmission()) can result in:

  1. ConcurrentModificationException
  2. Missed entries in the metrics export
  3. Inconsistent snapshots across iterations

I don't think this is actually a problem. The concurrent hash map java doc says:

Similarly, Iterators, Spliterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. They do not throw ConcurrentModificationException.

There are already 4 patterns like this in the file: getServerRoutingStats, fetchNumInFlightRequestsForAllServers. fetchEMALatencyForAllServers, fetchHybridScoreForAllServers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants