Add MseMetrics with configurable emission mode for multi-stage engine#18550

Open
yashmayya wants to merge 3 commits into apache:master from yashmayya:feat/data-2210-mse-metrics-shim

Conversation

@yashmayya
Contributor

Summary

Introduces a dedicated MseMetrics abstraction for multi-stage engine (MSE) metrics, replacing direct ServerMetrics calls from the MSE engine call sites. The shim supports three emission modes selected by cluster config:

  • SERVER (default): forward to ServerMetrics so the existing pinot.server.mse* / pinot.server.multiStage* series stay exactly as today — no behavior change for existing deployments.
  • MSE: emit to a new role-agnostic pinot.mse.* namespace.
  • DUAL: emit to both during dashboard migration.
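
The three modes above amount to a mapping from mode to destination namespace(s). A minimal sketch of that mapping follows; the `Mode` and `Destination` enums here are illustrative stand-ins and are not claimed to match the PR's `MseMetricsMode` or any Pinot class.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

public class MseModeRoutingSketch {
  /** Hypothetical label for where an emission lands. */
  public enum Destination { SERVER_NAMESPACE, MSE_NAMESPACE }

  /** Hypothetical stand-in for the three emission modes described above. */
  public enum Mode {
    SERVER(EnumSet.of(Destination.SERVER_NAMESPACE)),
    MSE(EnumSet.of(Destination.MSE_NAMESPACE)),
    DUAL(EnumSet.allOf(Destination.class));

    private final Set<Destination> _destinations;

    Mode(Set<Destination> destinations) {
      _destinations = Collections.unmodifiableSet(destinations);
    }

    public Set<Destination> destinations() {
      return _destinations;
    }
  }

  public static void main(String[] args) {
    // Print which namespace(s) each mode would emit to.
    for (Mode mode : Mode.values()) {
      System.out.println(mode + " -> " + mode.destinations());
    }
  }
}
```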

Motivation

The multi-stage engine is logically separate from the segment-scanning/storage path that historically owned ServerMetrics, but its metrics are co-mingled in the pinot.server.* namespace today. That makes it awkward to:

  • Attribute load to MSE vs SSE on shared JVMs.
  • Build engine-specific dashboards without globbing on metric-name prefixes.
  • Evolve MSE metric naming and units independently of ServerMetrics.
  • Surface MSE-engine emissions from any JVM that does not register ServerMetrics: the existing call sites hard-wire ServerMetrics.get(), which returns the noop singleton outside of server JVMs.

This change establishes a per-engine namespace as the pattern going forward, gated behind a config so existing dashboards keep working unchanged until operators migrate.

Design

  • New MseMeter / MseTimer enums in pinot-common covering the existing ServerMeter.MSE_* / ServerMeter.MULTI_STAGE_* and corresponding ServerTimer entries, each carrying a reference to its counterpart so SERVER and DUAL modes can forward.
  • New MseMetrics extends AbstractMetrics singleton with register() / get() mirroring ServerMetrics. Mode-aware addMeteredGlobalValue / getMeteredValue / addTimedValue route emissions to the right destination(s).
  • DUAL mode hands back a fan-out PinotMeter wrapper (cached per MseMeter) so callers that resolve once and mark repeatedly (e.g. MetricsExecutor) hit both registries on every increment.
  • New cluster-config key pinot.metrics.mse.mode (default SERVER). It is picked up at startup in BaseServerStarter and BaseBrokerStarter via the existing applyClusterConfig → MseMetrics.registerFromConfig(...) path, before any MSE runtime component is constructed, so cached PinotMeter handles bind to the right mode.
  • All MSE engine call sites migrated to MseMetrics.get(): QueryServer, QueryRunner, OpChainSchedulerService, MailboxSendOperator, and the MultiStageOperator.Type callbacks (renamed updateServerMetrics → updateMseMetrics). SSE leaf-stage paths still use ServerMetrics directly.
  • Prometheus JMX broker.yml catch-all generalized to the (\w+) group pattern already used by server.yml / minion.yml / pinot.yml, so pinot.mse.* series exported from broker JVMs are scraped consistently with the other roles.
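
The DUAL-mode fan-out wrapper in the list above can be pictured roughly as follows. This is a sketch under assumptions, not the PR's DualPinotMeter: the `Meter` interface, `CountingMeter` and `FanOutMeter` are hypothetical stand-ins and do not reproduce Pinot's PinotMeter API. It shows why a caller that resolves a handle once and marks it repeatedly still updates both registries.

```java
import java.util.concurrent.atomic.LongAdder;

public class DualMeterSketch {
  /** Hypothetical minimal meter handle; not Pinot's PinotMeter interface. */
  public interface Meter {
    void mark(long n);
    long count();
  }

  /** Stand-in for a handle owned by one registry. */
  public static final class CountingMeter implements Meter {
    private final LongAdder _count = new LongAdder();

    @Override
    public void mark(long n) {
      _count.add(n);
    }

    @Override
    public long count() {
      return _count.sum();
    }
  }

  /** Forwards every mark to two underlying handles. */
  public static final class FanOutMeter implements Meter {
    private final Meter _primary;
    private final Meter _secondary;

    public FanOutMeter(Meter primary, Meter secondary) {
      _primary = primary;
      _secondary = secondary;
    }

    @Override
    public void mark(long n) {
      _primary.mark(n);
      _secondary.mark(n);
    }

    @Override
    public long count() {
      // Reporting the primary count is an arbitrary choice made for this sketch.
      return _primary.count();
    }
  }

  public static void main(String[] args) {
    Meter serverSide = new CountingMeter();
    Meter mseSide = new CountingMeter();
    Meter resolvedOnce = new FanOutMeter(serverSide, mseSide);
    for (int i = 0; i < 3; i++) {
      resolvedOnce.mark(1);
    }
    System.out.println(serverSide.count() + " " + mseSide.count());
  }
}
```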

Compatibility

  • Default mode SERVER preserves all existing pinot.server.mse* / pinot.server.multiStage* series byte-for-byte. Existing dashboards and alerts continue to work without operator action.
  • The pre-registration MseMetrics.NOOP is in SERVER mode and resolves through ServerMetrics.get(), so any code path that emits before explicit registration still hits the same ServerMetrics handle today's code would have hit — no regression in tests or call sites that never initialize MseMetrics.
  • Removing the legacy ServerMeter.MSE_* / MULTI_STAGE_* and ServerTimer MSE entries is a separate follow-up; the migration path is documented on MseMetricsMode.
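
The pre-registration fallback described above, together with the compare-and-set registration mentioned in the registerFromConfig Javadoc quoted in the review thread, could look roughly like this. All names (`Holder`, `Metrics`, the string-valued mode) are assumptions made for illustration; the PR's actual fields and types are not shown on this page.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RegisterOnceSketch {
  /** Hypothetical metrics facade that only records its mode. */
  public static final class Metrics {
    private final String _mode;

    public Metrics(String mode) {
      _mode = mode;
    }

    public String mode() {
      return _mode;
    }
  }

  /** Holds a placeholder until the first successful registration. */
  public static final class Holder {
    // Assumption: the placeholder behaves like SERVER mode, as the description states for MseMetrics.NOOP.
    private final Metrics _noop = new Metrics("SERVER");
    private final AtomicReference<Metrics> _instance = new AtomicReference<>(_noop);

    /** Returns true only for the first registration; later calls leave the instance untouched. */
    public boolean register(Metrics metrics) {
      return _instance.compareAndSet(_noop, metrics);
    }

    public Metrics get() {
      return _instance.get();
    }
  }

  public static void main(String[] args) {
    Holder holder = new Holder();
    System.out.println("before registration: " + holder.get().mode());
    System.out.println("first register: " + holder.register(new Metrics("DUAL")));
    System.out.println("second register: " + holder.register(new Metrics("MSE")));
    System.out.println("after registration: " + holder.get().mode());
  }
}
```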

Test plan

  • MseMetricsTest — 14 unit tests covering all three modes, NOOP-fallback semantics, the SERVER-mode-on-noop-ServerMetrics case (drops silently as documented), MSE-mode without registered ServerMetrics (still emits), cached DualPinotMeter identity for repeated lookups, and registerFromConfig config parsing including the invalid-value fallback.
  • mseMeterExportedFromBrokerJmx / mseTimerExportedFromBrokerJmx (15 + 5) and mseMeterExportedFromServerJmx / mseTimerExportedFromServerJmx (15 + 5) in BrokerPrometheusMetricsTest and ServerPrometheusMetricsTest. Each starts a real JMX→Prometheus exporter against the role's actual yml config, emits the entry, and asserts it surfaces as pinot_mse_<name>_<measurement>. Covers both the broker.yml regex generalization and the server.yml catch-all path.
  • Existing QueryServerTest continues to pass unchanged — SERVER-mode forwarding preserves the same ServerMeter.MSE_QUERIES increment behavior the test asserts on.
  • Pre-commit checks (spotless / checkstyle / license) clean on pinot-common, pinot-spi, pinot-query-runtime, pinot-server, pinot-broker, pinot-dropwizard, pinot-yammer.
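
For readers unfamiliar with the invalid-value fallback that the MseMetricsTest bullet mentions, here is one way such parsing is commonly written. It is a sketch only: the case-insensitive matching and the choice of SERVER as the fallback are assumptions based on the stated default, and `parse` is a hypothetical helper rather than the PR's registerFromConfig.

```java
import java.util.Locale;

public class ModeParsingSketch {
  /** Hypothetical stand-in for the mode enum. */
  public enum Mode { SERVER, MSE, DUAL }

  /** Parses a raw config value, falling back to SERVER when it is missing or unrecognized. */
  public static Mode parse(String raw) {
    if (raw == null) {
      return Mode.SERVER;
    }
    try {
      return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Unknown value: keep the default rather than failing startup (assumed behavior).
      return Mode.SERVER;
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("dual"));
    System.out.println(parse("not-a-mode"));
    System.out.println(parse(null));
  }
}
```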

@yashmayya yashmayya added the multi-stage Related to the multi-stage query engine label May 20, 2026
@yashmayya yashmayya requested a review from Jackie-Jiang May 20, 2026 23:04
@codecov-commenter

codecov-commenter commented May 20, 2026

Codecov Report

❌ Patch coverage is 78.91156% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.26%. Comparing base (9471163) to head (ddf4cd3).
⚠️ Report is 19 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...va/org/apache/pinot/common/metrics/MseMetrics.java | 69.56% | 19 Missing and 2 partials ⚠️ |
| ...java/org/apache/pinot/common/metrics/MseMeter.java | 89.28% | 3 Missing ⚠️ |
| ...java/org/apache/pinot/common/metrics/MseTimer.java | 81.25% | 3 Missing ⚠️ |
| ...not/query/runtime/operator/MultiStageOperator.java | 83.33% | 2 Missing ⚠️ |
| ...e/pinot/broker/broker/helix/BaseBrokerStarter.java | 0.00% | 1 Missing ⚠️ |
| .../pinot/server/starter/helix/BaseServerStarter.java | 0.00% | 1 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18550      +/-   ##
============================================
+ Coverage     55.76%   64.26%   +8.50%     
- Complexity      815     1126     +311     
============================================
  Files          2561     3315     +754     
  Lines        148577   203950   +55373     
  Branches      24019    31730    +7711     
============================================
+ Hits          82850   131067   +48217     
- Misses        58624    62377    +3753     
- Partials       7103    10506    +3403     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (?) |
| integration | 100.00% <ø> (?) |
| integration1 | 100.00% <ø> (?) |
| integration2 | 0.00% <ø> (?) |
| java-21 | 64.26% <78.91%> (+8.50%) ⬆️ |
| temurin | 64.26% <78.91%> (+8.50%) ⬆️ |
| unittests | 64.26% <78.91%> (+8.49%) ⬆️ |
| unittests1 | 56.73% <80.00%> (+0.97%) ⬆️ |
| unittests2 | 35.46% <0.00%> (?) |

@Jackie-Jiang Jackie-Jiang added metrics Related to metrics emission and collection observability Related to observability (logging, tracing, metrics) labels May 21, 2026
name: "pinot_broker_$1_$2"
# All global gauge/meters/timers. Group-flexible at the prefix so non-broker MBean groups
# registered in this JVM (e.g. pinot.mse.* from the multi-stage engine emitter) are also exported.
- pattern: "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?, name=\"?pinot\\.(\\w+)\\.(\\w+)\"?><>(\\w+)"
Contributor

Do you need to do the same for the server? Say we want to migrate to an MSE metric that can be emitted on the server.

Contributor Author

The server Prometheus agent config already had this flexible regex. The unit tests verify that the new pinot.mse.* metrics get captured on the server side as well.
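
To make the regex discussion concrete, the sketch below applies the generalized broker.yml pattern quoted in this thread to a sample MBean attribute string. The pattern is copied from the diff; the sample bean strings (including the `MseMetrics` type and `mseQueries` name) and the `pinot_$1_$2_$3` output shape are assumptions, chosen to line up with the pinot_mse_<name>_<measurement> form named in the test plan.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MseJmxPatternSketch {
  // Regex from the broker.yml rule quoted above. The YAML double-quoted escapes
  // coincide with Java string-literal escapes, so the text is unchanged.
  static final Pattern RULE = Pattern.compile(
      "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?, name=\"?pinot\\.(\\w+)\\.(\\w+)\"?><>(\\w+)");

  /** Returns the exported series name under the assumed pinot_$1_$2_$3 template, or null if no match. */
  public static String exportedName(String beanAttribute) {
    Matcher matcher = RULE.matcher(beanAttribute);
    if (!matcher.find()) {
      return null;
    }
    return "pinot_" + matcher.group(1) + "_" + matcher.group(2) + "_" + matcher.group(3);
  }

  public static void main(String[] args) {
    // Hypothetical MBean attribute strings; real type and metric names may differ.
    String mse = "\"org.apache.pinot.common.metrics\"<type=\"MseMetrics\", name=\"pinot.mse.mseQueries\"><>Count";
    String other = "java.lang<type=Memory><>HeapMemoryUsage";
    System.out.println(exportedName(mse));
    System.out.println(exportedName(other));
  }
}
```

Because the first capture group is `(\w+)` rather than a literal `broker`, the same rule also matches names under other prefixes registered in the same JVM.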

Comment thread pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java Outdated
* Subsequent calls in the same JVM are no-ops (compare-and-set with the NOOP placeholder).
*/
public static void registerFromConfig(PinotConfiguration instanceConfig, PinotMetricsRegistry metricsRegistry) {
String modeStr = instanceConfig.getProperty(Helix.CONFIG_OF_MSE_METRICS_MODE, Helix.DEFAULT_MSE_METRICS_MODE);
Contributor

Why is this under Helix?

Contributor Author

@yashmayya yashmayya May 21, 2026

I think the reasoning was that this is a global cluster level config, but I can move it elsewhere.

Edit: it doesn't really fit cleanly elsewhere and Helix section already has various other common cluster level configurations.

Comment thread pinot-common/src/main/java/org/apache/pinot/common/metrics/MseGauge.java Outdated
Comment thread pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java Outdated
Contributor

@xiangfu0 xiangfu0 left a comment

Found one high-signal issue; see inline comment.

// keys, and a racing put of the same key just dedupes to one entry on the next read.
// The DualPinotMeter itself fan-outs to two underlying handles which are themselves
// dedup'd by their registries, so a duplicate wrapper is harmless if it does happen.
return _dualMeterCache.computeIfAbsent(meter,
Contributor

DUAL mode is introducing shared mutable state on the live query path, but _dualMeterCache is a plain EnumMap and computeIfAbsent() here is not safe under concurrent writes. MultiStageOperator.updateMseMetrics(...) and the other MSE emission paths can resolve meters from multiple query threads at the same time, so this can race in exactly the rollout mode operators are supposed to use for migration. Please switch the cache to a concurrent structure or prebuild the wrappers during registration.

Contributor Author

Thanks for the careful look here — I want to push back on this one. I think the race here is benign.

Walking through the concurrent first-call scenario for the same MseMeter:

  1. Both threads see no entry and invoke the mapping function.
  2. The mapping function calls super.getMeteredValue(meter) and ServerMetrics.get().getMeteredValue(serverMeter). Both are idempotent registry lookups (PinotMetricUtils.makePinotMeter returns the existing PinotMeter for an already-registered name), so both threads wrap the same two underlying handles.
  3. Both threads then write to vals[meter.ordinal()]. That's a single non-volatile reference assignment, atomic per the JMM. One value wins; the size counter can be over-incremented but it's never read by anything in this class.
  4. Whichever wrapper a thread returns, every subsequent .mark() hits the same registry counters via the same underlying handles. No double-counting, no lost emissions, no inconsistent state.

The only observable cost is one short-lived extra DualPinotMeter allocation, only on the first concurrent call per meter (15 meters total in the public enum). After the map is populated it's effectively read-only — and reads of a single Object[] slot are atomic, with final-field safe-publication semantics covering the DualPinotMeter contents.

ConcurrentHashMap.computeIfAbsent would add hash + CAS work to every emission to prevent an allocation that's already correctness-irrelevant. Pre-building eagerly constructs handles for meters that may never fire.

If the worry is that a future maintainer might modify this without realizing the race-correctness depends on (a) idempotent registry lookups and (b) wrappers being functionally interchangeable, happy to add a thread-safety note to the field comment documenting the invariant. WDYT?
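
The argument in this reply can be exercised with a small, self-contained sketch. It assumes exactly the two invariants named above (idempotent registry lookups and interchangeable wrappers) and uses hypothetical stand-ins (`Registry`, `Dual`, `LongAdder` counters) rather than Pinot classes. It does not settle the review question: the cache access is still a data race under the Java Memory Model, and the sketch only shows that the counter totals come out exact when those invariants hold.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class BenignRaceSketch {
  public enum Meter { QUERIES, ERRORS }

  /** Idempotent registry: the same name always yields the same counter. */
  public static final class Registry {
    private final Map<String, LongAdder> _counters = new ConcurrentHashMap<>();

    public LongAdder counter(String name) {
      return _counters.computeIfAbsent(name, k -> new LongAdder());
    }
  }

  /** Immutable wrapper; final fields give safe publication of its contents. */
  public static final class Dual {
    private final LongAdder _first;
    private final LongAdder _second;

    Dual(LongAdder first, LongAdder second) {
      _first = first;
      _second = second;
    }

    void mark() {
      _first.increment();
      _second.increment();
    }
  }

  /** Marks QUERIES from several threads through an unsynchronized EnumMap cache. */
  public static long[] run(int threads, int marksPerThread) {
    Registry server = new Registry();
    Registry mse = new Registry();
    Map<Meter, Dual> cache = new EnumMap<>(Meter.class); // deliberately not thread-safe
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < marksPerThread; j++) {
          // Racing threads may each build a wrapper, but every wrapper shares the same counters.
          cache.computeIfAbsent(Meter.QUERIES,
              m -> new Dual(server.counter(m.name()), mse.counter(m.name()))).mark();
        }
      });
      workers[i].start();
    }
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new long[] {server.counter("QUERIES").sum(), mse.counter("QUERIES").sum()};
  }

  public static void main(String[] args) {
    long[] totals = run(4, 10_000);
    System.out.println(totals[0] + " " + totals[1]);
  }
}
```

Swapping the `EnumMap` for a prebuilt, fully populated map would remove the race entirely, at the cost of eagerly creating handles for every meter, which is the trade-off the two comments disagree on.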

Labels

metrics Related to metrics emission and collection multi-stage Related to the multi-stage query engine observability Related to observability (logging, tracing, metrics)

4 participants