[SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch trigger progress event #55699
Open
DHRUV6029 wants to merge 1 commit into apache:master
### What changes were proposed in this pull request?

This PR is a follow-up to SPARK-56464 (commit 930c303), which left a `TODO(SPARK-56537)` in `ProgressReporter#resetExecStatsForNoExecution` to track the remaining per-batch fields on `StateOperatorProgress` that were not being reset on no-data trigger progress events.

Three changes:

1. Reset the per-batch time fields on no-data trigger progress events. `allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset to 0 alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`, `numRowsDroppedByWatermark`) that were already handled by SPARK-56464.

2. Reset per-batch entries of `customMetrics` while preserving snapshot entries. `StateOperatorProgress.customMetrics` carries values from two metric registries (`StateStoreCustomMetric` for provider-level, `StatefulOperatorCustomMetric` for operator-level) and conflates per-batch counters/timings with snapshot reads of state-store status (current memory usage, key counts, file size). On a no-data trigger we now zero per-batch entries and preserve snapshot entries.

   The snapshot/per-batch distinction is encoded at the metric definition via a new `isSnapshot: Boolean` flag on `StateStoreCustomMetric` (default `false`). The six snapshot Size metrics are marked at their definitions:

   - RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`.
   - HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`.

   `StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep using the trait default (always per-batch). Operator-level `StatefulOperatorCustomSumMetric` instances (declared by `BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and `TransformWithStateExecBase`) are also always per-batch.

3. Centralize the reset semantics in a new `copyForNoExecution()` method on `StateOperatorProgress` instead of growing `copy(...)`'s parameter list further. The method takes no parameters; it inspects the operator instance's `snapshotCustomMetricNames` (a new `private[spark]` constructor field, defaulted to `Set.empty`, populated at progress build time by `StateStoreWriter.getProgress`) to decide which `customMetrics` keys to preserve. The existing 3-arg `copy(newNumRowsUpdated, newNumRowsDroppedByWatermark, newNumRowsRemoved)` signature is unchanged; its body is updated to thread `snapshotCustomMetricNames` through so that the `SessionWindowStateStoreSaveExec.getProgress` round-trip preserves it.

The `TODO(SPARK-56537)` comment is removed from `ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a single delegating map: `originExecStats.stateOperators.map(_.copyForNoExecution())`.

### Why are the changes needed?

Today, on a no-data ("idle") trigger progress event, `StateOperatorProgress` carries the previous batch's values for `allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`, and most of `customMetrics`. To a user reading `query.lastProgress` / `query.recentProgress` during an idle period this looks like work was performed when none was. It is also a known source of test flakiness.

The `TODO(SPARK-56537)` left by SPARK-56464 in `ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this follow-up.

The design was discussed on the JIRA ticket and confirmed before implementation:

- Encode snapshot semantics at the metric definition (option (2b) in the audit comment), not via a hardcoded whitelist in the reset routine.
- Add a new `copyForNoExecution()` method on `StateOperatorProgress` rather than growing the existing `copy(...)` argument list further (3 args after SPARK-56464 would have become 6+).

### Does this PR introduce _any_ user-facing change?

No public API change.

User-visible behavior change: idle-trigger progress events emitted via `StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and `query.recentProgress` will now report `0` for all per-batch fields and per-batch `customMetrics` entries instead of carrying stale values from the previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`, `numShufflePartitions`, `numStateStoreInstances`, snapshot custom metrics) are unchanged. Same direction as the SPARK-56464 fix; this PR completes the audit.

### How was this patch tested?

New and updated tests in `ProgressReporterSuite.scala`:

1. Extended the SPARK-56464 test with assertions that the three time fields (`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0 on the idle trigger, alongside the existing row-count assertions. Test description updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch resets all per-batch StateOperatorProgress fields to zero" to reflect the broader scope.
2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but preserves snapshot customMetrics (RocksDB)" exercising the per-batch / snapshot split end-to-end against a real `RocksDBStateStoreProvider`. The test runs one data batch, advances the manual clock to trigger an idle progress event, then asserts that 3 per-batch RocksDB metrics (`rocksdbCommitFlushLatency`, `rocksdbPutCount`, `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5 snapshot RocksDB metrics (`rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`, `rocksdbSstFileSize`) are preserved across the idle trigger.

Local verification:

- `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass.
- `build/sbt 'sql/testOnly *ProgressReporterSuite *StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite *StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass in 7m 16s.
- `dev/mima` -> no exclusions required.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
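For readers who want a concrete picture of the reset semantics described above, here is a minimal, self-contained sketch. It is not the Spark class: `StateOperatorProgressModel` and `NoExecutionResetDemo` are hypothetical names, only a subset of fields is modeled, and `customMetrics` is a plain Scala `Map[String, Long]` chosen for brevity. The field names, `snapshotCustomMetricNames`, and `copyForNoExecution()` follow the description; the sample values are made up.

```scala
// Hypothetical, simplified stand-in for StateOperatorProgress.
// It only illustrates which fields an idle trigger zeroes and which it keeps.
final case class StateOperatorProgressModel(
    numRowsTotal: Long,                 // snapshot field: preserved
    numRowsUpdated: Long,               // per-batch field: reset
    numRowsRemoved: Long,               // per-batch field: reset
    numRowsDroppedByWatermark: Long,    // per-batch field: reset
    allUpdatesTimeMs: Long,             // per-batch field: reset
    allRemovalsTimeMs: Long,            // per-batch field: reset
    commitTimeMs: Long,                 // per-batch field: reset
    memoryUsedBytes: Long,              // snapshot field: preserved
    customMetrics: Map[String, Long],
    snapshotCustomMetricNames: Set[String] = Set.empty) {

  // Zero every per-batch value; keep snapshot fields and snapshot custom metrics.
  def copyForNoExecution(): StateOperatorProgressModel = copy(
    numRowsUpdated = 0L,
    numRowsRemoved = 0L,
    numRowsDroppedByWatermark = 0L,
    allUpdatesTimeMs = 0L,
    allRemovalsTimeMs = 0L,
    commitTimeMs = 0L,
    customMetrics = customMetrics.map { case (name, value) =>
      name -> (if (snapshotCustomMetricNames.contains(name)) value else 0L)
    })
}

object NoExecutionResetDemo {
  // Made-up values standing in for the progress of the last data batch.
  val dataBatch: StateOperatorProgressModel = StateOperatorProgressModel(
    numRowsTotal = 10L,
    numRowsUpdated = 4L,
    numRowsRemoved = 1L,
    numRowsDroppedByWatermark = 2L,
    allUpdatesTimeMs = 35L,
    allRemovalsTimeMs = 7L,
    commitTimeMs = 120L,
    memoryUsedBytes = 4096L,
    customMetrics = Map(
      "rocksdbPutCount" -> 4L,
      "rocksdbCommitFlushLatency" -> 15L,
      "rocksdbSstFileSize" -> 2048L),
    snapshotCustomMetricNames = Set("rocksdbSstFileSize"))

  def main(args: Array[String]): Unit = {
    val idle = dataBatch.copyForNoExecution()
    println(s"idle commitTimeMs = ${idle.commitTimeMs}")
    println(s"idle customMetrics = ${idle.customMetrics}")
  }
}
```

With this shape, the reset routine reduces to mapping `copyForNoExecution()` over the operators, which matches the one-line body quoted in the description.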
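The idea of encoding snapshot semantics at the metric definition, rather than in a whitelist inside the reset routine, can be sketched as follows. This is an illustrative model under stated assumptions, not the Spark source: the `*Model` types and `SnapshotNamesDemo` are hypothetical, and the assignment of each metric name to a size, timing, or sum kind is an assumption made for the example. Only the `isSnapshot` flag with a `false` default and the metric names come from the description.

```scala
// Hypothetical model of a metric hierarchy carrying an isSnapshot flag.
sealed trait CustomMetricModel {
  def name: String
  // Trait default: a metric is per-batch unless its definition says otherwise.
  def isSnapshot: Boolean = false
}

// Size metrics may opt in to snapshot semantics at their definition.
final case class SizeMetricModel(
    name: String,
    override val isSnapshot: Boolean = false) extends CustomMetricModel

// Timing and sum metrics keep the trait default (always per-batch).
final case class TimingMetricModel(name: String) extends CustomMetricModel
final case class SumMetricModel(name: String) extends CustomMetricModel

object SnapshotNamesDemo {
  // A small, illustrative subset of provider metrics.
  val supportedMetrics: Seq[CustomMetricModel] = Seq(
    SizeMetricModel("rocksdbSstFileSize", isSnapshot = true),
    SizeMetricModel("rocksdbPinnedBlocksMemoryUsage", isSnapshot = true),
    TimingMetricModel("rocksdbCommitFlushLatency"),
    SumMetricModel("rocksdbPutCount"))

  // The set a progress builder could pass along as snapshotCustomMetricNames.
  val snapshotCustomMetricNames: Set[String] =
    supportedMetrics.collect { case m if m.isSnapshot => m.name }.toSet

  def main(args: Array[String]): Unit =
    println(snapshotCustomMetricNames.toSeq.sorted.mkString(", "))
}
```

Because the flag lives on the definition, adding a new snapshot metric would not require touching the reset logic; the derived name set picks it up automatically.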