Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.pipelines.autocdc
import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.types.{DataType, StructField, StructType}
Expand Down Expand Up @@ -212,6 +213,67 @@ case class Scd1BatchProcessor(
)
}

/**
* Merge the reconciled (deduplicated per key) microbatch onto the auxiliary table,
* advancing or deleting existing tombstones and inserting new tombstones for previously
* untracked keys.
*
* After the merge, the auxiliary table has the same schema as before, but with the latest
* tombstone data per key.
*
* @param reconciledMicrobatchDf The deduplicated microbatch.
* @param auxiliaryTableIdentifier The identifier (not a [[DataFrame]]) of the auxiliary
* table, as required by the `mergeInto(table, condition)`
* API.
*/
def mergeMicrobatchOntoAuxiliaryTable(
reconciledMicrobatchDf: DataFrame,
auxiliaryTableIdentifier: TableIdentifier
): Unit = {
val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
val meta = Scd1BatchProcessor.cdcMetadataColName

// Project the reconciled microbatch down to just keys + `_cdc_metadata`; data columns are
// irrelevant for the auxiliary table and should not be persisted.
val reducedMicrobatch = reconciledMicrobatchDf
.select(changeArgs.keys.map(k => F.col(k.quoted)) :+ F.col(meta): _*)
.as("reducedMicrobatch")

val microbatchCdcMetadata: Column = F.col(s"reducedMicrobatch.`$meta`")
val incomingDelete: Column = Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata)
val incomingUpsert: Column = Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata)

val auxCdcMetadata: Column = F.col(s"$auxIdentQuoted.`$meta`")
val auxDelete: Column = Scd1BatchProcessor.deleteSequenceOf(auxCdcMetadata)

val doKeysMatch = changeArgs.keys
.map(k => F.col(s"reducedMicrobatch.${k.quoted}") === F.col(s"$auxIdentQuoted.${k.quoted}"))
.reduce(_ && _)

val incomingRowRepresentsDeleteEvent =
incomingDelete.isNotNull && (incomingUpsert.isNull || incomingDelete > incomingUpsert)

reducedMicrobatch
.mergeInto(auxIdentQuoted, doKeysMatch)
// Incoming delete is newer than the stored one: advance the high-water mark.
.whenMatched(
incomingRowRepresentsDeleteEvent && incomingDelete > auxDelete
)
.update(Map(s"$auxIdentQuoted.`$meta`" -> microbatchCdcMetadata))
// Incoming upsert is newer than the stored delete: the key was re-inserted after the
// delete, so the aux tombstone is stale - remove it to prevent unbounded growth.
.whenMatched(
!incomingRowRepresentsDeleteEvent && incomingUpsert >= auxDelete
)
.delete()
// New delete for a key not yet tracked, add it to auxiliary table. Note that in the
// reconciled microbatch, there is at most one event for key, which represents the latest
// known event for the key. If the latest known event is a delete, it must be a tombstone.
.whenNotMatched(incomingRowRepresentsDeleteEvent)
.insertAll()
.merge()
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
Expand Down
Loading