Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
import java.util.Objects;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* Encapsulates the parameters of a Change Data Capture (CDC) query, passed from the
* parser / DataFrame API to the catalog's
* {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)} method.
* {@link TableCatalog#loadChangelog(Identifier, ChangelogContext, CaseInsensitiveStringMap)}
* method.
*
* @since 4.2.0
*/
@Evolving
public class ChangelogInfo {
Copy link
Copy Markdown
Contributor Author

@aokolnychyi aokolnychyi May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename this given that we will have to pass options to loadTable as well. Right now, we already have TableInfo that we use for CREATE table cases. We will NOT be able to use TableInfo for loading tables. So ChangelogInfo conflicts with TableInfo as it is used for loading, not creation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative name to consider is ChangelogParameters but context seems a better fit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

@aokolnychyi aokolnychyi May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My proposal would be:

loadTable(ident, tableContext (timeTravelSpec, writePrivileges), options)
 - by default delegates to existing loadTable() methods
loadChangelog(ident, changelogContext (range, deduplicationMode, etc), options)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We concluded that passing all options to load methods for tables and changelog is a requirement for external connectors like Delta and Iceberg.

public class ChangelogContext {

/**
* Deduplication modes controlling how Spark post-processes raw change data.
Expand All @@ -47,7 +49,7 @@ public enum DeduplicationMode {
private final DeduplicationMode deduplicationMode;
private final boolean computeUpdates;

public ChangelogInfo(
public ChangelogContext(
ChangelogRange range,
DeduplicationMode deduplicationMode,
boolean computeUpdates) {
Expand All @@ -68,7 +70,7 @@ public ChangelogInfo(
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ChangelogInfo that)) return false;
if (!(o instanceof ChangelogContext that)) return false;
return computeUpdates == that.computeUpdates
&& Objects.equals(range, that.range)
&& deduplicationMode == that.deduplicationMode;
Expand All @@ -81,7 +83,7 @@ public int hashCode() {

@Override
public String toString() {
return "ChangelogInfo{range=" + range +
return "ChangelogContext{range=" + range +
", deduplicationMode=" + deduplicationMode +
", computeUpdates=" + computeUpdates + "}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.spark.sql.errors.QueryCompilationErrors;
import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.util.ArrayList;
import java.util.Map;
Expand Down Expand Up @@ -195,22 +196,25 @@ default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExce

/**
* Load a {@link Changelog} for the given table, representing the row-level changes within the
* range specified by {@code changelogInfo}.
* range specified by {@code context}.
* <p>
* The default implementation throws an analysis exception indicating that the catalog does
* not support CDC. Catalogs that support CDC must override this method.
*
* @param ident a table identifier
* @param changelogInfo the CDC query parameters (range, deduplication mode, etc.)
* @param context the CDC query context (range, deduplication mode, etc.)
* @param options all options passed to the changelog query, including the CDC-recognized
* keys (range, deduplication mode, etc.) that are also parsed into {@code context}
* @return a Changelog instance for the requested table and range
* @throws NoSuchTableException If the table doesn't exist
*
* @since 4.2.0
*/
default Changelog loadChangelog(Identifier ident, ChangelogInfo changelogInfo)
throws NoSuchTableException {
throw new UnsupportedOperationException(
name() + " does not support Change Data Capture (CDC)");
default Changelog loadChangelog(
Identifier ident,
ChangelogContext context,
CaseInsensitiveStringMap options) throws NoSuchTableException {
throw new UnsupportedOperationException(name() + " does not support Change Data Capture (CDC)");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,8 +1135,8 @@ class Analyzer(
val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
resolveRelation(u, timeTravelSpec).getOrElse(r)

case r @ RelationChanges(u: UnresolvedRelation, changelogInfo) =>
relationResolution.resolveChangelog(u, changelogInfo).getOrElse(r)
case r @ RelationChanges(u: UnresolvedRelation, ctx) =>
relationResolution.resolveChangelog(u, ctx).getOrElse(r)

case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
lookupTableOrView(identifier).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import java.lang.{Long => JLong}
import java.util.{Locale, Optional => JOptional}

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.connector.catalog.ChangelogInfo
import org.apache.spark.sql.connector.catalog.ChangelogContext
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Utility methods for constructing [[ChangelogInfo]] from DataFrame API options.
* Utility methods for constructing [[ChangelogContext]] from DataFrame API options.
*/
object ChangelogInfoUtils {
object ChangelogContextUtils {

private val STARTING_VERSION = "startingVersion"
private val ENDING_VERSION = "endingVersion"
Expand All @@ -42,12 +42,12 @@ object ChangelogInfoUtils {
private val COMPUTE_UPDATES = "computeUpdates"

/**
* Build a [[ChangelogInfo]] from the options specified via `.option()` calls on
* Build a [[ChangelogContext]] from the options specified via `.option()` calls on
* `DataFrameReader` or `DataStreamReader`.
*/
def fromOptions(
options: CaseInsensitiveStringMap,
sessionLocalTimeZone: String): ChangelogInfo = {
sessionLocalTimeZone: String): ChangelogContext = {
val startVersion = Option(options.get(STARTING_VERSION))
val endVersion = Option(options.get(ENDING_VERSION))
val startTimestamp = Option(options.get(STARTING_TIMESTAMP))
Expand All @@ -59,9 +59,9 @@ object ChangelogInfoUtils {
val deduplicationModeStr = Option(options.get(DEDUPLICATION_MODE))
.getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
val deduplicationMode = deduplicationModeStr match {
case "none" => ChangelogInfo.DeduplicationMode.NONE
case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS
case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES
case "none" => ChangelogContext.DeduplicationMode.NONE
case "dropcarryovers" => ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
case other =>
throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
}
Expand Down Expand Up @@ -98,7 +98,7 @@ object ChangelogInfoUtils {
new UnboundedRange()
}

new ChangelogInfo(range, deduplicationMode, computeUpdates)
new ChangelogContext(range, deduplicationMode, computeUpdates)
}

private def parseTimestamp(timestampStr: String, sessionLocalTimeZone: String): Long = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreePattern.{RELATION_CHANGES, TreePattern}
import org.apache.spark.sql.connector.catalog.ChangelogInfo
import org.apache.spark.sql.connector.catalog.ChangelogContext

/**
* A logical node used to query Change Data Capture (CDC) changes for a table relation.
Expand All @@ -33,10 +33,10 @@ import org.apache.spark.sql.connector.catalog.ChangelogInfo
* [[UnresolvedLeafNode]]). Tree traversals like `transformUp` will not visit `relation`.
*
* @param relation the table relation (typically an [[UnresolvedRelation]])
* @param changelogInfo the CDC query parameters (range, deduplication mode, etc.)
* @param changelogContext the CDC query context (range, deduplication mode, etc.)
*/
case class RelationChanges(
relation: LogicalPlan,
changelogInfo: ChangelogInfo) extends UnresolvedLeafNode {
changelogContext: ChangelogContext) extends UnresolvedLeafNode {
override val nodePatterns: Seq[TreePattern] = Seq(RELATION_CHANGES)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.{
CatalogManager,
CatalogPlugin,
CatalogV2Util,
ChangelogInfo,
ChangelogContext,
Identifier,
LookupCatalog,
MetadataTable,
Expand Down Expand Up @@ -330,19 +330,17 @@ class RelationResolution(
* Resolve a CDC (CHANGES) query: look up the catalog, call loadChangelog(), wrap in
* ChangelogTable, and return a DataSourceV2Relation.
*/
def resolveChangelog(
u: UnresolvedRelation,
changelogInfo: ChangelogInfo): Option[LogicalPlan] = {
def resolveChangelog(u: UnresolvedRelation, ctx: ChangelogContext): Option[LogicalPlan] = {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val tableCatalog = catalog.asTableCatalog
val changelog = try {
tableCatalog.loadChangelog(ident, changelogInfo)
tableCatalog.loadChangelog(ident, ctx, u.options)
} catch {
case _: UnsupportedOperationException =>
throw QueryCompilationErrors.cdcNotSupportedError(tableCatalog.name())
}
val changelogTable = ChangelogTable(changelog, changelogInfo)
val changelogTable = ChangelogTable(changelog, ctx)
val relation = if (u.isStreaming) {
StreamingRelationV2(
None, changelogTable.name, changelogTable, u.options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo}
import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation}
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor}
Expand Down Expand Up @@ -123,7 +123,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if !table.resolved =>
val changelog = table.changelog
val req = evaluateRequirements(changelog, table.changelogInfo)
val req = evaluateRequirements(changelog, table.changelogContext)

val resolvedRel = rel.copy(table = table.copy(resolved = true))
var updatedRel: LogicalPlan = resolvedRel
Expand All @@ -140,14 +140,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
val rowIdExprs =
V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, resolvedRel)
updatedRel = injectNetChangeComputation(
updatedRel, rowIdExprs, table.changelogInfo.computeUpdates())
updatedRel, rowIdExprs, table.changelogContext.computeUpdates())
}
updatedRel

case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _)
if !table.resolved =>
val changelog = table.changelog
val req = evaluateRequirements(changelog, table.changelogInfo)
val req = evaluateRequirements(changelog, table.changelogContext)
val resolvedRel = rel.copy(table = table.copy(resolved = true))
var updatedRel: LogicalPlan = resolvedRel
if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) {
Expand All @@ -164,7 +164,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
// output, so name-based resolution against `updatedRel` recovers the right
// attributes regardless of any preceding wrapping.
updatedRel = addStreamingNetChangeComputation(
updatedRel, changelog, table.changelogInfo.computeUpdates())
updatedRel, changelog, table.changelogContext.computeUpdates())
}
updatedRel
}
Expand All @@ -175,7 +175,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {

/**
* Captures which post-processing passes a CDC query requires, derived from the
* user-provided [[ChangelogInfo]] options and the connector-declared [[Changelog]]
* user-provided [[ChangelogContext]] options and the connector-declared [[Changelog]]
* capability flags.
*/
private case class PostProcessingRequirements(
Expand All @@ -194,22 +194,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
*/
private def evaluateRequirements(
changelog: Changelog,
options: ChangelogInfo): PostProcessingRequirements = {
options: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
options.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()

// If carry-overs are surfaced and update detection is enabled without carry-over
// removal, carry-overs would be falsely classified as updates, leading to wrong
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
options.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {
throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval(
changelog.name())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogContext, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
Expand Down Expand Up @@ -2649,17 +2649,17 @@ class AstBuilder extends DataTypeAstBuilder
withOrigin(ctx) {
val relation = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause))
val options = resolveOptions(Option(ctx.optionsClause))
val changelogInfo = buildChangelogInfo(ctx.changesClause, options)
val result = RelationChanges(relation, changelogInfo)
val changelogContext = buildChangelogContext(ctx.changesClause, options)
val result = RelationChanges(relation, changelogContext)
mayApplyAliasPlan(ctx.tableAlias, result)
}

/**
* Build a [[ChangelogInfo]] from a batch changesClause context and optional WITH options.
* Build a [[ChangelogContext]] from a batch changesClause context and optional WITH options.
*/
private def buildChangelogInfo(
private def buildChangelogContext(
ctx: ChangesClauseContext,
options: CaseInsensitiveStringMap): ChangelogInfo = {
options: CaseInsensitiveStringMap): ChangelogContext = {
val startExclusive = ctx.startExclusive != null
val endExclusive = ctx.endExclusive != null
val startInclusive = !startExclusive
Expand Down Expand Up @@ -2704,16 +2704,16 @@ class AstBuilder extends DataTypeAstBuilder
}

val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options)
new ChangelogInfo(range, deduplicationMode, computeUpdates)
new ChangelogContext(range, deduplicationMode, computeUpdates)
}

/**
* Build a [[ChangelogInfo]] from a streaming streamChangesClause context and optional
* Build a [[ChangelogContext]] from a streaming streamChangesClause context and optional
* WITH options.
*/
private def buildStreamChangelogInfo(
private def buildStreamChangelogContext(
ctx: StreamChangesClauseContext,
options: CaseInsensitiveStringMap): ChangelogInfo = {
options: CaseInsensitiveStringMap): ChangelogContext = {
val startExclusive = ctx.startExclusive != null
val startInclusive = !startExclusive

Expand Down Expand Up @@ -2744,22 +2744,21 @@ class AstBuilder extends DataTypeAstBuilder
}

val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options)
new ChangelogInfo(range, deduplicationMode, computeUpdates)
new ChangelogContext(range, deduplicationMode, computeUpdates)
}

/**
* Extract deduplicationMode and computeUpdates from WITH options for CDC queries.
* Defaults: DROP_CARRYOVERS for deduplicationMode, false for computeUpdates.
*/
private def resolveChangelogOptions(
options: CaseInsensitiveStringMap)
: (ChangelogInfo.DeduplicationMode, Boolean) = {
options: CaseInsensitiveStringMap): (ChangelogContext.DeduplicationMode, Boolean) = {
val deduplicationModeStr = Option(options.get("deduplicationMode"))
.getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
val deduplicationMode = deduplicationModeStr match {
case "none" => ChangelogInfo.DeduplicationMode.NONE
case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS
case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES
case "none" => ChangelogContext.DeduplicationMode.NONE
case "dropcarryovers" => ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
case other =>
throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
}
Expand Down Expand Up @@ -2925,8 +2924,8 @@ class AstBuilder extends DataTypeAstBuilder
case Some(changesCtx) =>
// Streaming CDC: wrap in RelationChanges and NamedStreamingRelation
val options = resolveOptions(Option(ctx.optionsClause))
val changelogInfo = buildStreamChangelogInfo(changesCtx, options)
val result = RelationChanges(relation, changelogInfo)
val changelogContext = buildStreamChangelogContext(changesCtx, options)
val result = RelationChanges(relation, changelogContext)
val table = mayApplyAliasPlan(ctx.tableAlias, result)
val tableWithWatermark = table.optionalMap(ctx.watermarkClause)(withWatermark)
val sourceNameOpt = extractSourceName(ctx.identifiedByClause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import java.util.{EnumSet => JEnumSet, Set => JSet}

import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column, SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext, Column, SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, MICRO_BATCH_READ}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand All @@ -35,7 +35,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
case class ChangelogTable(
changelog: Changelog,
changelogInfo: ChangelogInfo,
changelogContext: ChangelogContext,
resolved: Boolean = false) extends Table with SupportsRead {

// Validate that the connector returned a schema with the required CDC metadata columns
Expand Down
Loading