[SPARK-56953][SDP] Implement SCD1 Batch Processor; foreachBatch Callback#56016
AnishMahto wants to merge 11 commits into
9b6544d to 971ed44
971ed44 to 4ac762e
Suggestion: centralize microbatch pipeline ordering on
Spark-style library code usually hides that ordering behind a single entry point rather than exposing every step. For example:
// Scd1BatchProcessor – public API
def reconcileMicrobatch(
    batchDf: DataFrame,
    auxiliaryTableDf: DataFrame): DataFrame = { /* validate? */ ... }

// Scd1ForeachBatchExec
def execute(batchDf: DataFrame, batchId: Long): Unit = {
  ScdBatchValidator(...).validateMicrobatch()
  val reconciled = batchProcessor.reconcileMicrobatch(
    batchDf,
    batchDf.sparkSession.read.table(auxiliaryTableIdentifier.quotedString))
  batchProcessor.mergeMicrobatchOntoAuxiliaryTable(reconciled, auxiliaryTableIdentifier)
  batchProcessor.mergeMicrobatchOntoTarget(reconciled, targetTableIdentifier)
}
That preserves testability without exposing a composable-but-unsafe public pipeline API. Minor naming nit:
    )

    batchProcessor.mergeMicrobatchOntoTarget(
      reconciledMicrobatchDf = reconciledMicrobatch,
After mergeMicrobatchOntoAuxiliaryTable succeeds, a failure in mergeMicrobatchOntoTarget could leave aux updated while the target is not (or vice versa on partial retry, depending on foreachBatch semantics).
What is the expected recovery story here—idempotent replay of the same batchId, manual repair, or a future transactional wrapper? Worth a short note in scaladoc or SPIP follow-up?
Yeah, this is still idempotent even if we fail between the auxiliary table merge and the target merge.
In the auxiliary table merge, we delete a previous tombstone only if it is being replaced by either a newer tombstone or an upsert. In either case, the implication is that the microbatch must contain a newer event that renders the previous tombstone stale.
Hence, on microbatch replay, it doesn't matter whether those (now stale) tombstones are still present in the auxiliary table or not. I'll leave a comment about this in the code.
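To make the replay argument above concrete, here is a minimal in-memory sketch. It is a toy model, not the code in this PR: `Event`, `State`, `Scd1ToyModel`, and the sequence-number comparison rules are all assumptions made for illustration. It simulates a crash after the auxiliary merge but before the target merge, replays the same microbatch, and checks that the final state matches a run with no crash.

```scala
// Toy model only. All names and comparison rules here are hypothetical,
// chosen to mirror the argument in this thread, not the PR's implementation.
final case class Event(key: String, seq: Long, value: Option[String]) {
  def isDelete: Boolean = value.isEmpty // None models a delete event
}

final case class State(
    aux: Map[String, Long],              // key -> sequence number of its tombstone
    target: Map[String, (Long, String)]) // key -> (sequence number, value)

object Scd1ToyModel {
  // Keep the newest event per key, then drop events strictly older than a known tombstone.
  def reconcile(batch: Seq[Event], aux: Map[String, Long]): Seq[Event] =
    batch.groupBy(_.key).values.map(_.maxBy(_.seq)).toSeq
      .filter(e => aux.get(e.key).forall(_ <= e.seq))

  // A delete writes (or replaces) the tombstone; an upsert removes the now-stale tombstone.
  def mergeOntoAux(events: Seq[Event], aux: Map[String, Long]): Map[String, Long] =
    events.foldLeft(aux) { (acc, e) =>
      if (e.isDelete) acc.updated(e.key, e.seq) else acc - e.key
    }

  // Apply an event only if it is newer than the row currently in the target.
  def mergeOntoTarget(
      events: Seq[Event],
      target: Map[String, (Long, String)]): Map[String, (Long, String)] =
    events.foldLeft(target) { (acc, e) =>
      val isNewer = acc.get(e.key).forall(_._1 < e.seq)
      e.value match {
        case Some(v) if isNewer => acc.updated(e.key, (e.seq, v))
        case None if isNewer => acc - e.key
        case _ => acc
      }
    }

  // Normal path: both merges complete.
  def process(batch: Seq[Event], s: State): State = {
    val reconciled = reconcile(batch, s.aux)
    State(mergeOntoAux(reconciled, s.aux), mergeOntoTarget(reconciled, s.target))
  }

  // Simulated crash: the auxiliary merge lands, the target merge never runs.
  def processWithCrash(batch: Seq[Event], s: State): State = {
    val reconciled = reconcile(batch, s.aux)
    s.copy(aux = mergeOntoAux(reconciled, s.aux))
  }

  def main(args: Array[String]): Unit = {
    val start = State(aux = Map("a" -> 10L), target = Map("b" -> (3L, "old")))
    val batch = Seq(
      Event("a", 12L, Some("revived")), // upsert newer than the tombstone for "a"
      Event("b", 7L, None),             // delete of an existing row
      Event("c", 1L, Some("new")))      // plain insert
    val clean = process(batch, start)
    val replayed = process(batch, processWithCrash(batch, start))
    assert(clean == replayed)
    println(s"states match after replay: ${clean == replayed}")
  }
}
```

In this model the tombstone for `a` is already gone when the batch is replayed, yet the upsert at sequence 12 still passes reconciliation and lands in the target, which is the point made above: a tombstone is only removed when the same microbatch carries a newer event for that key.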
    val reconciledMicrobatch = batchProcessor.applyTombstonesToMicrobatch(
      microbatchDf = projectedMicrobatch,
      auxiliaryTableDf = batchDf.sparkSession.read.table(
This reads the full auxiliary table on every microbatch for tombstone application. Is that intentional (aux expected to stay small), or should we plan a follow-up—e.g. project to keys + __spark_autocdc_metadata only, or key-pruned reads as the aux table grows?
Yep, this is a good question. We do indeed expect the auxiliary table to be small, especially for SCD1: there can be at most one tombstone per key in the universe of possible keys, and if a row is upserted again after deletion, the tombstone is GC'd.
It is worth mentioning that if a row is deleted in the upstream source and never touched again (not totally uncommon, e.g. the row is permanently deleted), its tombstone will live on indefinitely. That is necessary for correctness, but there are future paths where we can consider a TTL for tombstone rows to eventually clean them up. This would be a correctness vs. time/space efficiency tradeoff.
Anyhow, given that we expect the auxiliary table to generally be small, we should expect this join to typically be a broadcast join, which should be relatively fast.
That being said, I agree there's room for Spark engine-based optimization, such as pruning/clustering, for the rarer cases where the auxiliary table does grow large. I'll leave a follow-up comment.
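A small sketch of why a tombstone TTL would trade correctness for space, under assumed semantics. This is not code from the PR: `ChangeEvent`, `TombstoneTtlSketch`, and the rule "an event is stale if its sequence number is lower than the newest one seen for its key" are hypothetical. It compares a late, out-of-order upsert arriving while the tombstone is still present against the same event arriving after the tombstone was dropped.

```scala
// Toy model only; names and staleness rule are assumptions for illustration.
final case class ChangeEvent(key: String, seq: Long, value: Option[String]) // None = delete

object TombstoneTtlSketch {
  type Aux = Map[String, Long]              // key -> tombstone sequence number
  type Target = Map[String, (Long, String)] // key -> (sequence number, value)

  // Apply events in sequence order, skipping any event older than what is already known
  // for its key, either from the target row or from a tombstone.
  def applyBatch(batch: Seq[ChangeEvent], aux: Aux, target: Target): (Aux, Target) =
    batch.sortBy(_.seq).foldLeft((aux, target)) { case ((a, t), e) =>
      val newestSeen = math.max(
        a.getOrElse(e.key, Long.MinValue),
        t.get(e.key).map(_._1).getOrElse(Long.MinValue))
      if (e.seq < newestSeen) (a, t) // stale event: ignore
      else e.value match {
        case Some(v) => (a - e.key, t.updated(e.key, (e.seq, v))) // upsert GCs the tombstone
        case None => (a.updated(e.key, e.seq), t - e.key)         // delete records a tombstone
      }
    }

  def main(args: Array[String]): Unit = {
    val startTarget: Target = Map("k" -> (4L, "v4"))
    val (auxAfterDelete, targetAfterDelete) =
      applyBatch(Seq(ChangeEvent("k", 10L, None)), Map.empty, startTarget)

    val lateUpsert = Seq(ChangeEvent("k", 5L, Some("v5"))) // older than the delete at seq 10

    // Tombstone still present: the late upsert is recognized as stale.
    val (_, withTombstone) = applyBatch(lateUpsert, auxAfterDelete, targetAfterDelete)
    // Tombstone dropped (as a TTL would do): the late upsert resurrects the deleted row.
    val (_, withoutTombstone) = applyBatch(lateUpsert, Map.empty, targetAfterDelete)

    println(s"with tombstone: $withTombstone")
    println(s"without tombstone: $withoutTombstone")
  }
}
```

In this model the run with the tombstone leaves the target empty, while the run without it reinserts `k` from the stale event. A TTL would therefore only be safe for events that arrive later than the chosen expiry window.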
I left some comments in the code explaining the concepts we discussed in these threads, and accepted the code suggestions. By the way, in the near future I'm hoping to merge some automated/fuzz testing for idempotency, to provide additional signal for the idempotency argument (and prevent idempotency regressions). The idea would be to compose two different
szehon-ho left a comment
lgtm! I left some nits, but let's tackle them in a follow-up if they make sense, as they are just minor cleanup.
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
This is a stacked PR. Review incremental diff here: AnishMahto/spark@SPARK-56927-SCD1-merge-microbatch-onto-target...SPARK-56953-SCD1-foreachbatch-callback
Incremental diff that includes [SPARK-56953] [SPARK-56927][SPARK-56923]: AnishMahto/spark@SPARK-56249-merge-tombstones-onto-microbatch...SPARK-56953-SCD1-foreachbatch-callback
Preamble:
The SCD type 1 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD1 replication semantics.
SCD1 flows also maintain an "auxiliary" table to keep track of the state of early-arriving, out-of-order events. Each microbatch needs to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches.
foreachBatch Callback:
Implementation of the actual callback that will be passed into the foreachBatch streaming query for SCD1 flows.
The callback orchestrates microbatch validation and calls all of the Scd1BatchProcessor helpers to execute the complete reconciliation of a change-data-feed microbatch onto the auxiliary and target tables.
Introduce Scd1ForeachBatchExecSuite to exercise E2E business logic unit tests. In the future, when we hook this up to an actual flow execution, we will introduce E2E integration tests that can additionally verify schema evolution, interaction with other dataflow graph entities, etc.
Additionally, refactor some test helper functions into a shared AutoCdcCatalogExecutionTestBase.