Add MseMetrics with configurable emission mode for multi-stage engine#18550
Add MseMetrics with configurable emission mode for multi-stage engine#18550yashmayya wants to merge 3 commits into
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18550 +/- ##
============================================
+ Coverage 55.76% 64.26% +8.50%
- Complexity 815 1126 +311
============================================
Files 2561 3315 +754
Lines 148577 203950 +55373
Branches 24019 31730 +7711
============================================
+ Hits 82850 131067 +48217
- Misses 58624 62377 +3753
- Partials 7103 10506 +3403
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| name: "pinot_broker_$1_$2" | ||
| # All global gauge/meters/timers. Group-flexible at the prefix so non-broker MBean groups | ||
| # registered in this JVM (e.g. pinot.mse.* from the multi-stage engine emitter) are also exported. | ||
| - pattern: "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?, name=\"?pinot\\.(\\w+)\\.(\\w+)\"?><>(\\w+)" |
There was a problem hiding this comment.
Do you need to do the same for server? Say we want to migrate to mse metric that can be emitted on server
There was a problem hiding this comment.
The server Prometheus agent config already had this flexible regex. The unit tests verify that the new pinot.mse* metrics get captured on the server side as well.
| * Subsequent calls in the same JVM are no-ops (compare-and-set with the NOOP placeholder). | ||
| */ | ||
| public static void registerFromConfig(PinotConfiguration instanceConfig, PinotMetricsRegistry metricsRegistry) { | ||
| String modeStr = instanceConfig.getProperty(Helix.CONFIG_OF_MSE_METRICS_MODE, Helix.DEFAULT_MSE_METRICS_MODE); |
There was a problem hiding this comment.
Why is this under Helix?
There was a problem hiding this comment.
I think the reasoning was that this is a global cluster level config, but I can move it elsewhere.
Edit: it doesn't really fit cleanly elsewhere and Helix section already has various other common cluster level configurations.
xiangfu0
left a comment
There was a problem hiding this comment.
Found one high-signal issue; see inline comment.
| // keys, and a racing put of the same key just dedupes to one entry on the next read. | ||
| // The DualPinotMeter itself fan-outs to two underlying handles which are themselves | ||
| // dedup'd by their registries, so a duplicate wrapper is harmless if it does happen. | ||
| return _dualMeterCache.computeIfAbsent(meter, |
There was a problem hiding this comment.
DUAL mode is introducing shared mutable state on the live query path, but _dualMeterCache is a plain EnumMap and computeIfAbsent() here is not safe under concurrent writes. MultiStageOperator.updateMseMetrics(...) and the other MSE emission paths can resolve meters from multiple query threads at the same time, so this can race in exactly the rollout mode operators are supposed to use for migration. Please switch the cache to a concurrent structure or prebuild the wrappers during registration.
There was a problem hiding this comment.
Thanks for the careful look here — I want to push back on this one. I think the race here is benign.
Walking through the concurrent first-call scenario for the same MseMeter:
- Both threads see no entry and invoke the mapping function.
- The mapping function calls
super.getMeteredValue(meter)andServerMetrics.get().getMeteredValue(serverMeter). Both are idempotent registry lookups (PinotMetricUtils.makePinotMeterreturns the existingPinotMeterfor an already-registered name), so both threads wrap the same two underlying handles. - Both threads then write to
vals[meter.ordinal()]. That's a single non-volatile reference assignment, atomic per the JMM. One value wins; thesizecounter can be over-incremented but it's never read by anything in this class. - Whichever wrapper a thread returns, every subsequent
.mark()hits the same registry counters via the same underlying handles. No double-counting, no lost emissions, no inconsistent state.
The only observable cost is one short-lived extra DualPinotMeter allocation, only on the first concurrent call per meter (15 meters total in the public enum). After the map is populated it's effectively read-only — and reads of a single Object[] slot are atomic, with final-field safe-publication semantics covering the DualPinotMeter contents.
ConcurrentHashMap.computeIfAbsent would add hash + CAS work to every emission to prevent an allocation that's already correctness-irrelevant. Pre-building eagerly constructs handles for meters that may never fire.
If the worry is that a future maintainer might modify this without realizing the race-correctness depends on (a) idempotent registry lookups and (b) wrappers being functionally interchangeable, happy to add a thread-safety note to the field comment documenting the invariant. WDYT?
Summary
Introduces a dedicated
MseMetricsabstraction for multi-stage engine (MSE) metrics, replacing directServerMetricscalls from the MSE engine call sites. The shim supports three emission modes selected by cluster config:SERVER(default): forward toServerMetricsso the existingpinot.server.mse*/pinot.server.multiStage*series stay exactly as today — no behavior change for existing deployments.MSE: emit to a new role-agnosticpinot.mse.*namespace.DUAL: emit to both during dashboard migration.Motivation
The multi-stage engine is logically separate from the segment-scanning/storage path that historically owned
ServerMetrics, but its metrics are co-mingled in thepinot.server.*namespace today. That makes it awkward to:ServerMetrics.ServerMetrics— the existing call sites hard-wireServerMetrics.get(), which is the noop singleton outside of server JVMs.This change establishes a per-engine namespace as the pattern going forward, gated behind a config so existing dashboards keep working unchanged until operators migrate.
Design
MseMeter/MseTimerenums inpinot-commoncovering the existingServerMeter.MSE_*/ServerMeter.MULTI_STAGE_*and correspondingServerTimerentries, each carrying a reference to its counterpart soSERVERandDUALmodes can forward.MseMetrics extends AbstractMetricssingleton withregister()/get()mirroringServerMetrics. Mode-awareaddMeteredGlobalValue/getMeteredValue/addTimedValueroute emissions to the right destination(s).DUALmode hands back a fan-outPinotMeterwrapper (cached perMseMeter) so callers that resolve once and mark repeatedly (e.g.MetricsExecutor) hit both registries on every increment.pinot.metrics.mse.mode(defaultSERVER). Picked up at startup inBaseServerStarterandBaseBrokerStartervia the existingapplyClusterConfig→MseMetrics.registerFromConfig(...), before any MSE runtime component is constructed so cachedPinotMeterhandles bind to the right mode.MseMetrics.get():QueryServer,QueryRunner,OpChainSchedulerService,MailboxSendOperator, and theMultiStageOperator.Typecallbacks (renamedupdateServerMetrics→updateMseMetrics). SSE leaf-stage paths still useServerMetricsdirectly.broker.ymlcatch-all generalized to the(\w+)group pattern already used byserver.yml/minion.yml/pinot.yml, sopinot.mse.*series exported from broker JVMs are scraped consistently with the other roles.Compatibility
SERVERpreserves all existingpinot.server.mse*/pinot.server.multiStage*series byte-for-byte. Existing dashboards and alerts continue to work without operator action.MseMetrics.NOOPis inSERVERmode and resolves throughServerMetrics.get(), so any code path that emits before explicit registration still hits the sameServerMetricshandle today's code would have hit — no regression in tests or call sites that never initializeMseMetrics.ServerMeter.MSE_*/MULTI_STAGE_*andServerTimerMSE entries is a separate follow-up; the migration path is documented onMseMetricsMode.Test plan
MseMetricsTest— 14 unit tests covering all three modes, NOOP-fallback semantics, the SERVER-mode-on-noop-ServerMetrics case (drops silently as documented), MSE-mode without registered ServerMetrics (still emits), cachedDualPinotMeteridentity for repeated lookups, andregisterFromConfigconfig parsing including the invalid-value fallback.mseMeterExportedFromBrokerJmx/mseTimerExportedFromBrokerJmx(15 + 5) andmseMeterExportedFromServerJmx/mseTimerExportedFromServerJmx(15 + 5) inBrokerPrometheusMetricsTestandServerPrometheusMetricsTest. Each starts a real JMX→Prometheus exporter against the role's actual yml config, emits the entry, and asserts it surfaces aspinot_mse_<name>_<measurement>. Covers both the broker.yml regex generalization and the server.yml catch-all path.QueryServerTestcontinues to pass unchanged — SERVER-mode forwarding preserves the sameServerMeter.MSE_QUERIESincrement behavior the test asserts on.pinot-common,pinot-spi,pinot-query-runtime,pinot-server,pinot-broker,pinot-dropwizard,pinot-yammer.