Skip to content

[SPARK-56953][SDP] Implement SCD1 Batch Processor; foreachBatch Callback#56016

Open
AnishMahto wants to merge 11 commits into
apache:masterfrom
AnishMahto:SPARK-56953-SCD1-foreachbatch-callback
Open

[SPARK-56953][SDP] Implement SCD1 Batch Processor; foreachBatch Callback#56016
AnishMahto wants to merge 11 commits into
apache:masterfrom
AnishMahto:SPARK-56953-SCD1-foreachbatch-callback

Conversation

@AnishMahto
Copy link
Copy Markdown
Contributor

@AnishMahto AnishMahto commented May 20, 2026

Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7


This is a stacked PR. Review incremental diff here: AnishMahto/spark@SPARK-56927-SCD1-merge-microbatch-onto-target...SPARK-56953-SCD1-foreachbatch-callback

Incremental diff that includes [SPARK-56953] [SPARK-56927][SPARK-56923]: AnishMahto/spark@SPARK-56249-merge-tombstones-onto-microbatch...SPARK-56953-SCD1-foreachbatch-callback


Preamble:

The SCD type 1 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD1 replication semantics.

SCD1 flows also maintain an "auxiliary" table to keep track of early-arriving out-of-order received events state. Each microbatch will need to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches.

foreachBatch Callback:

Implementation of the actual callback that will be passed into the foreachBatch streaming query for SCD1 flows.

The callback orchestrates microbatch validation and calling all of the Scd1BatchProcessor helpers to execute the complete reconciliation of a change-data-feed microbatch onto the auxiliary and target tables.

Introduce Scd1ForeachBatchExecSuite to exercise E2E business logic unit tests. In the future when we hook this up to an actual flow execution, we will introduce E2E integration tests that can additionally verify schema evolution, interaction with other dataflow graph entities, etc.

Additionally refactor some test helper functions into a shared AutoCdcCatalogExecutionTestBase.

@AnishMahto AnishMahto force-pushed the SPARK-56953-SCD1-foreachbatch-callback branch from 9b6544d to 971ed44 Compare May 22, 2026 21:43
@AnishMahto AnishMahto force-pushed the SPARK-56953-SCD1-foreachbatch-callback branch from 971ed44 to 4ac762e Compare May 23, 2026 00:33
@szehon-ho
Copy link
Copy Markdown
Member

Suggestion: centralize microbatch pipeline ordering on Scd1BatchProcessor

Scd1ForeachBatchExec.execute wires a fixed sequence of public Scd1BatchProcessor steps. That's clear today, but once this is hooked into a real foreachBatch callback, any caller could invoke those methods out of order (e.g. tombstones before CDC metadata, or merges before projection).

Spark-style library code usually hides that ordering behind a single entry point rather than exposing every step.

For example:

// Scd1BatchProcessor – public API
def reconcileMicrobatch(
    batchDf: DataFrame,
    auxiliaryTableDf: DataFrame): DataFrame = { /* validate? */ ... }

// Scd1ForeachBatchExec
def execute(batchDf: DataFrame, batchId: Long): Unit = {
  ScdBatchValidator(...).validateMicrobatch()
  val reconciled = batchProcessor.reconcileMicrobatch(
    batchDf,
    batchDf.sparkSession.read.table(auxiliaryTableIdentifier.quotedString))
  batchProcessor.mergeMicrobatchOntoAuxiliaryTable(reconciled, auxiliaryTableIdentifier)
  batchProcessor.mergeMicrobatchOntoTarget(reconciled, targetTableIdentifier)
}
  1. Add reconcileMicrobatch on Scd1BatchProcessor for validate → dedupe → metadata → projection → tombstones (validation can stay in ScdBatchValidator and be called from the exec, as above).
  2. Keep the existing step methods as private[autocdc] for unit tests in Scd1BatchProcessorSuite / merge suites.
  3. Slim Scd1ForeachBatchExec to validate → read aux → reconcileMicrobatch → merge aux → merge target.

That preserves testability without exposing a composable-but-unsafe public pipeline API. Minor naming nit: *Exec usually means a physical operator in Spark; something like Scd1ForeachBatchHandler / Scd1MicrobatchReconciler might fit pipelines better if you're open to a rename.

)

batchProcessor.mergeMicrobatchOntoTarget(
reconciledMicrobatchDf = reconciledMicrobatch,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After mergeMicrobatchOntoAuxiliaryTable succeeds, a failure in mergeMicrobatchOntoTarget could leave aux updated while the target is not (or vice versa on partial retry, depending on foreachBatch semantics).

What is the expected recovery story here—idempotent replay of the same batchId, manual repair, or a future transactional wrapper? Worth a short note in scaladoc or SPIP follow-up?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is still idempotent even if we fail in between the auxiliary table merge and the target merge.

In the auxiliary table merge, we only delete previous tombstones iff they are either be replaced by a newer tombstone or an upsert - in either case the implication is the microbatch must contain a newer event that renders the previous tombstone stale.

Hence on microbatch replay, it doesn't matter whether those (now stale) tombstones are still present in the auxiliary table or not. I'll leave a comment about this in the code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!


val reconciledMicrobatch = batchProcessor.applyTombstonesToMicrobatch(
microbatchDf = projectedMicrobatch,
auxiliaryTableDf = batchDf.sparkSession.read.table(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads the full auxiliary table on every microbatch for tombstone application. Is that intentional (aux expected to stay small), or should we plan a follow-up—e.g. project to keys + __spark_autocdc_metadata only, or key-pruned reads as the aux table grows?

Copy link
Copy Markdown
Contributor Author

@AnishMahto AnishMahto May 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep this is a good question. We do indeed expect the auxiliary table to be small, especially for SCD1 - there can be at most one tombstone per key in the universe of possible keys, and if a row is upserted to post deletion, the tombstone is GC'd.

It is worth mentioning if a row is deleted in the upstream source and never touched again (not totally uncommon, i.e row is permanently deleted), its tombstone will continue to live on indefinitely - necessary for correctness, but there are future paths where we can consider a TTL for tombstone rows to eventually clean them up. This would be a correctness vs time/space efficiency tradeoff.

Anyhow given that we expect the auxiliary table to generally be small, we should expect this join to typically use a broadcast join - should be relatively fast.

That being said I agree there's room for spark engine based optimization such as pruning/clustering for rarer cases where the auxiliary table does grow larger in size. I'll leave a follow up comment.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm!

@AnishMahto
Copy link
Copy Markdown
Contributor Author

I left some comments in the code explaining concepts we discussed in these threads. Accepted code suggestions.

By the way in the near future I'm hoping to merge some automated/fuzz testing for idempotency, to provide additional signal for the idempotency argument (and prevent idempotency regression).

The idea would be to compose two different Scd1ForeachBatchHandler, one with a stubbed faulty Scd1BatchProcessor.mergeMicrobatchOntoAuxiliaryTable, and the one with the regular Scd1ForeachBatchHandler implementation. When both handlers run on the same generated microbatch, they should produce the same output (granted the failing handler needs to run twice to recover from failure).

@AnishMahto AnishMahto requested a review from szehon-ho May 23, 2026 02:19
Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! I left some nits, but let's tackle in follow up if they make sense as they are just some minor cleanup

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants