Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,52 @@ case class Scd1BatchProcessor(
)
}

/**
 * Remove microbatch rows that are superseded by a tombstone held in the auxiliary table.
 *
 * This is a left anti-join of the microbatch against the auxiliary table. A microbatch row is
 * dropped when an auxiliary row exists whose key columns are all equal to the microbatch row's
 * and whose delete sequence is strictly greater than the microbatch row's effective sequence
 * (the greatest of its own delete and upsert sequences). Late-arriving upserts and stale
 * deletes are therefore both filtered out.
 *
 * @param microbatchDf The incoming microbatch dataframe with at minimum all of the key
 *                     columns + CDC metadata column.
 * @param auxiliaryTableDf Dataframe representing the auxiliary table, with at minimum the key
 *                         columns + CDC metadata column.
 * @return A dataframe with the same schema as the input microbatch, containing only the rows
 *         that remain unaffected by any known tombstones.
 */
def applyTombstonesToMicrobatch(
    microbatchDf: DataFrame,
    auxiliaryTableDf: DataFrame): DataFrame = {
  val metadataColName = Scd1BatchProcessor.cdcMetadataColName
  val incomingMetadata = F.col(s"microbatch.$metadataColName")
  val tombstoneMetadata = F.col(s"auxiliaryTable.$metadataColName")

  // The sequence a microbatch row is ordered by: the greatest of its delete and upsert
  // sequences.
  val incomingEffectiveSeq = F.greatest(
    Scd1BatchProcessor.deleteSequenceOf(incomingMetadata),
    Scd1BatchProcessor.upsertSequenceOf(incomingMetadata)
  )
  val tombstoneSeq = Scd1BatchProcessor.deleteSequenceOf(tombstoneMetadata)

  // Every key column must be equal between the microbatch row and the tombstone row.
  val sameKey = changeArgs.keys
    .map { key =>
      val quotedKey = key.quoted
      F.col(s"microbatch.$quotedKey") === F.col(s"auxiliaryTable.$quotedKey")
    }
    .reduce(_ && _)

  // Strict comparison: a row is only treated as late-arriving when the tombstone's delete
  // sequence is larger than the row's effective sequence, never when they are equal.
  val supersededByTombstone = incomingEffectiveSeq < tombstoneSeq

  // The anti-join keeps exactly the microbatch rows for which no such tombstone exists.
  microbatchDf
    .alias("microbatch")
    .join(
      auxiliaryTableDf.alias("auxiliaryTable"),
      sameKey && supersededByTombstone,
      "left_anti"
    )
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
Expand Down Expand Up @@ -194,6 +240,14 @@ object Scd1BatchProcessor {
/** Name of the CDC metadata field that [[deleteSequenceOf]] extracts. */
private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
/** Name of the CDC metadata field that [[upsertSequenceOf]] extracts. */
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"

/**
 * Extract the delete sequence from a CDC metadata column.
 *
 * @param cdcMetadataCol Column holding the CDC metadata.
 * @return A column referencing the field named by `cdcDeleteSequenceFieldName` within
 *         `cdcMetadataCol`.
 */
private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column = {
  val deleteSequenceField = cdcDeleteSequenceFieldName
  cdcMetadataCol.getField(deleteSequenceField)
}

/**
 * Extract the upsert sequence from a CDC metadata column.
 *
 * @param cdcMetadataCol Column holding the CDC metadata.
 * @return A column referencing the field named by `cdcUpsertSequenceFieldName` within
 *         `cdcMetadataCol`.
 */
private[autocdc] def upsertSequenceOf(cdcMetadataCol: Column): Column = {
  val upsertSequenceField = cdcUpsertSequenceFieldName
  cdcMetadataCol.getField(upsertSequenceField)
}

/**
* Schema of the CDC metadata struct column for SCD1.
*/
Expand Down
Loading