Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,29 @@
],
"sqlState" : "22023"
},
"AUTOCDC_MICROBATCH_VALIDATION" : {
"message" : [
"AutoCDC flow on table <tableName> in batch <batchId> failed microbatch validation."
],
"subClass" : {
"NON_ORDERABLE_SEQUENCE" : {
"message" : [
"The sequencing column has non-orderable type <dataType>. The sequencing column must be of a type that supports ordering."
]
},
"NULL_KEY" : {
"message" : [
"The microbatch contains rows with null values in the following key column(s): <nullKeyCounts>. All rows must have non-null values for every key column."
]
},
"NULL_SEQUENCE" : {
"message" : [
"The microbatch contains <nullCount> row(s) with a null sequencing value. All rows must have a non-null sequencing value."
]
}
},
"sqlState" : "22000"
},
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.pipelines.autocdc
import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.types.{DataType, StructField, StructType}
Expand All @@ -37,6 +38,45 @@ case class Scd1BatchProcessor(
changeArgs: ChangeArgs,
resolvedSequencingType: DataType) {

/**
* Reconcile a CDC microbatch into the canonical form that the auxiliary- and target-table
* merges consume. Composes the per-step transforms in the only order that produces correct
* SCD1 semantics:
*
* 1. [[deduplicateMicrobatch]]: collapse same-key events to the latest by sequence.
* 2. [[extendMicrobatchRowsWithCdcMetadata]]: project the operational `_cdc_metadata` column
* (must run before column selection, which may drop inputs the metadata expressions
* reference).
* 3. [[projectTargetColumnsOntoMicrobatch]]: apply the user-defined column selection while
* preserving the CDC metadata column.
* 4. [[applyTombstonesToMicrobatch]]: filter out late-arriving events superseded by
* tombstones already recorded in the auxiliary table.
*
* The per-step methods are kept package-visible so that focused unit tests can pin each
* transform's behavior independently. This method itself is package-visible so that
* [[Scd1ForeachBatchHandler]] can call it after running [[ScdBatchValidator.validateMicrobatch]]
* - validation is intentionally not folded in here, as it must run before any of these
* transforms touch the data.
*
* @param batchDf The validated incoming CDC microbatch.
* @param auxiliaryTableDf A snapshot of the auxiliary table for tombstone reconciliation.
* Must contain at minimum the key columns + `_cdc_metadata`.
* @return The reconciled microbatch, ready to be merged onto both tables.
*/
private[autocdc] def reconcileMicrobatch(
    batchDf: DataFrame,
    auxiliaryTableDf: DataFrame): DataFrame = {
  // The first three steps only need the microbatch itself. Read inside-out: collapse each key
  // to its latest event, attach the CDC metadata column, then apply the user's column
  // selection. Metadata is attached before the projection because the projection may drop
  // inputs that the metadata expressions reference.
  val projectedWithCdcMetadata = projectTargetColumnsOntoMicrobatch(
    microbatchWithCdcMetadataDf = extendMicrobatchRowsWithCdcMetadata(
      validatedMicrobatch = deduplicateMicrobatch(validatedMicrobatch = batchDf)
    )
  )

  // Final step: drop late-arriving events that are superseded by tombstones already recorded
  // in the auxiliary table.
  applyTombstonesToMicrobatch(
    microbatchDf = projectedWithCdcMetadata,
    auxiliaryTableDf = auxiliaryTableDf
  )
}

/**
* Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key
* as ordered by [[ChangeArgs.sequencing]].
Expand All @@ -51,7 +91,7 @@ case class Scd1BatchProcessor(
*
* The schema of the returned dataframe matches the schema of the microbatch exactly.
*/
def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
private[autocdc] def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
// The `max_by` API can only return a single column, so pack/unpack the entire row into a
// temporary column before and after the `max_by` operation.
val winningRowCol = Scd1BatchProcessor.winningRowColName
Expand Down Expand Up @@ -88,7 +128,8 @@ case class Scd1BatchProcessor(
* The returned dataframe has all of the columns in the input microbatch + the CDC metadata
* column.
*/
def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): DataFrame = {
private[autocdc] def extendMicrobatchRowsWithCdcMetadata(
validatedMicrobatch: DataFrame): DataFrame = {
// Proactively validate the reserved CDC metadata column does not exist in the microbatch.
validateCdcMetadataColumnNotPresent(validatedMicrobatch)

Expand Down Expand Up @@ -123,7 +164,8 @@ case class Scd1BatchProcessor(
* Returned dataframe's schema is: all of the user-selected columns in the input dataframe as per
* [[ChangeArgs.columnSelection]] + the CDC metadata column.
*/
def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
private[autocdc] def projectTargetColumnsOntoMicrobatch(
microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
val caseSensitiveColumnComparison =
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis

Expand Down Expand Up @@ -178,7 +220,7 @@ case class Scd1BatchProcessor(
* The returned filtered dataframe has the same schema as the input microbatch, but with only
* the rows that remain unaffected by any known tombstones.
*/
def applyTombstonesToMicrobatch(
private[autocdc] def applyTombstonesToMicrobatch(
microbatchDf: DataFrame,
auxiliaryTableDf: DataFrame): DataFrame = {
val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
Expand Down Expand Up @@ -212,6 +254,147 @@ case class Scd1BatchProcessor(
)
}

/**
* Merge the reconciled (deduplicated per key) microbatch onto the auxiliary table,
* advancing or deleting existing tombstones and inserting new tombstones for previously
* untracked keys.
*
* After the merge, the auxiliary table has the same schema as before, but with the latest
* tombstone data per key.
*
* @param reconciledMicrobatchDf The deduplicated microbatch.
* @param auxiliaryTableIdentifier The identifier of the auxiliary table.
*/
private[autocdc] def mergeMicrobatchOntoAuxiliaryTable(
    reconciledMicrobatchDf: DataFrame,
    auxiliaryTableIdentifier: TableIdentifier
): Unit = {
  val metadataColName = Scd1BatchProcessor.cdcMetadataColName
  val auxTable = auxiliaryTableIdentifier.quotedString
  val sourceAlias = "reducedMicrobatch"

  // The auxiliary table only tracks keys + `_cdc_metadata`, so strip every data column from
  // the reconciled microbatch before merging; data columns must not be persisted there.
  val keyAndMetadataCols = changeArgs.keys.map(key => F.col(key.quoted)) :+ F.col(metadataColName)
  val source = reconciledMicrobatchDf
    .select(keyAndMetadataCols: _*)
    .as(sourceAlias)

  // Sequence values carried by the incoming event.
  val sourceMetadata = F.col(s"$sourceAlias.`$metadataColName`")
  val sourceDeleteSeq = Scd1BatchProcessor.deleteSequenceOf(sourceMetadata)
  val sourceUpsertSeq = Scd1BatchProcessor.upsertSequenceOf(sourceMetadata)

  // Delete sequence of the tombstone currently stored for the key (if any).
  val storedMetadata = F.col(s"$auxTable.`$metadataColName`")
  val storedDeleteSeq = Scd1BatchProcessor.deleteSequenceOf(storedMetadata)

  // Merge condition: every key column must be equal between the microbatch and the aux table.
  val sameKey = changeArgs.keys
    .map { key =>
      val quotedKey = key.quoted
      F.col(s"$sourceAlias.$quotedKey") === F.col(s"$auxTable.$quotedKey")
    }
    .reduce(_ && _)

  // The incoming row is a delete when it carries a delete sequence and either has no upsert
  // sequence at all or its delete sequence is strictly greater than its upsert sequence.
  val sourceIsDelete =
    sourceDeleteSeq.isNotNull && (sourceUpsertSeq.isNull || sourceDeleteSeq > sourceUpsertSeq)

  // Incoming delete is newer than the stored one: advance the high-water mark.
  val advanceTombstone = sourceIsDelete && sourceDeleteSeq > storedDeleteSeq
  // Incoming upsert is at least as new as the stored delete: the key was re-inserted after
  // the delete, so the tombstone is stale and is removed to prevent unbounded growth.
  val dropStaleTombstone = !sourceIsDelete && sourceUpsertSeq >= storedDeleteSeq

  source
    .mergeInto(auxTable, sameKey)
    .whenMatched(advanceTombstone)
    .update(Map(s"$auxTable.`$metadataColName`" -> sourceMetadata))
    .whenMatched(dropStaleTombstone)
    .delete()
    // New delete for a key that is not tracked yet: record it as a tombstone. The reconciled
    // microbatch holds at most one event per key, namely the latest known event for that key,
    // so if that event is a delete it must be a tombstone.
    .whenNotMatched(sourceIsDelete)
    .insertAll()
    .merge()
}

/**
* Merge the reconciled (deduplicated, tombstone applied, and column selection + metadata
* column projected) microbatch onto the target table, as per SCD1 semantics.
*
* Microbatch invariants:
* - Exactly one of {upsert, delete} version is non-null, the other is null.
* - There is at most one event per key, representing the latest known event for the key
* across the microbatch and auxiliary table.
*
* Target table invariants:
* - Target table only contains live rows; delete sequence is always null, upsert sequence
* is always non-null.
*
* @param reconciledMicrobatchDf The reconciled microbatch dataframe.
* @param targetTableIdentifier The identifier of the target table.
*/
private[autocdc] def mergeMicrobatchOntoTarget(
    reconciledMicrobatchDf: DataFrame,
    targetTableIdentifier: TableIdentifier
): Unit = {
  val metadataColName = Scd1BatchProcessor.cdcMetadataColName
  val targetTable = targetTableIdentifier.quotedString

  // (Re-)alias the reconciled microbatch so its columns can be referenced unambiguously for
  // the remainder of the merge.
  val sourceAlias = "microbatch"
  val source = reconciledMicrobatchDf.as(sourceAlias)

  val sourceMetadata = F.col(s"$sourceAlias.`$metadataColName`")
  val targetMetadata = F.col(s"$targetTable.`$metadataColName`")

  val sourceDeleteSeq = Scd1BatchProcessor.deleteSequenceOf(sourceMetadata)
  val sourceUpsertSeq = Scd1BatchProcessor.upsertSequenceOf(sourceMetadata)
  val targetUpsertSeq = Scd1BatchProcessor.upsertSequenceOf(targetMetadata)

  // Merge condition: every key column must be equal between the microbatch and the target.
  val sameKey = changeArgs.keys
    .map { key =>
      val quotedKey = key.quoted
      F.col(s"$sourceAlias.$quotedKey") === F.col(s"$targetTable.$quotedKey")
    }
    .reduce(_ && _)

  // An incoming upsert replaces the existing row when its sequence is greater than or equal
  // to the upsert sequence stored on the target.
  val upsertWins = sourceUpsertSeq.isNotNull && sourceUpsertSeq >= targetUpsertSeq

  // An incoming delete removes the existing row only when its sequence is strictly greater
  // than the target's upsert sequence. This is an arbitrary but deliberate choice so that
  // upserts get priority over deletes on duplicate sequencing.
  val deleteWins = sourceDeleteSeq.isNotNull && sourceDeleteSeq > targetUpsertSeq

  // A winning upsert overwrites the entire row, including the CDC metadata column. Key
  // columns are left out of the assignments because most merge implementations require that
  // join columns are not mutated, even when the mutation would be a no-op.
  val resolver = source.sparkSession.sessionState.conf.resolver
  val keyNames = changeArgs.keys.map(_.name)
  def isKeyColumn(columnName: String): Boolean = keyNames.exists(resolver(_, columnName))

  val nonKeyAssignments: Map[String, Column] = source.columns.collect {
    case columnName if !isKeyColumn(columnName) =>
      val quotedCol = QuotingUtils.quoteIdentifier(columnName)
      s"$targetTable.$quotedCol" -> F.col(s"$sourceAlias.$quotedCol")
  }.toMap

  source
    .mergeInto(targetTable, sameKey)
    .whenMatched(deleteWins)
    .delete()
    .whenMatched(upsertWins)
    .update(nonKeyAssignments)
    // New key: only upserts are inserted. A delete for an absent key is a no-op for the
    // target table merge; it would instead have been inserted as a tombstone into the
    // auxiliary table.
    .whenNotMatched(sourceDeleteSeq.isNull)
    .insertAll()
    .merge()
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.classic.DataFrame

/**
* Exposes an API to execute one SCD Type 1 AutoCDC microbatch reconciliation on a
* foreachBatch streaming query.
*/
case class Scd1ForeachBatchHandler(
batchProcessor: Scd1BatchProcessor,
auxiliaryTableIdentifier: TableIdentifier,
targetTableIdentifier: TableIdentifier) {

/**
* Process a single CDC microbatch and merge it into the auxiliary and target tables.
*
* Idempotent under same-`batchId` replay: both merges are gated on sequence inequalities,
* so a partial failure between them is reconciled correctly when foreachBatch retries the
* whole batch.
*/
def execute(batchDf: DataFrame, batchId: Long): Unit = {
ScdBatchValidator(
destinationIdentifier = targetTableIdentifier,
changeArgs = batchProcessor.changeArgs,
batchDf = batchDf,
batchId = batchId
).validateMicrobatch()

val reconciledMicrobatch = batchProcessor.reconcileMicrobatch(
batchDf = batchDf,
// Aux holds at most one row per currently-active tombstone (revived keys are GC'd
// by mergeMicrobatchOntoAuxiliaryTable), so it generally stays small enough for a broadcast
// join. Future optimizations: key-pruned reads, table format-aware clustering and tombstone
// TTL.
auxiliaryTableDf = batchDf.sparkSession.read.table(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads the full auxiliary table on every microbatch for tombstone application. Is that intentional (aux expected to stay small), or should we plan a follow-up—e.g. project to keys + __spark_autocdc_metadata only, or key-pruned reads as the aux table grows?

Copy link
Copy Markdown
Contributor Author

@AnishMahto AnishMahto May 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this is a good question. We do indeed expect the auxiliary table to be small, especially for SCD1 - there can be at most one tombstone per key in the universe of possible keys, and if a row is upserted again after its deletion, the tombstone is GC'd.

It is worth mentioning that if a row is deleted in the upstream source and never touched again (not totally uncommon, e.g. the row is permanently deleted), its tombstone will continue to live on indefinitely. This is necessary for correctness, but in the future we could consider a TTL for tombstone rows to eventually clean them up; that would be a correctness vs. time/space efficiency tradeoff.

Anyhow given that we expect the auxiliary table to generally be small, we should expect this join to typically use a broadcast join - should be relatively fast.

That being said I agree there's room for spark engine based optimization such as pruning/clustering for rarer cases where the auxiliary table does grow larger in size. I'll leave a follow up comment.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm!

auxiliaryTableIdentifier.quotedString
)
)

batchProcessor.mergeMicrobatchOntoAuxiliaryTable(
reconciledMicrobatchDf = reconciledMicrobatch,
auxiliaryTableIdentifier = auxiliaryTableIdentifier
)

// Failure between these two merges is safe under foreachBatch retry: the aux merge
// only ever mutates a tombstone when this batch's event makes it stale (strictly newer
// delete advances it) or redundant (`>=` upsert revives the key, GC'ing the tombstone),
// so on retry those preconditions no longer hold against the just-advanced aux state -
// the aux merge is a no-op and the target merge replays as if for the first time.
batchProcessor.mergeMicrobatchOntoTarget(
reconciledMicrobatchDf = reconciledMicrobatch,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After mergeMicrobatchOntoAuxiliaryTable succeeds, a failure in mergeMicrobatchOntoTarget could leave aux updated while the target is not (or vice versa on partial retry, depending on foreachBatch semantics).

What is the expected recovery story here—idempotent replay of the same batchId, manual repair, or a future transactional wrapper? Worth a short note in scaladoc or SPIP follow-up?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is still idempotent even if we fail in between the auxiliary table merge and the target merge.

In the auxiliary table merge, we only delete previous tombstones iff they are being replaced by either a newer tombstone or an upsert - in either case the implication is that the microbatch must contain a newer event that renders the previous tombstone stale.

Hence on microbatch replay, it doesn't matter whether those (now stale) tombstones are still present in the auxiliary table or not. I'll leave a comment about this in the code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

targetTableIdentifier = targetTableIdentifier
)
}
}
Loading