[SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE/DELETE/MERGE metrics #55711
juliuszsompolski wants to merge 5 commits into apache:master
Conversation
… metrics
### What changes were proposed in this pull request?
Switches the DSv2 row-level operation metrics that count rows produced by
executor tasks to use `SQLLastAttemptMetric` (SLAM) instead of plain `SQLMetric`,
so the values surfaced in `UpdateSummary` and `MergeSummary` are stable
across stage retries.
Specifically:
- In `RowLevelWriteExec.sparkMetrics`, the UPDATE branch now constructs
`numUpdatedRows` and `numCopiedRows` via
  `SQLLastAttemptMetrics.createMetric` (sketched after this list). Increment paths
  are unchanged (`SQLLastAttemptMetric extends SQLMetric`).
- In `MergeRowsExec.metrics`, all 8 row counters (`numTargetRowsCopied`,
`numTargetRowsInserted`, `numTargetRowsDeleted`, `numTargetRowsUpdated`,
and the matched / not-matched-by-source splits) are switched to SLAM.
Both the interpreted (`longMetric("...") += 1`) and codegen
(`metricTerm(...).add(1)`) increment paths work unchanged.
- `RowLevelWriteExec.getMetricValue` now reads via
`lastAttemptValueForHighestRDDId()` for `SQLLastAttemptMetric`, falling
back to `slam.value` if SLAM bailed out. This affects both the UPDATE
summary path and the MERGE summary path (which reads
`MergeRowsExec.metrics`).
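
A minimal sketch of the two pieces above, written from this description rather than copied from the diff. `SQLLastAttemptMetrics.createMetric` and `lastAttemptValueForHighestRDDId()` are the new APIs this PR adds, so their exact signatures, and how the bail-out fallback is detected, are assumptions here:

```scala
// Sketch: UPDATE branch of RowLevelWriteExec.sparkMetrics builds SLAM counters.
// SQLLastAttemptMetric extends SQLMetric, so the interpreted increment
// (longMetric("numUpdatedRows") += 1) and the codegen increment
// (metricTerm(ctx, "numUpdatedRows").add(1)) keep working unchanged.
Seq("numUpdatedRows", "numCopiedRows").map { name =>
  name -> SQLLastAttemptMetrics.createMetric(sparkContext, name)  // was SQLMetrics.createMetric
}.toMap

// Sketch: SLAM-aware read used when assembling the connector-visible summary.
def getMetricValue(metric: SQLMetric): Long = metric match {
  case slam: SQLLastAttemptMetric =>
    // Last attempt's contribution; per the description, getMetricValue falls back to
    // slam.value if SLAM bailed out (the exact bail-out check is omitted in this sketch).
    slam.lastAttemptValueForHighestRDDId()
  case other => other.value
}
```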
The DELETE branch in `RowLevelWriteExec.sparkMetrics` is intentionally
left on plain `SQLMetric`. The group-based DELETE summary derives
`numDeletedRows` on the driver as `numScannedRows - numCopiedRows` where
`numScannedRows` comes from `BatchScanExec.numOutputRows` (still a plain
`SQLMetric`); making that derivation retry-stable needs a separate
design decision (either a SLAM scan-output metric or a counting node
inserted before `ReplaceDataExec`). DELETE is left for a follow-up so
this change stays focused on cases where the swap is mechanical.
### Why are the changes needed?
`SQLMetric.value` aggregates increments from every task attempt that ever
ran, so on a stage retry the row counts double up. The values flow into
the connector-visible `WriteSummary` (and downstream into operator
metrics consumers such as Delta's invariant checks), so an inflated count
mis-reports what the operation actually did. `SQLLastAttemptMetric`
reports only the last attempt's contribution and so gives the row count
that matches what was actually committed.
### Does this PR introduce _any_ user-facing change?
The values surfaced via `UpdateSummary.numUpdatedRows / numCopiedRows` and
`MergeSummary.numTargetRows*` will, in the presence of stage retries, be
the row counts from the last attempt rather than the sum across all
attempts. With no retries, behavior is unchanged. The metric names,
display strings, and presence in the SQL UI are unchanged.
### How was this patch tested?
Existing tests in `UpdateTableSuiteBase` subclasses and the merge suites
exercise the metric values; they continue to pass since SLAM and SQLMetric
report the same value when there are no stage retries.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
…metrics

Adds two tests that exercise SQLLastAttemptMetric correctness under stage retries triggered by injected shuffle fetch failures:
- UpdateTableSuiteBase: "UPDATE: metric values are stable across stage retries" runs an UPDATE with an IN-subquery to force a shuffle (with AQE and broadcast disabled for plan determinism), then turns on spark.test.injectShuffleFetchFailures. The DAGScheduler corrupts the first attempt of each shuffle map stage, the writer stage retries, and the test asserts that UpdateSummary still reports the last-attempt numUpdatedRows / numCopiedRows. Runs for all four UPDATE variants (Group/Delta x metadata/no-metadata).
- MergeIntoTableSuiteBase: "MERGE: metric values are stable across stage retries" does the same with a MERGE statement whose join introduces the shuffle. Asserts all 8 MergeSummary fields. Runs across the merge suite variants.

The retries are visible in the test logs as FetchFailedException / MetadataFetchFailedException entries; the assertions only pass because MergeSummary / UpdateSummary read via SQLLastAttemptMetric's lastAttemptValueForHighestRDDId() rather than the raw accumulator value.
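
A rough sketch of the UPDATE test shape this commit describes. `spark.test.injectShuffleFetchFailures` is the injection flag named above; the table and column names, the `lastCommittedUpdateSummary()` helper, and the expected-count values are placeholders for whatever the suite actually uses to capture the summary handed to `BatchWrite.commit`:

```scala
test("UPDATE: metric values are stable across stage retries") {
  withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",    // plan determinism (dropped in a later commit)
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",     // keep the IN-subquery join as a shuffle join
      "spark.test.injectShuffleFetchFailures" -> "true") {  // fail the first attempt of each shuffle map stage
    sql(s"UPDATE $tableNameAsString SET dep = 'retired' " +
      "WHERE id IN (SELECT id FROM ids_to_update)")

    // Placeholder helper: however the suite surfaces the UpdateSummary passed to commit.
    val summary = lastCommittedUpdateSummary()
    assert(summary.numUpdatedRows() === expectedUpdatedRows)  // last-attempt value, not the doubled sum
    assert(summary.numCopiedRows() === expectedCopiedRows)
  }
}
```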
…p AQE override

Three small test cleanups in response to review feedback:
- Drop the "MERGE: " / "UPDATE: " prefix from the test names; the suite context already implies the operation.
- Move the MERGE retry test in MergeIntoTableSuiteBase to sit next to the other "Merge metrics ..." tests (right after SPARK-52689) instead of after SPARK-55074.
- Move the UPDATE retry test in UpdateTableSuiteBase to sit right after the existing IN-predicate test, since they share the IN-subquery shape.
- Stop overriding ADAPTIVE_EXECUTION_ENABLED in either test. SLAM is correct under AQE; the only reason these tests need a config override is to keep the join from being broadcast-resolved (which would skip the shuffle, and therefore the fetch-failure injection, entirely). AUTO_BROADCASTJOIN_THRESHOLD = -1 is sufficient for that. The retries still fire (FetchFailedException visible in the logs) and the assertions on `UpdateSummary` / `MergeSummary` still pass with AQE on.
Brings DELETE into the same SLAM-correctness story as UPDATE / MERGE.
DELETE in DSv2 has two paths:
- Delta-based DELETE (WriteDeltaExec): the writer task increments
`numDeletedRows` directly from row-level operations.
- Group-based DELETE (ReplaceDataExec): the writer only sees COPY rows;
`numCopiedRows` is incremented executor-side, and `numDeletedRows` is
derived on the driver as `numScannedRows - numCopiedRows` where
`numScannedRows` comes from `BatchScanExec.metrics("numOutputRows")`.
Changes:
- `RowLevelWriteExec.sparkMetrics`: the DELETE branch now uses
`SQLLastAttemptMetrics.createMetric` for `numDeletedRows` and
`numCopiedRows`, mirroring the UPDATE branch. Increment paths are
unchanged. The driver-side `metrics("numDeletedRows").set(...)` in
`ReplaceDataExec.getWriteSummary` still works because for the
group-based path that metric is only updated on the driver, so SLAM
records it as a driver value and surfaces it via
  `lastAttemptValueForHighestRDDId()` (see the sketch after this list).
- `BatchScanExec` overrides `sparkMetrics` so `numOutputRows` is SLAM
*only* when the table is wrapped as `RowLevelOperationTable` with
`command == DELETE`. Other scans (regular reads, UPDATE / MERGE row-
level scans, streaming scans via the trait default) are untouched.
This is the only place the trait default needs to be overridden, since
row-level rewrites only produce `BatchScanExec`.
- `getMetricValue` in `RowLevelWriteExec` (already SLAM-aware from the
earlier UPDATE / MERGE commit) handles both the writer's
`numCopiedRows` / `numDeletedRows` and the scan's `numOutputRows`
uniformly.
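
A rough sketch of the group-based DELETE derivation described in the first bullet above; the exact shape of `ReplaceDataExec.getWriteSummary` and the metric lookups are assumptions based on this message, not the actual diff:

```scala
// Sketch: driver-side derivation for group-based DELETE in ReplaceDataExec.getWriteSummary.
// Both inputs are SLAM after this change, so neither side is doubled by a stage retry.
val numScannedRows = getMetricValue(scan.metrics("numOutputRows"))  // BatchScanExec, SLAM for DELETE scans
val numCopiedRows  = getMetricValue(metrics("numCopiedRows"))       // writer-side SLAM counter

// numDeletedRows is only ever set here, on the driver, so SLAM records it as a driver
// value and lastAttemptValueForHighestRDDId() surfaces it without bailing out.
metrics("numDeletedRows").set(numScannedRows - numCopiedRows)
```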
Test:
- `DeleteFromTableSuiteBase`: "metric values are stable across stage
retries" — DELETE with an `IN`-subquery, broadcast disabled to keep the
shuffle, fetch-failure injection on. Asserts the `DeleteSummary`
values match the actual delete after the writer stage retries. Runs
across the four DELETE variants.
… DELETE case

Per review, the rationale for using SLAM only for DELETE row-level scans belongs next to the case arm it explains, not on top of the metric block. Also tightens the wording.
```scala
case rlot: RowLevelOperationTable if rlot.operation.command() == DELETE =>
  SQLLastAttemptMetrics.createMetric(sparkContext, name)
case _ =>
  SQLMetrics.createMetric(sparkContext, name)
```
Any reason for not always using SLAM here? Is it riskier than regular metrics?
IMO it would look better if this code was in DataSourceV2ScanExecBase, but there we can't check if it's a DELETE command.
There is some memory overhead: by default, an array of partial values sized to the number of tasks, so let's say a couple hundred bytes to a couple of kilobytes. Computational overhead should be negligible.
I think that in general all numOutputRows metrics all over Spark could benefit from porting to SLAM, but wanted to make the blast radius as small as possible here for now.
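For a rough sense of scale (assuming one 8-byte partial value per task, which is an assumption about SLAM's bookkeeping rather than something stated here): a 200-task stage adds about 200 × 8 B ≈ 1.6 kB per SLAM metric, and even a few-thousand-task stage stays in the tens of kilobytes per metric.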
TODO: Work on a better test for UPDATE and MERGE, because the fetch-failure injection framework right now doesn't trigger retries of the last stage, where these metrics are computed, so it doesn't actually trigger duplicate non-SLAM metrics.
### What changes were proposed in this pull request?

Switches the DSv2 row-level operation metrics that count rows produced by executor tasks to use `SQLLastAttemptMetric` instead of plain `SQLMetric`, so the values surfaced in `UpdateSummary`, `DeleteSummary`, and `MergeSummary` are stable across stage retries.

Specifically:
- In `RowLevelWriteExec.sparkMetrics`, the UPDATE and DELETE branches now construct their row counters (`numUpdatedRows` / `numDeletedRows` / `numCopiedRows`) via `SQLLastAttemptMetrics.createMetric`. Increment paths are unchanged (`SQLLastAttemptMetric extends SQLMetric`).
- In `MergeRowsExec.metrics`, all 8 row counters (`numTargetRowsCopied`, `numTargetRowsInserted`, `numTargetRowsDeleted`, `numTargetRowsUpdated`, and the matched / not-matched-by-source splits) are switched to SLAM. Both the interpreted (`longMetric("...") += 1`) and codegen (`metricTerm(...).add(1)`) increment paths work unchanged.
- `BatchScanExec` overrides `sparkMetrics` so `numOutputRows` becomes SLAM only when the scan reads on behalf of a row-level DELETE (i.e. `table` is `RowLevelOperationTable` with `operation.command() == DELETE`). This is needed because group-based DELETE derives `numDeletedRows = numScannedRows - numCopiedRows` on the driver in `ReplaceDataExec.getWriteSummary`; if either input is overcounted, the difference is overcounted in lockstep on a writer-stage retry. Other scans (regular reads, UPDATE/MERGE row-level scans, streaming scans via the trait default) keep the plain `SQLMetric`.
- `RowLevelWriteExec.getMetricValue` now reads via `lastAttemptValueForHighestRDDId()` for `SQLLastAttemptMetric`, falling back to `slam.value` if SLAM bailed out. This handles the UPDATE/DELETE summary paths, the MERGE summary path (`MergeRowsExec.metrics`), and the scan-side `numOutputRows` referenced from `ReplaceDataExec.getWriteSummary` uniformly.

The driver-side `metrics("numDeletedRows").set(...)` in `ReplaceDataExec.getWriteSummary` continues to work after the swap because for the group-based DELETE path that metric is only updated on the driver, so SLAM records it as a driver value and surfaces it via `lastAttemptValueForHighestRDDId()` without bailing out.

### Why are the changes needed?

`SQLMetric.value` aggregates increments from every task attempt that ever ran, so on a stage retry the row counts double up. The values flow into the connector-visible `WriteSummary` (and downstream into operator metrics consumers such as Delta's invariant checks), so an inflated count mis-reports what the operation actually did. `SQLLastAttemptMetric` reports only the last attempt's contribution and so gives the row count that matches what was actually committed.

### Does this PR introduce _any_ user-facing change?

The values surfaced via `UpdateSummary.numUpdatedRows / numCopiedRows`, `DeleteSummary.numDeletedRows / numCopiedRows`, and `MergeSummary.numTargetRows*` (i.e. what the connector receives from `BatchWrite.commit(messages, summary)`) will, in the presence of stage retries, be the row counts from the last attempt rather than the sum across all attempts. With no retries, behavior is unchanged. The metric names, display strings, and presence in the SQL UI are unchanged.

Note: the SQL UI still shows the raw accumulator value (`SQLMetric.value`), which on stage retries is the sum across all task attempts and therefore overcounts. Only the values passed to the connector via `WriteSummary` are SLAM-corrected. Making the SQL UI also display the last-attempt value is a bigger follow-up that would touch the SQL UI's metric collection pipeline.

### How was this patch tested?

Existing tests in `UpdateTableSuiteBase`, `DeleteFromTableSuiteBase`, and the merge suites exercise the metric values; they continue to pass since SLAM and `SQLMetric` report the same value when there are no stage retries.

Three new tests cover the retry behavior directly, one in each of `UpdateTableSuiteBase`, `DeleteFromTableSuiteBase`, and `MergeIntoTableSuiteBase`, all named "metric values are stable across stage retries". Each runs the operation with an `IN`-subquery (or a join, for MERGE) to force a shuffle, sets `spark.sql.autoBroadcastJoinThreshold = -1` so the join doesn't get broadcast away (AQE is left at its default), and then enables `spark.test.injectShuffleFetchFailures`. The DAGScheduler corrupts the first attempt of each shuffle map stage, the writer stage retries, and the test asserts that the corresponding `*Summary` row counts still match the actual operation result. Each test runs across the suite's variants (group-based / delta-based, with/without metadata).

The injected retries are visible in the test logs as `FetchFailedException` / `MetadataFetchFailedException`. The summary-value assertions only pass because the new SLAM-aware reader returns the last-attempt value rather than the doubled raw accumulator value.

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7