Skip to content

[SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE/DELETE/MERGE metrics#55711

Open
juliuszsompolski wants to merge 5 commits intoapache:masterfrom
juliuszsompolski:dsv2-dml-slam
Open

[SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE/DELETE/MERGE metrics#55711
juliuszsompolski wants to merge 5 commits intoapache:masterfrom
juliuszsompolski:dsv2-dml-slam

Conversation

@juliuszsompolski
Copy link
Copy Markdown
Contributor

@juliuszsompolski juliuszsompolski commented May 6, 2026

What changes were proposed in this pull request?

Switches the DSv2 row-level operation metrics that count rows produced by executor tasks to use SQLLastAttemptMetric instead of plain SQLMetric, so the values surfaced in UpdateSummary, DeleteSummary, and MergeSummary are stable across stage retries.

Specifically:

  • In RowLevelWriteExec.sparkMetrics, the UPDATE and DELETE branches now construct their row counters (numUpdatedRows / numDeletedRows / numCopiedRows) via SQLLastAttemptMetrics.createMetric. Increment paths are unchanged (SQLLastAttemptMetric extends SQLMetric).
  • In MergeRowsExec.metrics, all 8 row counters (numTargetRowsCopied, numTargetRowsInserted, numTargetRowsDeleted, numTargetRowsUpdated, and the matched / not-matched-by-source splits) are switched to SLAM. Both the interpreted (longMetric("...") += 1) and codegen (metricTerm(...).add(1)) increment paths work unchanged.
  • BatchScanExec overrides sparkMetrics so numOutputRows becomes SLAM only when the scan reads on behalf of a row-level DELETE (i.e. table is RowLevelOperationTable with operation.command() == DELETE). This is needed because group-based DELETE derives numDeletedRows = numScannedRows - numCopiedRows on the driver in ReplaceDataExec.getWriteSummary; if either input is overcounted the difference is overcounted in lockstep on a writer-stage retry. Other scans (regular reads, UPDATE/MERGE row-level scans, streaming scans via the trait default) keep the plain SQLMetric.
  • RowLevelWriteExec.getMetricValue now reads via lastAttemptValueForHighestRDDId() for SQLLastAttemptMetric, falling back to slam.value if SLAM bailed out. This handles the UPDATE/DELETE summary paths, the MERGE summary path (MergeRowsExec.metrics), and the scan side numOutputRows referenced from ReplaceDataExec.getWriteSummary uniformly.

The driver-side metrics("numDeletedRows").set(...) in ReplaceDataExec.getWriteSummary continues to work after the swap because for the group-based DELETE path that metric is only updated on the driver, so SLAM records it as a driver value and surfaces it via lastAttemptValueForHighestRDDId() without bailing out.

Why are the changes needed?

SQLMetric.value aggregates increments from every task attempt that ever ran, so on a stage retry the row counts double up. The values flow into the connector-visible WriteSummary (and downstream into operator metrics consumers such as Delta's invariant checks), so an inflated count mis-reports what the operation actually did. SQLLastAttemptMetric reports only the last attempt's contribution and so gives the row count that matches what was actually committed.

Does this PR introduce any user-facing change?

The values surfaced via UpdateSummary.numUpdatedRows / numCopiedRows, DeleteSummary.numDeletedRows / numCopiedRows, and MergeSummary.numTargetRows* (i.e. what the connector receives from BatchWrite.commit(messages, summary)) will, in the presence of stage retries, be the row counts from the last attempt rather than the sum across all attempts. With no retries, behavior is unchanged. The metric names, display strings, and presence in the SQL UI are unchanged.

Note: the SQL UI still shows the raw accumulator value (SQLMetric.value), which on stage retries is the sum across all task attempts and therefore overcounts. Only the values passed to the connector via WriteSummary are SLAM-corrected. Making the SQL UI also display the last-attempt value is a bigger follow-up that would touch the SQL UI's metric collection pipeline.

How was this patch tested?

Existing tests in UpdateTableSuiteBase, DeleteFromTableSuiteBase, and the merge suites exercise the metric values; they continue to pass since SLAM and SQLMetric report the same value when there are no stage retries.

Three new tests cover the retry behavior directly, one in each of UpdateTableSuiteBase, DeleteFromTableSuiteBase, and MergeIntoTableSuiteBase, all named "metric values are stable across stage retries". Each runs the operation with an IN-subquery (or a join, for MERGE) to force a shuffle, sets spark.sql.autoBroadcastJoinThreshold = -1 so the join doesn't get broadcast away (AQE is left at its default), and then enables spark.test.injectShuffleFetchFailures. The DAGScheduler corrupts the first attempt of each shuffle map stage, the writer stage retries, and the test asserts that the corresponding *Summary row counts still match the actual operation result. Each test runs across the suite's variants (group-based / delta-based, with/without metadata).

The injected retries are visible in the test logs as FetchFailedException / MetadataFetchFailedException. The summary-value assertions only pass because the new SLAM-aware reader returns the last-attempt value rather than the doubled raw accumulator value.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

Juliusz Sompolski added 3 commits May 6, 2026 11:25
… metrics

### What changes were proposed in this pull request?

Switches the DSv2 row-level operation metrics that count rows produced by
executor tasks to use `SQLLastAttemptMetric` instead of plain `SQLMetric`,
so the values surfaced in `UpdateSummary` and `MergeSummary` are stable
across stage retries.

Specifically:
- In `RowLevelWriteExec.sparkMetrics`, the UPDATE branch now constructs
  `numUpdatedRows` and `numCopiedRows` via
  `SQLLastAttemptMetrics.createMetric`. Increment paths are unchanged
  (`SQLLastAttemptMetric extends SQLMetric`).
- In `MergeRowsExec.metrics`, all 8 row counters (`numTargetRowsCopied`,
  `numTargetRowsInserted`, `numTargetRowsDeleted`, `numTargetRowsUpdated`,
  and the matched / not-matched-by-source splits) are switched to SLAM.
  Both the interpreted (`longMetric("...") += 1`) and codegen
  (`metricTerm(...).add(1)`) increment paths work unchanged.
- `RowLevelWriteExec.getMetricValue` now reads via
  `lastAttemptValueForHighestRDDId()` for `SQLLastAttemptMetric`, falling
  back to `slam.value` if SLAM bailed out. This affects both the UPDATE
  summary path and the MERGE summary path (which reads
  `MergeRowsExec.metrics`).

The DELETE branch in `RowLevelWriteExec.sparkMetrics` is intentionally
left on plain `SQLMetric`. The group-based DELETE summary derives
`numDeletedRows` on the driver as `numScannedRows - numCopiedRows` where
`numScannedRows` comes from `BatchScanExec.numOutputRows` (still a plain
`SQLMetric`); making that derivation retry-stable needs a separate
design decision (either a SLAM scan-output metric or a counting node
inserted before `ReplaceDataExec`). DELETE is left for a follow-up so
this change stays focused on cases where the swap is mechanical.

### Why are the changes needed?

`SQLMetric.value` aggregates increments from every task attempt that ever
ran, so on a stage retry the row counts double up. The values flow into
the connector-visible `WriteSummary` (and downstream into operator
metrics consumers such as Delta's invariant checks), so an inflated count
mis-reports what the operation actually did. `SQLLastAttemptMetric`
reports only the last attempt's contribution and so gives the row count
that matches what was actually committed.

### Does this PR introduce _any_ user-facing change?

The values surfaced via `UpdateSummary.numUpdatedRows /numCopiedRows` and
`MergeSummary.numTargetRows*` will, in the presence of stage retries, be
the row counts from the last attempt rather than the sum across all
attempts. With no retries, behavior is unchanged. The metric names,
display strings, and presence in the SQL UI are unchanged.

### How was this patch tested?

Existing tests in `UpdateTableSuiteBase` subclasses and the merge suites
exercise the metric values; they continue to pass since SLAM and SQLMetric
report the same value when there are no stage retries.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
…metrics

Adds two tests that exercise SQLLastAttemptMetric correctness under stage
retries triggered by injected shuffle fetch failures:

- UpdateTableSuiteBase: "UPDATE: metric values are stable across stage
  retries" runs an UPDATE with an IN-subquery to force a shuffle (with
  AQE and broadcast disabled for plan determinism), then turns on
  spark.test.injectShuffleFetchFailures. The DAGScheduler corrupts the
  first attempt of each shuffle map stage, the writer stage retries, and
  the test asserts that UpdateSummary still reports the last-attempt
  numUpdatedRows / numCopiedRows. Runs for all four UPDATE variants
  (Group/Delta x metadata/no-metadata).

- MergeIntoTableSuiteBase: "MERGE: metric values are stable across stage
  retries" does the same with a MERGE statement whose join introduces
  the shuffle. Asserts all 8 MergeSummary fields. Runs across the merge
  suite variants.

The retries are visible in the test logs as FetchFailedException /
MetadataFetchFailedException entries; the assertions only pass because
MergeSummary / UpdateSummary read via SQLLastAttemptMetric's
lastAttemptValueForHighestRDDId() rather than the raw accumulator value.
…p AQE override

Three small test cleanups in response to review feedback:
- Drop the "MERGE: " / "UPDATE: " prefix from the test names; the suite
  context already implies the operation.
- Move the MERGE retry test in MergeIntoTableSuiteBase to sit next to
  the other "Merge metrics ..." tests (right after SPARK-52689) instead
  of after SPARK-55074.
- Move the UPDATE retry test in UpdateTableSuiteBase to sit right after
  the existing IN-predicate test, since they share the IN-subquery
  shape.
- Stop overriding ADAPTIVE_EXECUTION_ENABLED in either test. SLAM is
  correct under AQE; the only reason these tests need a config override
  is to keep the join from being broadcast-resolved (which would skip
  the shuffle, and therefore the fetch-failure injection, entirely).
  AUTO_BROADCASTJOIN_THRESHOLD = -1 is sufficient for that. The retries
  still fire (FetchFailedException visible in the logs) and the
  assertions on `UpdateSummary` / `MergeSummary` still pass with AQE on.
@juliuszsompolski
Copy link
Copy Markdown
Contributor Author

@ZiyaZa @aokolnychyi

Brings DELETE into the same SLAM-correctness story as UPDATE / MERGE.

DELETE in DSv2 has two paths:

- Delta-based DELETE (WriteDeltaExec): the writer task increments
  `numDeletedRows` directly from row-level operations.

- Group-based DELETE (ReplaceDataExec): the writer only sees COPY rows;
  `numCopiedRows` is incremented executor-side, and `numDeletedRows` is
  derived on the driver as `numScannedRows - numCopiedRows` where
  `numScannedRows` comes from `BatchScanExec.metrics("numOutputRows")`.

Changes:
- `RowLevelWriteExec.sparkMetrics`: the DELETE branch now uses
  `SQLLastAttemptMetrics.createMetric` for `numDeletedRows` and
  `numCopiedRows`, mirroring the UPDATE branch. Increment paths are
  unchanged. The driver-side `metrics("numDeletedRows").set(...)` in
  `ReplaceDataExec.getWriteSummary` still works because for the
  group-based path that metric is only updated on the driver, so SLAM
  records it as a driver value and surfaces it via
  `lastAttemptValueForHighestRDDId()`.
- `BatchScanExec` overrides `sparkMetrics` so `numOutputRows` is SLAM
  *only* when the table is wrapped as `RowLevelOperationTable` with
  `command == DELETE`. Other scans (regular reads, UPDATE / MERGE row-
  level scans, streaming scans via the trait default) are untouched.
  This is the only place the trait default needs to be overridden, since
  row-level rewrites only produce `BatchScanExec`.
- `getMetricValue` in `RowLevelWriteExec` (already SLAM-aware from the
  earlier UPDATE / MERGE commit) handles both the writer's
  `numCopiedRows` / `numDeletedRows` and the scan's `numOutputRows`
  uniformly.

Test:
- `DeleteFromTableSuiteBase`: "metric values are stable across stage
  retries" — DELETE with an `IN`-subquery, broadcast disabled to keep the
  shuffle, fetch-failure injection on. Asserts the `DeleteSummary`
  values match the actual delete after the writer stage retries. Runs
  across the four DELETE variants.
@juliuszsompolski juliuszsompolski changed the title [SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE and MERGE metrics [SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE/DELETE/MERGE metrics May 6, 2026
… DELETE case

Per review, the rationale for using SLAM only for DELETE row-level scans
belongs next to the case arm it explains, not on top of the metric block.
Also tightens the wording.
case rlot: RowLevelOperationTable if rlot.operation.command() == DELETE =>
SQLLastAttemptMetrics.createMetric(sparkContext, name)
case _ =>
SQLMetrics.createMetric(sparkContext, name)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for not always using SLAM here? Is it riskier than regular metrics?

IMO it would look better if this code was in DataSourceV2ScanExecBase, but there we can't check if it's a DELETE command.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some memory overhead - an array of num tasks size of partial values by default, so let's say a couple hundred bytes to a couple of kilobytes. Computational overhead should be negligible.
I think that in general all numOutputRows metrics all over Spark could benefit from porting to SLAM, but wanted to make the blast radius as small as possible here for now.

@juliuszsompolski
Copy link
Copy Markdown
Contributor Author

TODO: Work on a better test right not for UPDATE and MERGE, because the fetch failure injection framework right now doesn't trigger retries of the last stage, where these metrics are computed, so actually doesn't trigger duplicate non SLAM metrics.
Such duplicates would happen if the downstream stage produces a checksum mismatch on recompute, so the final stage gets restarted with a full retry. The failure injection mechanism would need some extensions to simulate that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants