Skip to content

[SPARK-56720][SS] Fail subsequent async log writes after a prior failure in async progress tracking#55676

Open
eason-yuchen-liu wants to merge 5 commits intoapache:masterfrom
eason-yuchen-liu:ss-async-log-fail-fast
Open

[SPARK-56720][SS] Fail subsequent async log writes after a prior failure in async progress tracking#55676
eason-yuchen-liu wants to merge 5 commits intoapache:masterfrom
eason-yuchen-liu:ss-async-log-fail-fast

Conversation

@eason-yuchen-liu
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Introduce a shared AtomicReference[Throwable] (the first async write error) that is consulted at the start of every async write task in both AsyncOffsetSeqLog and AsyncCommitLog. Once the first failure is recorded:

  • Subsequent offset and commit log write tasks short-circuit by failing with the original error without touching durable storage.
  • The first error is preserved via compareAndSet(null, err) so it is not overwritten by later cascading failures.

The shared reference is owned by AsyncProgressTrackingMicroBatchExecution and threaded through AsyncStreamingQueryCheckpointMetadata to both async logs. The existing .exceptionally handlers in markMicroBatchStart / markMicroBatchEnd are routed through a new recordAsyncWriteError helper so the shared ref is also populated when the failure originates from addAsync's own thenApply (e.g. concurrentStreamLogUpdate).

Why are the changes needed?

When async progress tracking is enabled, offset and commit log writes are submitted to a single-threaded executor service. If one async write task fails, follow-up writes already queued (or queued shortly afterward) can still proceed and persist files to durable storage, leaving inconsistent state on disk — for example, an offset entry for batch N missing while batch N+1 is present, or a commit-log entry written without its corresponding offset-log entry.

The original error can also be overwritten in the ErrorNotifier by a later cascading failure, so the user-visible exception masks the root cause (e.g. surfacing concurrentStreamLogUpdate instead of the actual Permission denied / IOException that started the cascade).

Does this PR introduce any user-facing change?

No. Async-progress-tracking queries that hit a write failure will now surface the root cause instead of a later cascading error, but the failure mode (query termination with a StreamingQueryException) is unchanged.

How was this patch tested?

Added a regression test in AsyncProgressTrackingMicroBatchExecutionSuite that triggers a real I/O failure on the first offset write and verifies:

  1. The shared first-error reference is populated.
  2. A follow-up commit-log write short-circuits with the original error and produces no commit file.
  3. A follow-up offset-log write for the next batch also short-circuits with the same first error.
  4. The shared error reference is not overwritten by later cascading failures.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor (Claude Opus 4.7)

… async progress tracking

### What changes were proposed in this pull request?

Introduce a shared `AtomicReference[Throwable]` (the first async write
error) that is consulted at the start of every async write task in both
`AsyncOffsetSeqLog` and `AsyncCommitLog`. Once the first failure is
recorded:

- Subsequent offset and commit log write tasks short-circuit by failing
  with the original error without touching durable storage.
- The first error is preserved via `compareAndSet(null, err)` so it is
  not overwritten by later cascading failures.

The shared reference is owned by `AsyncProgressTrackingMicroBatchExecution`
and threaded through `AsyncStreamingQueryCheckpointMetadata` to both
async logs. The existing `.exceptionally` handlers in
`markMicroBatchStart` / `markMicroBatchEnd` are routed through a new
`recordAsyncWriteError` helper so the shared ref is also populated when
the failure originates from `addAsync`'s own `thenApply` (e.g.
`concurrentStreamLogUpdate`).

### Why are the changes needed?

When async progress tracking is enabled, offset and commit log writes
are submitted to a single-threaded executor service. If one async write
task fails, follow-up writes already queued (or queued shortly afterward)
can still proceed and persist files to durable storage, leaving
inconsistent state on disk — for example, an offset entry for batch N
missing while batch N+1 is present, or a commit-log entry written
without its corresponding offset-log entry.

The original error can also be overwritten in the `ErrorNotifier` by a
later cascading failure, so the user-visible exception masks the root
cause (e.g. surfacing `concurrentStreamLogUpdate` instead of the actual
`Permission denied` / IOException that started the cascade).

### Does this PR introduce _any_ user-facing change?

No. Async-progress-tracking queries that hit a write failure will now
surface the root cause instead of a later cascading error, but the
failure mode (query termination with a `StreamingQueryException`) is
unchanged.

### How was this patch tested?

Added a regression test in `AsyncProgressTrackingMicroBatchExecutionSuite`
that triggers a real I/O failure on the first offset write and verifies:

1. The shared first-error reference is populated.
2. A follow-up commit-log write short-circuits with the original error
   and produces no commit file.
3. A follow-up offset-log write for the next batch also short-circuits
   with the same first error.
4. The shared error reference is not overwritten by later cascading
   failures.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor (Claude Opus 4.7)
@eason-yuchen-liu eason-yuchen-liu changed the title [SPARK][SS] Fail subsequent async log writes after a prior failure in async progress tracking [SPARK-56720][SS] Fail subsequent async log writes after a prior failure in async progress tracking May 4, 2026
// Records the first error seen by any async log write task. Subsequent async
// log writes short-circuit by failing with this error before touching storage.
// This prevents creating gaps on durable storage (e.g. offset N missing while
// offset N+1 is present, or commit N+1 written without offset N) when an
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prevents creating gaps on durable storage (e.g. offset N missing while
// offset N+1 is present, or commit N+1 written without offset N

There can be gaps in terms of batches when the APT interval is set but what cannot happen is commit log for batch n is written but not offset log for batch n.

@@ -142,6 +143,14 @@ class AsyncOffsetSeqLog(
} else {
executorService.submit(new Runnable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"executorService.submit" returns a future. Can we just check the status of the future on the next log write? Any exception thrown within the task would be bubbled up by checking the completion status of the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is valid alternative. The difference is that the futures are created at runtime, and we need to pass the previous future to the next submit to form a perfect chain, which may be prone to error IMO. Honestly, compared to the current approach, I do not see an advantage of this alternative. Please let me know if you think it is a strictly better approach.

offsetCommitIntervalMs: Long,
clock: Clock = new SystemClock())
clock: Clock = new SystemClock(),
asyncWriteError: AtomicReference[Throwable] = new AtomicReference[Throwable]())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't just re-use the error notifier for this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good idea.


private def recordAsyncWriteError(th: Throwable): Unit = {
firstAsyncWriteError.compareAndSet(null, th)
errorNotifier.markError(th)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use the errorNotifier?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good idea. We can avoid creating a new variable.

Copy link
Copy Markdown
Contributor Author

@eason-yuchen-liu eason-yuchen-liu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Addressed the comment to reuse errorNotifier.


private def recordAsyncWriteError(th: Throwable): Unit = {
firstAsyncWriteError.compareAndSet(null, th)
errorNotifier.markError(th)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good idea. We can avoid creating a new variable.

offsetCommitIntervalMs: Long,
clock: Clock = new SystemClock())
clock: Clock = new SystemClock(),
asyncWriteError: AtomicReference[Throwable] = new AtomicReference[Throwable]())
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good idea.

@@ -142,6 +143,14 @@ class AsyncOffsetSeqLog(
} else {
executorService.submit(new Runnable {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is valid alternative. The difference is that the futures are created at runtime, and we need to pass the previous future to the next submit to form a perfect chain, which may be prone to error IMO. Honestly, compared to the current approach, I do not see an advantage of this alternative. Please let me know if you think it is a strictly better approach.

@eason-yuchen-liu eason-yuchen-liu requested a review from jerrypeng May 6, 2026 23:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants