Skip to content

[SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch trigger progress event#55699

Open
DHRUV6029 wants to merge 1 commit intoapache:masterfrom
DHRUV6029:SPARK-56537
Open

[SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch trigger progress event#55699
DHRUV6029 wants to merge 1 commit intoapache:masterfrom
DHRUV6029:SPARK-56537

Conversation

@DHRUV6029
Copy link
Copy Markdown

What changes were proposed in this pull request?

This PR is a follow-up to SPARK-56464 (commit 930c3039871), which left a TODO(SPARK-56537) in ProgressReporter#resetExecStatsForNoExecution to track the remaining per-batch fields on StateOperatorProgress that were not being reset on no-data trigger progress events.

Three changes:

  1. Reset the per-batch time fields on no-data trigger progress events. allUpdatesTimeMs, allRemovalsTimeMs, and commitTimeMs are now reset to 0 alongside the row-count fields (numRowsUpdated, numRowsRemoved, numRowsDroppedByWatermark) that were already handled by SPARK-56464.

  2. Reset per-batch entries of customMetrics while preserving snapshot entries. StateOperatorProgress.customMetrics carries values from two metric registries (StateStoreCustomMetric for provider-level, StatefulOperatorCustomMetric for operator-level) and conflates per-batch counters/timings with snapshot reads of state-store status (current memory usage, key counts, file size). On a no-data trigger we now zero per-batch entries and preserve snapshot entries.

    The snapshot/per-batch distinction is encoded at the metric definition via a new isSnapshot: Boolean flag on StateStoreCustomMetric (default false). The six snapshot Size metrics are marked at their definitions:

    • RocksDB (5): rocksdbSstFileSize, rocksdbPinnedBlocksMemoryUsage, rocksdbNumInternalColFamiliesKeys, rocksdbNumExternalColumnFamilies, rocksdbNumInternalColumnFamilies.
    • HDFSBackedStateStoreProvider (1): stateOnCurrentVersionSizeBytes.

    StateStoreCustomTimingMetric and StateStoreCustomSumMetric keep using the trait default (always per-batch). Operator-level StatefulOperatorCustomSumMetric instances (declared by BaseStreamingDeduplicateExec, StreamingSymmetricHashJoinExec, and TransformWithStateExecBase) are also always per-batch.

  3. Centralize the reset semantics in a new copyForNoExecution() method on StateOperatorProgress instead of growing copy(...)'s parameter list further. The method takes no parameters; it inspects the operator instance's snapshotCustomMetricNames (a new private[spark] constructor field, defaulted to Set.empty, populated at progress build time by StateStoreWriter.getProgress) to decide which customMetrics keys to preserve. The existing 3-arg copy(newNumRowsUpdated, newNumRowsDroppedByWatermark, newNumRowsRemoved) signature is unchanged; its body is updated to thread snapshotCustomMetricNames through so SessionWindowStateStoreSaveExec.getProgress round-trip preserves it.

The TODO(SPARK-56537) comment is removed from ProgressReporter#resetExecStatsForNoExecution, whose body is reduced to a single delegating map: originExecStats.stateOperators.map(_.copyForNoExecution()).

Why are the changes needed?

Today, on a no-data ("idle") trigger progress event, StateOperatorProgress carries the previous batch's values for allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs, and most of customMetrics. To a user reading query.lastProgress / query.recentProgress during an idle period this looks like work was performed when none was. It is also a known source of test flakiness.

The TODO(SPARK-56537) left by SPARK-56464 in ProgressReporter#resetExecStatsForNoExecution explicitly tracks this follow-up.

The design was discussed on the JIRA ticket and confirmed before implementation:

  • Encode snapshot semantics at the metric definition (option (2b) in the audit comment), not via a hardcoded whitelist in the reset routine.
  • Add a new copyForNoExecution() method on StateOperatorProgress rather than growing the existing copy(...) argument list further (3 args after SPARK-56464 would have become 6+).

Does this PR introduce any user-facing change?

No public API change.

User-visible behavior change: idle-trigger progress events emitted via StreamingQueryListener.QueryProgressEvent, query.lastProgress, and query.recentProgress will now report 0 for all per-batch fields and per-batch customMetrics entries instead of carrying stale values from the previous data batch. Snapshot fields (numRowsTotal, memoryUsedBytes, numShufflePartitions, numStateStoreInstances, snapshot custom metrics) are unchanged. Same direction as the SPARK-56464 fix; this PR completes the audit.

How was this patch tested?

New and updated tests in ProgressReporterSuite.scala:

  1. Extended the SPARK-56464 test with assertions that the three time fields (allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs) are reset to 0 on the idle trigger, alongside the existing row-count assertions. Test description updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch resets all per-batch StateOperatorProgress fields to zero" to reflect the broader scope.

  2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but preserves snapshot customMetrics (RocksDB)" exercising the per-batch / snapshot split end-to-end against a real RocksDBStateStoreProvider. The test runs one data batch, advances the manual clock to trigger an idle progress event, then asserts that 3 per-batch RocksDB metrics (rocksdbCommitFlushLatency, rocksdbPutCount, rocksdbTotalBytesWritten) are reset to 0 on idle, while 5 snapshot RocksDB metrics (rocksdbPinnedBlocksMemoryUsage, rocksdbNumInternalColFamiliesKeys, rocksdbNumExternalColumnFamilies, rocksdbNumInternalColumnFamilies, rocksdbSstFileSize) are preserved across the idle trigger.

Local verification:

  • build/sbt 'sql/testOnly *ProgressReporterSuite' -> 2/2 tests pass.
  • build/sbt 'sql/testOnly *ProgressReporterSuite *StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite *StreamingDeduplicationSuite *MultiStatefulOperatorsSuite' -> 240 tests pass in 7m 16s.
  • dev/mima -> no exclusions required.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

@DHRUV6029 DHRUV6029 marked this pull request as draft May 6, 2026 06:49
@DHRUV6029 DHRUV6029 marked this pull request as ready for review May 6, 2026 06:52
… batch trigger progress event

### What changes were proposed in this pull request?

This PR is a follow-up to SPARK-56464 (commit 930c303), which left a
`TODO(SPARK-56537)` in `ProgressReporter#resetExecStatsForNoExecution` to
track the remaining per-batch fields on `StateOperatorProgress` that were
not being reset on no-data trigger progress events.

Three changes:

1. Reset the per-batch time fields on no-data trigger progress events.
   `allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset
   to 0 alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`,
   `numRowsDroppedByWatermark`) that were already handled by SPARK-56464.

2. Reset per-batch entries of `customMetrics` while preserving snapshot
   entries. `StateOperatorProgress.customMetrics` carries values from two
   metric registries (`StateStoreCustomMetric` for provider-level,
   `StatefulOperatorCustomMetric` for operator-level) and conflates per-batch
   counters/timings with snapshot reads of state-store status (current memory
   usage, key counts, file size). On a no-data trigger we now zero per-batch
   entries and preserve snapshot entries.

   The snapshot/per-batch distinction is encoded at the metric definition
   via a new `isSnapshot: Boolean` flag on `StateStoreCustomMetric` (default
   `false`). The six snapshot Size metrics are marked at their definitions:
   - RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`,
     `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`,
     `rocksdbNumInternalColumnFamilies`.
   - HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`.

   `StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep using
   the trait default (always per-batch). Operator-level
   `StatefulOperatorCustomSumMetric` instances (declared by
   `BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and
   `TransformWithStateExecBase`) are also always per-batch.

3. Centralize the reset semantics in a new `copyForNoExecution()` method
   on `StateOperatorProgress` instead of growing `copy(...)`'s parameter
   list further. The method takes no parameters; it inspects the operator
   instance's `snapshotCustomMetricNames` (a new `private[spark]`
   constructor field, defaulted to `Set.empty`, populated at progress
   build time by `StateStoreWriter.getProgress`) to decide which
   `customMetrics` keys to preserve. The existing 3-arg
   `copy(newNumRowsUpdated, newNumRowsDroppedByWatermark, newNumRowsRemoved)`
   signature is unchanged; its body is updated to thread
   `snapshotCustomMetricNames` through so
   `SessionWindowStateStoreSaveExec.getProgress` round-trip preserves it.

The `TODO(SPARK-56537)` comment is removed from
`ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a
single delegating map:
`originExecStats.stateOperators.map(_.copyForNoExecution())`.

### Why are the changes needed?

Today, on a no-data ("idle") trigger progress event, `StateOperatorProgress`
carries the previous batch's values for `allUpdatesTimeMs`,
`allRemovalsTimeMs`, `commitTimeMs`, and most of `customMetrics`. To a user
reading `query.lastProgress` / `query.recentProgress` during an idle period
this looks like work was performed when none was. It is also a known source
of test flakiness.

The `TODO(SPARK-56537)` left by SPARK-56464 in
`ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this
follow-up.

The design was discussed on the JIRA ticket and confirmed before
implementation:
- Encode snapshot semantics at the metric definition (option (2b) in the
  audit comment), not via a hardcoded whitelist in the reset routine.
- Add a new `copyForNoExecution()` method on `StateOperatorProgress`
  rather than growing the existing `copy(...)` argument list further (3
  args after SPARK-56464 would have become 6+).

### Does this PR introduce _any_ user-facing change?

No public API change.

User-visible behavior change: idle-trigger progress events emitted via
`StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and
`query.recentProgress` will now report `0` for all per-batch fields and
per-batch `customMetrics` entries instead of carrying stale values from the
previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`,
`numShufflePartitions`, `numStateStoreInstances`, snapshot custom metrics)
are unchanged. Same direction as the SPARK-56464 fix; this PR completes the
audit.

### How was this patch tested?

New and updated tests in `ProgressReporterSuite.scala`:

1. Extended the SPARK-56464 test with assertions that the three time fields
   (`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0
   on the idle trigger, alongside the existing row-count assertions. Test
   description updated from "no-data batch resets numRowsRemoved to zero"
   to "no-data batch resets all per-batch StateOperatorProgress fields to
   zero" to reflect the broader scope.

2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but
   preserves snapshot customMetrics (RocksDB)" exercising the per-batch /
   snapshot split end-to-end against a real `RocksDBStateStoreProvider`.
   The test runs one data batch, advances the manual clock to trigger an
   idle progress event, then asserts that 3 per-batch RocksDB metrics
   (`rocksdbCommitFlushLatency`, `rocksdbPutCount`,
   `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5 snapshot
   RocksDB metrics (`rocksdbPinnedBlocksMemoryUsage`,
   `rocksdbNumInternalColFamiliesKeys`,
   `rocksdbNumExternalColumnFamilies`,
   `rocksdbNumInternalColumnFamilies`, `rocksdbSstFileSize`) are preserved
   across the idle trigger.

Local verification:
- `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass.
- `build/sbt 'sql/testOnly *ProgressReporterSuite *StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite *StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass in 7m 16s.
- `dev/mima` -> no exclusions required.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant