
[SPARK-56838][SDP] Introduce AutoCDC parameters dataclass#55836

Open
AnishMahto wants to merge 5 commits into apache:master from AnishMahto:SPARK-56838-introduce-ChangeArgs

Conversation


@AnishMahto AnishMahto commented May 12, 2026

Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7


Introduce ChangeArgs as the dataclass that represents AutoCDC API parameters. In future PRs:

  1. ChangeArgs will be constructed, populated, and propagated by the SDP SQL/Python flow registration APIs.
  2. ChangeArgs will be referenced by the SCD1/SCD2 algorithm implementations, to respect user-specified configurations.
  3. Advanced AutoCDC parameters (as per the SPIP), such as ignoreNull or trackHistoryColumns, will be added and supported.

Additionally, introduce the ColumnSelection helper class, which encodes the notion of a user selecting a list of columns for inclusion or exclusion directly into a data type, rather than relying on an implicit understanding of a raw string list.
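A minimal, dependency-free sketch of the shapes described above. Spark's Column and StructType are omitted, and the field list is an assumption pieced together from the diff excerpts in this conversation, not the PR's exact definition:

```scala
// Sketch only: models the dataclasses described above without Spark
// dependencies. Column-typed fields (e.g. deleteCondition) are omitted.
sealed trait ScdType
object ScdType {
  case object Type1 extends ScdType // SCD1: overwrite history in place
  case object Type2 extends ScdType // SCD2: keep historical versions of rows
}

sealed trait ColumnSelection
case class IncludeColumns(columns: Seq[String]) extends ColumnSelection
case class ExcludeColumns(columns: Seq[String]) extends ColumnSelection

case class ChangeArgs(
    keys: Seq[String],
    storedAsScdType: ScdType,
    columnSelection: Option[ColumnSelection] = None)
```

Encoding include/exclude as an ADT (rather than a raw string list plus a boolean) makes the caller's intent unrepresentable incorrectly, which is the stated motivation.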

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-56838][SDP][AutoCDC] Introduce AutoCDC parameters dataclass [SPARK-56838][SDP] Introduce AutoCDC parameters dataclass May 13, 2026
sealed trait ScdType

object ScdType {
case object Type1 extends ScdType
Member

Should we add scaladoc for Type1 and Type2?

Contributor Author

These correspond to the industry-standard definitions of SCD1/SCD2, so I won't document exactly what those algorithms entail, but I added a basic scaladoc noting that Type1 <=> SCD1 and Type2 <=> SCD2.

// A none column selection is interpreted as a no-op.
schema
case Some(IncludeColumns(includeColumns)) =>
validateColumnsExistInSchema(columns = includeColumns, schema = schema)
Member

Also checking: if columns is empty, this creates an empty StructType. Is that expected?

Contributor Author (AnishMahto, May 13, 2026)

That's intentional: a None column selection is semantically different from Some(IncludeColumns(Seq())), and the latter means don't select any columns.

Of course, it's not super meaningful to select no columns, and that will likely be validated against when tables get materialized.
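The distinction discussed above can be sketched without Spark by modeling the schema as a plain list of field names; applyToSchema and the types here are simplified stand-ins for the PR's actual signatures:

```scala
// Sketch: None means "no selection, keep the schema as-is"; an empty
// IncludeColumns means "select nothing". Schema modeled as Seq[String].
sealed trait ColumnSelection
case class IncludeColumns(columns: Seq[String]) extends ColumnSelection

def applyToSchema(selection: Option[ColumnSelection], schema: Seq[String]): Seq[String] =
  selection match {
    case None                          => schema                          // no-op
    case Some(IncludeColumns(include)) => schema.filter(include.contains) // may be empty
  }
```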


sealed trait ColumnSelection
object ColumnSelection {
type ColumnList = Seq[String]
Member

To check: we do not handle nested columns?

Contributor Author

That's right, and intentional as per the SPIP.

That said, this made me realize I should be more explicit about the restriction. Added the UnqualifiedColumnName class to make this explicit and true by construction.
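A parser-free sketch of the "true by construction" idea: the real class uses CatalystSqlParser.parseMultipartIdentifier, which the bare-dot check below only approximates.

```scala
// Sketch only: rejects multi-part identifiers at construction time. A bare
// dot stands in for what the Spark SQL parser would split into multiple
// name parts; backtick-quoted names are allowed through.
case class UnqualifiedColumnName(name: String) {
  require(
    name.startsWith("`") || !name.contains("."),
    s"'$name' looks like a multi-part identifier; only single column names are allowed")
}
```

Putting the check in the constructor means no code path can produce a qualified name, so downstream consumers don't need to re-validate.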

// A none column selection is interpreted as a no-op.
schema
case Some(IncludeColumns(includeColumns)) =>
validateColumnsExistInSchema(columns = includeColumns, schema = schema)
Member

Also, would it be better to pass in includeColumns as a set, to make the validation easier?

Contributor Author

Wouldn't really matter, IMO: the validation here specifically needs to do a contains check on the schema columns, not the other way around.

Regardless of whether we use a Set or a Seq for includeColumns, we'd need to loop through all of them (even if we converted to a set first).

@AnishMahto AnishMahto requested a review from szehon-ho May 13, 2026 17:06
Member

@gengliangwang gengliangwang left a comment

Reviewed the latest commit d1a38e6. Most of the earlier comments from @szehon-ho appear to be addressed (Type1/Type2 scaladocs, the multipart-identifier restriction is now enforced by construction via UnqualifiedColumnName, and defaulted parameters now sit at the tail of ChangeArgs). A few additional observations below.

This is an automated review by Claude Code on behalf of @gengliangwang. Please treat suggestions as starting points for discussion rather than blocking requirements.

* no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and
* throws an [[AnalysisException]] if it does not resolve to exactly one name part.
*/
case class UnqualifiedColumnName(name: String) {
Member

Possible correctness gap with backtick-quoted names. CatalystSqlParser.parseMultipartIdentifier("a.b") returns Seq("a.b") (a single part, no backticks), so the constructor accepts the input — but name retains the raw string `a.b`. Downstream, ColumnSelection.applyToSchema matches name directly against schema.fieldNames, which contain the unquoted form a.b. So a user who writes UnqualifiedColumnName("a.b") to refer to the schema column literally named a.b will always hit COLUMNS_NOT_FOUND.

The existing test UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot only asserts that .name round-trips the raw input; it doesn't cover the include/exclude lookup, which would fail.

Suggest normalizing on construction:

case class UnqualifiedColumnName private (name: String)
object UnqualifiedColumnName {
  def apply(input: String): UnqualifiedColumnName = {
    val parts = CatalystSqlParser.parseMultipartIdentifier(input)
    if (parts.length != 1) throw multipartColumnIdentifierError(input, parts)
    new UnqualifiedColumnName(parts.head)
  }
}

This way name always equals the schema field name and the include/exclude lookup behaves as users expect.

* all columns.
*/
case class ChangeArgs(
keys: Seq[String],
Member

Type inconsistency: keys is Seq[String] while columnSelection uses Seq[UnqualifiedColumnName]. The SPIP semantics for keys are the same — they must be single, unqualified column references in the source schema. Using Seq[UnqualifiedColumnName] here would give you free construction-time validation and a uniform vocabulary across the dataclass. (If keys is intentionally left as Seq[String] for now and will be validated/normalized in a follow-up, please add a TODO so it isn't forgotten.)


/**
* Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema.
* Field names are matched exactly. Field order follows the original schema (filtered in place).
Member

Worth calling out in the scaladoc that field matching is case-sensitive and exact (no support for spark.sql.caseSensitive=false). The test comments document this intent, but a user reading only the public API will likely expect the global config to apply. A one-liner like

// Matching is case-sensitive regardless of spark.sql.caseSensitive.

would be helpful. (Or, if matching it to the session config is desired, do it now before the type is depended on.)

storedAsScdType: ScdType,
deleteCondition: Option[Column] = None,
columnSelection: Option[ColumnSelection] = None
)
Member

Nit: ChangeArgs is a case class carrying two Column fields. Column does not have a content-based equals/hashCode (it compares by reference / underlying expression node identity), so two ChangeArgs built from equivalent but separately constructed Column instances will compare unequal. If any downstream code plans to use ChangeArgs as a Map key, deduplicate it, or assert equality in tests, this will be surprising. Either (a) document this caveat, or (b) override equals/hashCode to compare sequencing.expr.canonicalized / deleteCondition.map(_.expr.canonicalized).
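The caveat above can be reproduced without Spark: a case class whose field type relies on reference equality (as Column effectively does) loses structural equality for the whole case class. Expr and Args below are stand-ins, not Spark's API:

```scala
// Sketch: Expr has no content-based equals, mimicking Column's behavior as
// described above. Two Args built from equal-looking Exprs compare unequal,
// while Args sharing the same Expr instance compare equal.
class Expr(val sql: String) // reference equality only
case class Args(deleteCondition: Option[Expr])

val shared = new Expr("op = 'D'")
```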


private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = {
val schemaColumns = schema.fieldNames.toSet
val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct
Member

Minor: validateColumnsExistInSchema is called from both IncludeColumns and ExcludeColumns branches, and applyToSchema then immediately reconstructs an essentially equivalent set on the caller side. You could combine these into one helper that returns the filtered fields and the validated set in one pass, or simply have validateColumnsExistInSchema return the set so applyToSchema doesn't recompute it. Not a blocker.
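One possible shape for the combined helper suggested above (a sketch: the schema is modeled as field names, the error is reduced to a require, and the name is illustrative):

```scala
// Sketch: validate and filter in one pass, reusing the set built for the
// existence check instead of reconstructing it at the call site.
def validateAndFilter(include: Seq[String], schema: Seq[String]): Seq[String] = {
  val includeSet = include.toSet
  val missing = includeSet -- schema.toSet
  require(missing.isEmpty, s"Columns not found: ${missing.mkString(", ")}")
  schema.filter(includeSet.contains)
}
```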


test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") {
// Backticks make the dot part of a single name part, so this passes validation.
assert(UnqualifiedColumnName("`a.b`").name == "`a.b`")
Member

Suggestion: add a positive test exercising ColumnSelection.applyToSchema with UnqualifiedColumnName("a.b") against a StructType that contains a field literally named a.b. As written this would today raise COLUMNS_NOT_FOUND (see comment on UnqualifiedColumnName), which is probably not the intended behavior. A round-trip test would have caught it.
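A sketch of what that round-trip test would exercise, with backtick stripping standing in for the parser's normalization (the helper names here are illustrative, not the PR's actual API):

```scala
// Sketch: if construction normalized `a.b` down to a.b, an include lookup
// against a schema containing a field literally named a.b would succeed.
def normalize(raw: String): String =
  if (raw.length >= 2 && raw.head == '`' && raw.last == '`') raw.substring(1, raw.length - 1)
  else raw

def includeFields(names: Seq[String], schema: Seq[String]): Seq[String] = {
  val wanted = names.map(normalize).toSet
  schema.filter(wanted.contains)
}
```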
