Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,58 @@ case class Scd1BatchProcessor(
)
}

/**
* Project the user-defined column selection onto the microbatch. By this point the input
* microbatch should already have projected its CDC metadata, because it's possible that the
* user-defined column selection drops columns that are otherwise necessary to compute the
* CDC metadata.
*
* Returned dataframe's schema is: all of the user-selected columns in the input dataframe as per
* [[ChangeArgs.columnSelection]] + the CDC metadata column.
*/
def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
val caseSensitiveColumnComparison =
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis

// The user schema is the microbatch schema after dropping the system CDC metadata column.
// We project out the system column before applying user selection and project it back in
// afterwards, so that users cannot control whether this [necessary] column shows up in the
// target table.
val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
schemaName = "microbatch",
schema = microbatchWithCdcMetadataDf.schema,
columnSelection = Some(
ColumnSelection.ExcludeColumns(
Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
)
),
caseSensitive = caseSensitiveColumnComparison
)

val userSelectedColumnsInMicrobatchSchema =
ColumnSelection.applyToSchema(
schemaName = "microbatch",
schema = userColumnsInMicrobatchSchema,
columnSelection = changeArgs.columnSelection,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: columnSelection can remove key columns (e.g. ExcludeColumns on a key, or a narrow IncludeColumns that omits keys). Will a later merge step still need those columns on this DataFrame?

If keys must remain until after merge, we should validate here (or when constructing ChangeArgs) that changeArgs.keys are not dropped. If merge runs before projection, or keys are re-injected elsewhere, could you add a brief note in the scaladoc on the expected pipeline order?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep we do require all keys remain in the column selection.

I added that validation in this PR during flow analysis time (well before flow execution, which is when this would actually be called) - see requireKeysPresentInSelectedSchema.

Flow analysis must always be done before flow execution, so there's no need to do additional user-friendly validation in this internal flow execution step. For unit testing purposes if a test is incorrectly setup, spark will just throw an unresolved column exception.

caseSensitive = caseSensitiveColumnComparison
)

// In addition to the explicit user-selected columns, re-project the operational CDC metadata
// column as the last column.
val finalColumnsInMicrobatchToSelect =
userSelectedColumnsInMicrobatchSchema.fieldNames.map(colName => {
// Spark drops backticks in the schema, quote all identifiers for safety before executing
// select. Identifiers could have special characters such as '.'.
F.col(QuotingUtils.quoteIdentifier(colName))
}) :+ F.col(
Scd1BatchProcessor.cdcMetadataColName
)

microbatchWithCdcMetadataDf.select(
finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
)
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ import org.apache.spark.sql.types._

class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {

/**
* Test Schema for a microbatch that already has the SCD1 CDC metadata column projected.
*/
private val microbatchWithCdcMetadataSchema: StructType = new StructType()
.add("id", IntegerType)
.add("name", StringType)
.add("age", IntegerType)
.add(
Scd1BatchProcessor.cdcMetadataColName,
new StructType()
.add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
.add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
)

/** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */
private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
Expand Down Expand Up @@ -715,4 +729,207 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
)
}
}

test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " +
Comment thread
AnishMahto marked this conversation as resolved.
"when columnSelection is None") {
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
Row(1, "alice", 30, Row(null, 10L)),
Row(2, "bob", 25, Row(20L, null))
)

val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
keys = Seq(UnqualifiedColumnName("id")),
sequencing = F.col("seq"),
storedAsScdType = ScdType.Type1,
columnSelection = None
),
resolvedSequencingType = LongType
)

val result = processor.projectTargetColumnsOntoMicrobatch(batch)

// None selection is no-op on the user columns, and the CDC metadata column is unconditionally
// re-projected last, so the output shape exactly matches the input.
assert(result.schema.fieldNames.toSeq == microbatchWithCdcMetadataSchema.fieldNames.toSeq)
checkAnswer(
df = result,
expectedAnswer = Seq(
Row(1, "alice", 30, Row(null, 10L)),
Row(2, "bob", 25, Row(20L, null))
)
)
}

test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " +
"IncludeColumns does not contain it") {
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
Row(1, "alice", 30, Row(null, 10L))
)

val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
keys = Seq(UnqualifiedColumnName("id")),
sequencing = F.col("seq"),
storedAsScdType = ScdType.Type1,
columnSelection = Some(
ColumnSelection.IncludeColumns(
Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
)
)
),
resolvedSequencingType = LongType
)

val result = processor.projectTargetColumnsOntoMicrobatch(batch)

assert(result.schema.fieldNames.toSeq ==
Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
checkAnswer(
df = result,
expectedAnswer = Row(1, 30, Row(null, 10L))
)
}

test("projectTargetColumnsOntoMicrobatch respects exclude column") {
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
Row(1, "alice", 30, Row(null, 10L))
)

val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
keys = Seq(UnqualifiedColumnName("id")),
sequencing = F.col("seq"),
storedAsScdType = ScdType.Type1,
columnSelection = Some(
ColumnSelection.ExcludeColumns(
Seq(UnqualifiedColumnName("age"))
)
)
),
resolvedSequencingType = LongType
)

val result = processor.projectTargetColumnsOntoMicrobatch(batch)

assert(
result.schema.fieldNames.toSeq ==
Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
)
checkAnswer(
df = result,
expectedAnswer = Row(1, "alice", Row(null, 10L))
)
}

test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") {
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
Row(1, "alice", 30, Row(null, 10L))
)

val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
keys = Seq(UnqualifiedColumnName("id")),
sequencing = F.col("seq"),
storedAsScdType = ScdType.Type1,
// User specifies (age, id) -- intentionally different from the schema order (id, age).
columnSelection = Some(ColumnSelection.IncludeColumns(
Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
))
),
resolvedSequencingType = LongType
)

val result = processor.projectTargetColumnsOntoMicrobatch(batch)

// Output column order follows the original microbatch schema (id before age), not the order
// in which the user listed columns in IncludeColumns. The CDC metadata column is appended
// last as always.
assert(result.schema.fieldNames.toSeq ==
Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))

checkAnswer(
df = result,
expectedAnswer = Row(1, 30, Row(null, 10L))
)
}

test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " +
"literal dot") {
val schema = new StructType()
.add("id", IntegerType)
// Even if a column is created with backticks via DDL, those backticks are consumed by Spark
// before resolving the schema; they won't show up in the schema field.
.add("user.id", StringType)
.add(
Scd1BatchProcessor.cdcMetadataColName,
new StructType()
.add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
.add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))

val batch = microbatchOf(schema)(
Row(1, "u-100", Row(null, 10L))
)

val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
keys = Seq(UnqualifiedColumnName("id")),
sequencing = F.col("seq"),
storedAsScdType = ScdType.Type1,
columnSelection = Some(
ColumnSelection.IncludeColumns(
Seq(
UnqualifiedColumnName("id"),
UnqualifiedColumnName("`user.id`")
)
)
)
),
resolvedSequencingType = LongType
)

val result = processor.projectTargetColumnsOntoMicrobatch(batch)

assert(result.schema.fieldNames.toSeq ==
Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
checkAnswer(
df = result,
expectedAnswer = Row(1, "u-100", Row(null, 10L))
)
}

test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " +
"when SQLConf.CASE_SENSITIVE=false") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
Row(1, "alice", 30, Row(null, 10L))
)

val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
keys = Seq(UnqualifiedColumnName("id")),
sequencing = F.col("seq"),
storedAsScdType = ScdType.Type1,
// User columns intentionally use a different case than the schema (id, age).
columnSelection = Some(
ColumnSelection.IncludeColumns(
Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
)
)
),
resolvedSequencingType = LongType
)

val result = processor.projectTargetColumnsOntoMicrobatch(batch)

// Output column names follow the microbatch schema's casing, not the casing in the user's
// columnSelection. The CDC metadata column is appended last as always.
assert(result.schema.fieldNames.toSeq ==
Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
checkAnswer(
df = result,
expectedAnswer = Row(1, 30, Row(null, 10L))
)
}
}
}