[SPARK-56720][SS] Fail subsequent async log writes after a prior failure in async progress tracking#55676
Conversation
… async progress tracking ### What changes were proposed in this pull request? Introduce a shared `AtomicReference[Throwable]` (the first async write error) that is consulted at the start of every async write task in both `AsyncOffsetSeqLog` and `AsyncCommitLog`. Once the first failure is recorded: - Subsequent offset and commit log write tasks short-circuit by failing with the original error without touching durable storage. - The first error is preserved via `compareAndSet(null, err)` so it is not overwritten by later cascading failures. The shared reference is owned by `AsyncProgressTrackingMicroBatchExecution` and threaded through `AsyncStreamingQueryCheckpointMetadata` to both async logs. The existing `.exceptionally` handlers in `markMicroBatchStart` / `markMicroBatchEnd` are routed through a new `recordAsyncWriteError` helper so the shared ref is also populated when the failure originates from `addAsync`'s own `thenApply` (e.g. `concurrentStreamLogUpdate`). ### Why are the changes needed? When async progress tracking is enabled, offset and commit log writes are submitted to a single-threaded executor service. If one async write task fails, follow-up writes already queued (or queued shortly afterward) can still proceed and persist files to durable storage, leaving inconsistent state on disk — for example, an offset entry for batch N missing while batch N+1 is present, or a commit-log entry written without its corresponding offset-log entry. The original error can also be overwritten in the `ErrorNotifier` by a later cascading failure, so the user-visible exception masks the root cause (e.g. surfacing `concurrentStreamLogUpdate` instead of the actual `Permission denied` / IOException that started the cascade). ### Does this PR introduce _any_ user-facing change? No. Async-progress-tracking queries that hit a write failure will now surface the root cause instead of a later cascading error, but the failure mode (query termination with a `StreamingQueryException`) is unchanged. ### How was this patch tested? Added a regression test in `AsyncProgressTrackingMicroBatchExecutionSuite` that triggers a real I/O failure on the first offset write and verifies: 1. The shared first-error reference is populated. 2. A follow-up commit-log write short-circuits with the original error and produces no commit file. 3. A follow-up offset-log write for the next batch also short-circuits with the same first error. 4. The shared error reference is not overwritten by later cascading failures. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor (Claude Opus 4.7)
| // Records the first error seen by any async log write task. Subsequent async | ||
| // log writes short-circuit by failing with this error before touching storage. | ||
| // This prevents creating gaps on durable storage (e.g. offset N missing while | ||
| // offset N+1 is present, or commit N+1 written without offset N) when an |
There was a problem hiding this comment.
This prevents creating gaps on durable storage (e.g. offset N missing while
// offset N+1 is present, or commit N+1 written without offset N
There can be gaps in terms of batches when the APT interval is set but what cannot happen is commit log for batch n is written but not offset log for batch n.
| @@ -142,6 +143,14 @@ class AsyncOffsetSeqLog( | |||
| } else { | |||
| executorService.submit(new Runnable { | |||
There was a problem hiding this comment.
"executorService.submit" returns a future. Can we just check the status of the future on the next log write? Any exception thrown within the task would be bubbled up by checking the completion status of the future.
There was a problem hiding this comment.
I think it is valid alternative. The difference is that the futures are created at runtime, and we need to pass the previous future to the next submit to form a perfect chain, which may be prone to error IMO. Honestly, compared to the current approach, I do not see an advantage of this alternative. Please let me know if you think it is a strictly better approach.
| offsetCommitIntervalMs: Long, | ||
| clock: Clock = new SystemClock()) | ||
| clock: Clock = new SystemClock(), | ||
| asyncWriteError: AtomicReference[Throwable] = new AtomicReference[Throwable]()) |
There was a problem hiding this comment.
Is there a reason we don't just re-use the error notifier for this case?
There was a problem hiding this comment.
Yep, good idea.
|
|
||
| private def recordAsyncWriteError(th: Throwable): Unit = { | ||
| firstAsyncWriteError.compareAndSet(null, th) | ||
| errorNotifier.markError(th) |
There was a problem hiding this comment.
Why not just use the errorNotifier?
There was a problem hiding this comment.
Yep, good idea. We can avoid creating a new variable.
eason-yuchen-liu
left a comment
There was a problem hiding this comment.
Thanks for the review. Addressed the comment to reuse errorNotifier.
|
|
||
| private def recordAsyncWriteError(th: Throwable): Unit = { | ||
| firstAsyncWriteError.compareAndSet(null, th) | ||
| errorNotifier.markError(th) |
There was a problem hiding this comment.
Yep, good idea. We can avoid creating a new variable.
| offsetCommitIntervalMs: Long, | ||
| clock: Clock = new SystemClock()) | ||
| clock: Clock = new SystemClock(), | ||
| asyncWriteError: AtomicReference[Throwable] = new AtomicReference[Throwable]()) |
There was a problem hiding this comment.
Yep, good idea.
| @@ -142,6 +143,14 @@ class AsyncOffsetSeqLog( | |||
| } else { | |||
| executorService.submit(new Runnable { | |||
There was a problem hiding this comment.
I think it is valid alternative. The difference is that the futures are created at runtime, and we need to pass the previous future to the next submit to form a perfect chain, which may be prone to error IMO. Honestly, compared to the current approach, I do not see an advantage of this alternative. Please let me know if you think it is a strictly better approach.
What changes were proposed in this pull request?
Introduce a shared
AtomicReference[Throwable](the first async write error) that is consulted at the start of every async write task in bothAsyncOffsetSeqLogandAsyncCommitLog. Once the first failure is recorded:compareAndSet(null, err)so it is not overwritten by later cascading failures.The shared reference is owned by
AsyncProgressTrackingMicroBatchExecutionand threaded throughAsyncStreamingQueryCheckpointMetadatato both async logs. The existing.exceptionallyhandlers inmarkMicroBatchStart/markMicroBatchEndare routed through a newrecordAsyncWriteErrorhelper so the shared ref is also populated when the failure originates fromaddAsync's ownthenApply(e.g.concurrentStreamLogUpdate).Why are the changes needed?
When async progress tracking is enabled, offset and commit log writes are submitted to a single-threaded executor service. If one async write task fails, follow-up writes already queued (or queued shortly afterward) can still proceed and persist files to durable storage, leaving inconsistent state on disk — for example, an offset entry for batch N missing while batch N+1 is present, or a commit-log entry written without its corresponding offset-log entry.
The original error can also be overwritten in the
ErrorNotifierby a later cascading failure, so the user-visible exception masks the root cause (e.g. surfacingconcurrentStreamLogUpdateinstead of the actualPermission denied/ IOException that started the cascade).Does this PR introduce any user-facing change?
No. Async-progress-tracking queries that hit a write failure will now surface the root cause instead of a later cascading error, but the failure mode (query termination with a
StreamingQueryException) is unchanged.How was this patch tested?
Added a regression test in
AsyncProgressTrackingMicroBatchExecutionSuitethat triggers a real I/O failure on the first offset write and verifies:Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4.7)