common/utils/src/main/resources/error/error-conditions.json (18 additions, 0 deletions)
@@ -191,6 +191,24 @@
    ],
    "sqlState" : "0A000"
  },
  "AUTOCDC_INVALID_COLUMN_SELECTION" : {
    "message" : [
      "Invalid column selection."
    ],
    "subClass" : {
      "COLUMNS_NOT_FOUND" : {
        "message" : [
          "The following columns are not present in the schema: <missingColumns>. Available columns: <availableColumns>."
        ]
      },
      "MULTIPART_COLUMN_IDENTIFIER" : {
        "message" : [
          "Column selection entries must be a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)."
        ]
      }
    },
    "sqlState" : "42703"
  },
  "AVRO_CANNOT_WRITE_NULL_FIELD" : {
    "message" : [
      "Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.sql.{AnalysisException, Column}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructType

/**
 * A column reference that must be a single, unqualified identifier (no nested field path and
 * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and
 * throws an [[AnalysisException]] if it does not resolve to exactly one name part.
 */
case class UnqualifiedColumnName(name: String) {
Member: Possible correctness gap with backtick-quoted names. CatalystSqlParser.parseMultipartIdentifier("`a.b`") returns Seq("a.b") (a single part, backticks stripped), so the constructor accepts the input, but name retains the raw string "`a.b`". Downstream, ColumnSelection.applyToSchema matches name directly against schema.fieldNames, which contain the unquoted form a.b. So a user who writes UnqualifiedColumnName("`a.b`") to refer to the schema column literally named a.b will always hit COLUMNS_NOT_FOUND.

The existing test "UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot" only asserts that .name round-trips the raw input; it doesn't cover the include/exclude lookup, which would fail.

Suggest normalizing on construction:

case class UnqualifiedColumnName private (name: String)
object UnqualifiedColumnName {
  def apply(input: String): UnqualifiedColumnName = {
    val parts = CatalystSqlParser.parseMultipartIdentifier(input)
    if (parts.length != 1) throw multipartColumnIdentifierError(input, parts)
    new UnqualifiedColumnName(parts.head)
  }
}

This way name always equals the schema field name and the include/exclude lookup behaves as users expect.

  UnqualifiedColumnName.validate(name)
}
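
To make the reported mismatch concrete, a quick illustration (a hypothetical REPL session; the values follow from the comment above and the backtick test in the suite below):

  CatalystSqlParser.parseMultipartIdentifier("`a.b`") // Seq("a.b"): one part, backticks stripped
  UnqualifiedColumnName("`a.b`").name                 // "`a.b`": the raw, still-quoted input
  // A schema field literally named "a.b" therefore never matches this name.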

object UnqualifiedColumnName {
  private def validate(columnName: String): Unit = {
    val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName)
    if (nameParts.length != 1) {
      throw multipartColumnIdentifierError(columnName, nameParts)
    }
  }

  private def multipartColumnIdentifierError(
      columnName: String,
      nameParts: Seq[String]
  ): AnalysisException =
    new AnalysisException(
      errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
      messageParameters = Map(
        "columnName" -> columnName,
        "nameParts" -> nameParts.mkString(", ")
      )
    )
}

sealed trait ColumnSelection

object ColumnSelection {
  type ColumnList = Seq[UnqualifiedColumnName]

  case class IncludeColumns(columns: ColumnList) extends ColumnSelection
  case class ExcludeColumns(columns: ColumnList) extends ColumnSelection

  /**
   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema.
   * Field names are matched exactly. Field order follows the original schema (filtered in place).
Member: Worth calling out in the scaladoc that field matching is case-sensitive and exact (no support for spark.sql.caseSensitive=false). The test comments document this intent, but a user reading only the public API will likely expect the global config to apply. A one-liner like

  // Matching is case-sensitive regardless of spark.sql.caseSensitive.

would be helpful. (Or, if matching against the session config is desired, do it now before the type is depended on; a sketch of that variant follows this method.)
   */
  def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType =
    columnSelection match {
      case None =>
        // A None column selection is interpreted as a no-op.
        schema
      case Some(IncludeColumns(includeColumns)) =>
        validateColumnsExistInSchema(includeColumns, schema)

        val includeColumnSet = includeColumns.map(_.name).toSet
        StructType(schema.fields.filter(f => includeColumnSet.contains(f.name)))
      case Some(ExcludeColumns(excludeColumns)) =>
        validateColumnsExistInSchema(excludeColumns, schema)

        val excludeColumnSet = excludeColumns.map(_.name).toSet
        StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name)))
    }
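
As a sketch of the session-config-aware variant mentioned in the scaladoc comment above (an assumption, not the PR's behavior; SQLConf.get.resolver is the analyzer's name-equality function and respects spark.sql.caseSensitive):

  // Sketch only: replaces exact matching with the session's resolution rule.
  import org.apache.spark.sql.internal.SQLConf

  private def selectionMatches(selected: ColumnList, fieldName: String): Boolean = {
    val resolver = SQLConf.get.resolver // case-insensitive unless spark.sql.caseSensitive=true
    selected.exists(c => resolver(c.name, fieldName))
  }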

  private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = {
    val schemaColumns = schema.fieldNames.toSet
    val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct
Member: Minor: validateColumnsExistInSchema is called from both the IncludeColumns and ExcludeColumns branches, and applyToSchema then immediately reconstructs an essentially equivalent set on the caller side. You could combine these into one helper that returns the filtered fields and the validated set in one pass, or simply have validateColumnsExistInSchema return the set so applyToSchema doesn't recompute it (see the sketch after this object). Not a blocker.

    if (missingColumns.nonEmpty) {
      throw new AnalysisException(
        errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
        messageParameters = Map(
          "missingColumns" -> missingColumns.mkString(", "),
          "availableColumns" -> schema.fieldNames.mkString(", ")
        ))
    }
  }
}
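
Following the reviewer's non-blocking note above, one shape that consolidation could take (a sketch, not the PR's code; validatedNameSet is a hypothetical helper inside the object):

  // Sketch: validate and resolve in one pass; callers filter with the returned set.
  private def validatedNameSet(columns: ColumnList, schema: StructType): Set[String] = {
    val schemaColumns = schema.fieldNames.toSet
    val requested = columns.map(_.name)
    val missing = requested.filterNot(schemaColumns.contains).distinct
    if (missing.nonEmpty) {
      throw new AnalysisException(
        errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
        messageParameters = Map(
          "missingColumns" -> missing.mkString(", "),
          "availableColumns" -> schema.fieldNames.mkString(", ")))
    }
    requested.toSet
  }

The IncludeColumns and ExcludeColumns branches would then filter with the returned set directly instead of rebuilding it.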

/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
sealed trait ScdType

object ScdType {
  /** Representation for the standard SCD1 strategy. */
  case object Type1 extends ScdType
Member: Should we scaladoc Type1 and Type2?

Contributor (author): These correspond to the industry-standard definitions for SCD1/SCD2, so I won't document exactly what those algorithms entail, but I've added a basic scaladoc noting that Type1 <=> SCD1 and Type2 <=> SCD2.

  /** Representation for the standard SCD2 strategy. */
  case object Type2 extends ScdType
}

/**
 * Configuration for an AutoCDC flow.
 *
 * @param keys The column(s) that uniquely identify a row in the source data.
 * @param sequencing Expression ordering CDC events to correctly resolve out-of-order
 *                   arrivals. Must be a sortable type.
 * @param storedAsScdType The SCD strategy these args should be applied to.
 * @param deleteCondition Expression that marks a source row as a DELETE. When None, all
 *                        rows are treated as upserts.
 * @param columnSelection Which source columns to select in the target table. None means
 *                        all columns.
 */
case class ChangeArgs(
    keys: Seq[String],
Member: Type inconsistency: keys is Seq[String] while columnSelection uses Seq[UnqualifiedColumnName]. The SPIP semantics for keys are the same: they must be single, unqualified column references in the source schema. Using Seq[UnqualifiedColumnName] here would give you free construction-time validation and a uniform vocabulary across the dataclass. (If keys is intentionally left as Seq[String] for now and will be validated/normalized in a follow-up, please add a TODO so it isn't forgotten.)

    sequencing: Column,
    storedAsScdType: ScdType,
    deleteCondition: Option[Column] = None,
    columnSelection: Option[ColumnSelection] = None
)
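
For orientation, a hypothetical construction (the values are invented; col and expr are the standard helpers from org.apache.spark.sql.functions):

  import org.apache.spark.sql.functions.{col, expr}

  val args = ChangeArgs(
    keys = Seq("id"),
    sequencing = col("event_ts"),
    storedAsScdType = ScdType.Type2,
    deleteCondition = Some(expr("operation = 'DELETE'")),
    columnSelection = Some(ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("operation")))))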
Member: Nit: ChangeArgs is a case class carrying two Column fields. Column does not have content-based equals/hashCode (it compares by reference / underlying expression node identity), so two ChangeArgs built from equivalent but separately constructed Column instances will compare unequal. If any downstream code plans to use ChangeArgs as a Map key, deduplicate it, or assert equality in tests, this will be surprising. Either (a) document this caveat, or (b) override equals/hashCode to compare sequencing.expr.canonicalized / deleteCondition.map(_.expr.canonicalized).
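A sketch of option (b) (assuming, as the comment does, that the wrapped Expression is reachable via Column.expr in this module; it would also require giving the case class a body):

  // Sketch: content-based equality via canonicalized expressions.
  override def equals(other: Any): Boolean = other match {
    case that: ChangeArgs =>
      keys == that.keys &&
        storedAsScdType == that.storedAsScdType &&
        columnSelection == that.columnSelection &&
        sequencing.expr.canonicalized == that.sequencing.expr.canonicalized &&
        deleteCondition.map(_.expr.canonicalized) ==
          that.deleteCondition.map(_.expr.canonicalized)
    case _ => false
  }
  // hashCode would need the same treatment to keep the equals/hashCode contract.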

@@ -0,0 +1,155 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

class ChangeArgsSuite extends SparkFunSuite {

  private val sourceSchema = new StructType()
    .add("id", IntegerType, nullable = false)
    .add("Name", StringType)
    .add("age", IntegerType)

  test("ColumnSelection None leaves schema unchanged") {
    assert(ColumnSelection.applyToSchema(sourceSchema, None) == sourceSchema)
  }

  test("ColumnSelection IncludeColumns filters by exact name in schema order") {
    val filteredSchema = ColumnSelection.applyToSchema(
      sourceSchema,
      Some(ColumnSelection.IncludeColumns(
        Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name")))))

    assert(filteredSchema == new StructType()
      .add("Name", StringType)
      .add("age", IntegerType))
  }

  test("ColumnSelection ExcludeColumns filters by exact name") {
    val filteredSchema = ColumnSelection.applyToSchema(
      sourceSchema,
      Some(ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))))

    assert(filteredSchema == new StructType()
      .add("Name", StringType)
      .add("age", IntegerType))
  }

  test("ColumnSelection IncludeColumns fails for columns not present in schema") {
    checkError(
      exception = intercept[AnalysisException] {
        ColumnSelection.applyToSchema(
          sourceSchema,
          // Column inclusion is case-sensitive; "name" will not match against "Name".
          Some(ColumnSelection.IncludeColumns(
            Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing"))))
        )
      },
      condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
      sqlState = "42703",
      parameters = Map(
        "missingColumns" -> "name, missing",
        "availableColumns" -> "id, Name, age"
      )
    )
  }

  test("ColumnSelection ExcludeColumns fails for columns not present in schema") {
    checkError(
      exception = intercept[AnalysisException] {
        ColumnSelection.applyToSchema(
          sourceSchema,
          // Column exclusion is case-sensitive; "NAME" will not match against "Name".
          Some(ColumnSelection.ExcludeColumns(
            Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing"))))
        )
      },
      condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
      sqlState = "42703",
      parameters = Map(
        "missingColumns" -> "NAME, missing",
        "availableColumns" -> "id, Name, age"
      )
    )
  }

  test("UnqualifiedColumnName accepts a simple single-part identifier") {
    assert(UnqualifiedColumnName("col").name == "col")
  }

  test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") {
    // Backticks make the dot part of a single name part, so this passes validation.
    assert(UnqualifiedColumnName("`a.b`").name == "`a.b`")
Member: Suggestion: add a positive test exercising ColumnSelection.applyToSchema with UnqualifiedColumnName("`a.b`") against a StructType that contains a field literally named a.b. As written this would today raise COLUMNS_NOT_FOUND (see the comment on UnqualifiedColumnName), which is probably not the intended behavior. A round-trip test would have caught it; a sketch follows this test.
  }
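
A sketch of the round-trip test suggested in the comment above (an addition, not part of the PR; under the current code it would raise COLUMNS_NOT_FOUND, and it should pass once name is normalized on construction):

  test("ColumnSelection resolves a backtick-quoted selection against a dotted field name") {
    val dottedSchema = new StructType().add("a.b", StringType)
    // Hypothetical expectation, assuming the constructor normalizes "`a.b`" to "a.b".
    val filtered = ColumnSelection.applyToSchema(
      dottedSchema,
      Some(ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`")))))
    assert(filtered == dottedSchema)
  }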

test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") {
checkError(
exception = intercept[AnalysisException] {
UnqualifiedColumnName("a.b")
},
condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
sqlState = "42703",
parameters = Map(
"columnName" -> "a.b",
"nameParts" -> "a, b"
)
)
}

test("UnqualifiedColumnName rejects a qualified column reference") {
checkError(
exception = intercept[AnalysisException] {
UnqualifiedColumnName("src.x")
},
condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
sqlState = "42703",
parameters = Map(
"columnName" -> "src.x",
"nameParts" -> "src, x"
)
)
}

test("UnqualifiedColumnName rejects an identifier with three or more parts") {
checkError(
exception = intercept[AnalysisException] {
UnqualifiedColumnName("a.b.c")
},
condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
sqlState = "42703",
parameters = Map(
"columnName" -> "a.b.c",
"nameParts" -> "a, b, c"
)
)
}

test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") {
checkError(
exception = intercept[ParseException] {
UnqualifiedColumnName("")
},
condition = "PARSE_EMPTY_STATEMENT",
sqlState = Some("42617")
)
}
}