[SPARK-56953][SDP] Implement SCD1 Batch Processor; foreachBatch Callback #56016
base: master
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.pipelines.autocdc | ||
|
|
||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.classic.DataFrame | ||
|
|
||
| /** | ||
| * Exposes an API to execute one SCD Type 1 AutoCDC microbatch reconciliation on a | ||
| * foreachBatch streaming query. | ||
| */ | ||
| case class Scd1ForeachBatchHandler( | ||
| batchProcessor: Scd1BatchProcessor, | ||
| auxiliaryTableIdentifier: TableIdentifier, | ||
| targetTableIdentifier: TableIdentifier) { | ||
|
|
||
| /** | ||
| * Process a single CDC microbatch and merge it into the auxiliary and target tables. | ||
| * | ||
| * Idempotent under same-`batchId` replay: both merges are gated on sequence inequalities, | ||
| * so a partial failure between them is reconciled correctly when foreachBatch retries the | ||
| * whole batch. | ||
| */ | ||
| def execute(batchDf: DataFrame, batchId: Long): Unit = { | ||
| ScdBatchValidator( | ||
| destinationIdentifier = targetTableIdentifier, | ||
| changeArgs = batchProcessor.changeArgs, | ||
| batchDf = batchDf, | ||
| batchId = batchId | ||
| ).validateMicrobatch() | ||
|
|
||
| val reconciledMicrobatch = batchProcessor.reconcileMicrobatch( | ||
| batchDf = batchDf, | ||
| // Aux holds at most one row per currently-active tombstone (revived keys are GC'd | ||
| // by mergeMicrobatchOntoAuxiliaryTable), so it generally stays small enough for a broadcast | ||
| // join. Future optimizations: key-pruned reads, table format-aware clustering and tombstone | ||
| // TTL. | ||
| auxiliaryTableDf = batchDf.sparkSession.read.table( | ||
| auxiliaryTableIdentifier.quotedString | ||
| ) | ||
| ) | ||
|
|
||
| batchProcessor.mergeMicrobatchOntoAuxiliaryTable( | ||
| reconciledMicrobatchDf = reconciledMicrobatch, | ||
| auxiliaryTableIdentifier = auxiliaryTableIdentifier | ||
| ) | ||
|
|
||
| // Failure between these two merges is safe under foreachBatch retry: the aux merge | ||
| // only ever mutates a tombstone when this batch's event makes it stale (strictly newer | ||
| // delete advances it) or redundant (`>=` upsert revives the key, GC'ing the tombstone), | ||
| // so on retry those preconditions no longer hold against the just-advanced aux state - | ||
| // the aux merge is a no-op and the target merge replays as if for the first time. | ||
| batchProcessor.mergeMicrobatchOntoTarget( | ||
| reconciledMicrobatchDf = reconciledMicrobatch, | ||
|
Member
After What is the expected recovery story here: idempotent replay of the same
Contributor
Author
Yeah, this is still idempotent even if we fail between the auxiliary table merge and the target merge. In the auxiliary table merge, we delete a previous tombstone only if it is being replaced by either a newer tombstone or an upsert. In either case, the microbatch must contain a newer event that renders the previous tombstone stale. Hence, on microbatch replay, it doesn't matter whether those (now stale) tombstones are still present in the auxiliary table. I'll leave a comment about this in the code.
Member
thanks! |
||
| targetTableIdentifier = targetTableIdentifier | ||
| ) | ||
| } | ||
| } | ||
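The replay argument in the code comment and review thread above can be checked on a toy model. The following is a minimal sketch, not the PR's implementation: plain Scala maps stand in for the auxiliary and target tables, and `Scd1ReplaySketch`, `Event`, `State`, `reconcile`, `mergeAux`, `mergeTarget`, and `processBatch` are all hypothetical names. The exact inequalities (`>` for a delete advancing a tombstone, `>=` for an upsert reviving a key) follow the code comment; the reconcile and target-merge rules are assumptions made for illustration.

```scala
// Toy model only: in-memory maps stand in for the real auxiliary and target tables.
object Scd1ReplaySketch {
  // value = None models a delete event; Some(v) models an upsert.
  final case class Event(key: String, seq: Long, value: Option[String])
  // aux: key -> tombstone sequence; target: key -> (sequence, value).
  final case class State(aux: Map[String, Long], target: Map[String, (Long, String)])

  // Assumption: keep the latest event per key, dropping events older than a tombstone.
  def reconcile(batch: Seq[Event], aux: Map[String, Long]): Seq[Event] =
    batch.groupBy(_.key).values.map(_.maxBy(_.seq))
      .filter(e => aux.get(e.key).forall(t => e.seq >= t)).toSeq

  // A strictly newer delete advances (or creates) the tombstone;
  // an upsert with seq >= tombstone revives the key and GCs the tombstone.
  def mergeAux(aux: Map[String, Long], events: Seq[Event]): Map[String, Long] =
    events.foldLeft(aux) { (acc, e) =>
      e.value match {
        case None if acc.get(e.key).forall(t => e.seq > t) => acc.updated(e.key, e.seq)
        case Some(_) if acc.get(e.key).exists(t => e.seq >= t) => acc - e.key
        case _ => acc
      }
    }

  // Assumption: an event applies to the target only if it is not older than the stored row.
  def mergeTarget(
      target: Map[String, (Long, String)],
      events: Seq[Event]): Map[String, (Long, String)] =
    events.foldLeft(target) { (acc, e) =>
      val notOlder = acc.get(e.key).forall { case (s, _) => e.seq >= s }
      e.value match {
        case Some(v) if notOlder => acc.updated(e.key, (e.seq, v))
        case None if notOlder => acc - e.key
        case _ => acc
      }
    }

  // failAfterAuxMerge = true simulates a crash between the two merges.
  def processBatch(state: State, batch: Seq[Event], failAfterAuxMerge: Boolean): State = {
    val reconciled = reconcile(batch, state.aux)
    val newAux = mergeAux(state.aux, reconciled)
    if (failAfterAuxMerge) State(newAux, state.target)
    else State(newAux, mergeTarget(state.target, reconciled))
  }
}
```

In this model, running a batch once cleanly and running it with a simulated failure after the aux merge followed by a full retry end in the same state: on retry the aux merge finds no tombstone it is allowed to mutate, and the target merge applies as if for the first time.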
This reads the full auxiliary table on every microbatch for tombstone application. Is that intentional (aux expected to stay small), or should we plan a follow-up, e.g. project to keys + `__spark_autocdc_metadata` only, or key-pruned reads as the aux table grows?
Yep, this is a good question. We do expect the auxiliary table to be small, especially for SCD1: there can be at most one tombstone per key in the universe of possible keys, and if a row is upserted after deletion, its tombstone is GC'd.
It is worth mentioning that if a row is deleted in the upstream source and never touched again (not uncommon, e.g. the row is permanently deleted), its tombstone will live on indefinitely. This is necessary for correctness, but there are future paths where we can consider a TTL for tombstone rows to eventually clean them up. This would be a correctness vs. time/space efficiency tradeoff.
Anyhow, given that we expect the auxiliary table to generally be small, we should expect this join to typically be a broadcast join, which should be relatively fast.
That said, I agree there's room for Spark engine-based optimizations such as pruning/clustering for the rarer cases where the auxiliary table does grow larger. I'll leave a follow-up comment.
sgtm!
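The tombstone lifecycle discussed in this thread, and the correctness cost of a possible TTL, can be illustrated with a small model. This is only a sketch under stated assumptions: the PR does not implement a TTL, and `TombstoneTtlSketch`, `Tombstone`, `recordDelete`, `recordUpsert`, `sweepExpired`, `acceptsUpsert`, and the `writtenAtMs` field are hypothetical names introduced here.

```scala
// Toy model only: a Map stands in for the auxiliary table; the TTL is hypothetical.
object TombstoneTtlSketch {
  final case class Tombstone(deleteSeq: Long, writtenAtMs: Long)
  type Aux = Map[String, Tombstone]

  // At most one tombstone per key: a strictly newer delete replaces the old one.
  def recordDelete(aux: Aux, key: String, seq: Long, nowMs: Long): Aux =
    if (aux.get(key).forall(_.deleteSeq < seq)) aux.updated(key, Tombstone(seq, nowMs))
    else aux

  // An upsert at or after the delete revives the key and GCs its tombstone.
  def recordUpsert(aux: Aux, key: String, seq: Long): Aux =
    if (aux.get(key).exists(_.deleteSeq <= seq)) aux - key else aux

  // Hypothetical TTL sweep: drop tombstones written at least ttlMs ago.
  def sweepExpired(aux: Aux, nowMs: Long, ttlMs: Long): Aux =
    aux.filter { case (_, t) => nowMs - t.writtenAtMs < ttlMs }

  // A late upsert is accepted only if no tombstone with a newer sequence blocks it.
  def acceptsUpsert(aux: Aux, key: String, seq: Long): Boolean =
    aux.get(key).forall(_.deleteSeq <= seq)
}
```

In this model, a key that is deleted and later upserted leaves no tombstone behind, while a permanently deleted key keeps exactly one. Sweeping that tombstone frees the space but means a late-arriving upsert older than the delete would no longer be rejected, which is the correctness vs. space tradeoff mentioned above.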