Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,36 @@
],
"sqlState" : "22023"
},
"AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
"message" : [
"Using <caseSensitivity> column name comparison, the AutoCDC key column `<keyColumnName>` is not present in the flow's selected source schema. AutoCDC requires every key column to be present in the source change-data feed and retained by any configured column selection."
],
"sqlState" : "22023"
},
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)."
],
"sqlState" : "42703"
},
"AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
"AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
"message" : [
"Invalid AutoCDC destination <tableName> with multiple flows: <flows>. An AutoCDC target table must have exactly one flow writing to it."
],
"sqlState" : "42000"
},
"AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
"message" : [
"Using <caseSensitivity> column name comparison, the column `<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC column name `<reservedColumnName>`. Rename or remove the column."
"The column `<columnName>` in the <schemaName> schema collides with the reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using <caseSensitivity> column name comparison). Rename or remove the column."
],
"sqlState" : "42710"
},
"AUTOCDC_SCD2_NOT_SUPPORTED" : {
"message" : [
"AutoCDC flows do not currently support SCD Type 2 transformations."
],
"sqlState" : "0A000"
},
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
"message" : [
"Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",
Expand Down Expand Up @@ -3687,6 +3705,11 @@
"Flow <flowIdentifier> returns an invalid relation type."
],
"subClass" : {
"AUTOCDC_RELATION_FOR_TEMPORARY_VIEW" : {
"message" : [
"AutoCDC flows must target a streaming table because their reconciliation semantics require a streaming-table sink, but the flow <flowIdentifier> attempts to write an AutoCDC relation to the temporary view <viewIdentifier>."
]
},
"BATCH_RELATION_FOR_STREAMING_TABLE" : {
"message" : [
"Streaming tables may only be defined by streaming relations, but the flow <flowIdentifier> attempts to write a batch relation to the streaming table <tableIdentifier>. Consider using the STREAM operator in Spark-SQL to convert the batch relation into a streaming relation, or populating the streaming table with an append once-flow instead."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, ShowNamespacesCommand}
import org.apache.spark.sql.pipelines.Language.Python
import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UnresolvedFlow}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -371,7 +371,7 @@ private[connect] object PipelinesHandler extends Logging {
case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS =>
val relationFlowDetails = flow.getRelationFlowDetails
graphElementRegistry.registerFlow(
UnresolvedFlow(
UntypedFlow(
identifier = flowIdentifier,
destinationIdentifier = destinationIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object ColumnSelection {
}

/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */
private[autocdc] object CaseSensitivityLabels {
private[pipelines] object CaseSensitivityLabels {
val CaseSensitive: String = "case-sensitive"
val CaseInsensitive: String = "case-insensitive"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
Expand Down Expand Up @@ -89,9 +89,6 @@ case class Scd1BatchProcessor(
* column.
*/
def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): DataFrame = {
// Proactively validate the reserved CDC metadata column does not exist in the microbatch.
validateCdcMetadataColumnNotPresent(validatedMicrobatch)

val rowDeleteSequence: Column = changeArgs.deleteCondition match {
case Some(deleteCondition) =>
F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
Expand Down Expand Up @@ -211,31 +208,18 @@ case class Scd1BatchProcessor(
joinType = "left_anti"
)
}

/**
 * Fails if the microbatch schema already contains a column whose name matches the reserved
 * CDC metadata column name ([[Scd1BatchProcessor.cdcMetadataColName]]).
 *
 * Names are compared with the resolver obtained from the microbatch's session SQL conf, so
 * whether the comparison is case-sensitive is decided by that conf rather than by this method.
 *
 * @param microbatch the dataframe whose top-level schema field names are checked.
 * @throws AnalysisException with error class `AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT` for the
 *                           first schema field that matches the reserved name.
 */
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
// Read the conf from the microbatch's own session so the name comparison below and the
// case-sensitivity label reported in the error come from the same source.
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver

// Only the first matching field is reported; `find` stops at the first match.
microbatch.schema.fieldNames
.find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
.foreach { conflictingColumnName =>
throw new AnalysisException(
errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
messageParameters = Map(
// Human-readable label for the comparison mode that was in effect.
"caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
// The name as it appears in the microbatch schema, not the reserved spelling.
"columnName" -> conflictingColumnName,
"schemaName" -> "microbatch",
"reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
)
)
}
}
}

object Scd1BatchProcessor {
// Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing.
private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row"
private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
/**
* Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed
* dataframes must not contain any columns starting with this prefix; the invariant is
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction.
*/
private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"

private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row"
private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata"

private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
Expand All @@ -251,7 +235,7 @@ object Scd1BatchProcessor {
/**
* Schema of the CDC metadata struct column for SCD1.
*/
private def cdcMetadataColSchema(sequencingType: DataType): StructType =
private[pipelines] def cdcMetadataColSchema(sequencingType: DataType): StructType =
StructType(
Seq(
// The sequencing of the event if it represents a delete, null otherwise.
Expand All @@ -265,7 +249,7 @@ object Scd1BatchProcessor {
* Construct the CDC metadata struct column for SCD1, following the exact schema and field
* ordering defined by [[cdcMetadataColSchema]].
*/
private[autocdc] def constructCdcMetadataCol(
private[pipelines] def constructCdcMetadataCol(
deleteSequence: Column,
upsertSequence: Column,
sequencingType: DataType): Column = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
} else {
f
}
convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
resolveFlow(flowToResolve, maybeNewFuncResult)

// If the flow failed due to an UnresolvedDatasetException, it means that one of the
// flow's inputs wasn't available. After other flows are resolved, these inputs
Expand All @@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
}
}

private def convertResolvedToTypedFlow(
private def resolveFlow(
flow: UnresolvedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UntypedFlow resolution uses funcResult.dataFrame.get.isStreaming to choose StreamingFlow vs CompleteFlow, but AutoCdcFlow always becomes AutoCdcMergeFlow regardless of whether the source is streaming.

That means an AutoCdcFlow with a batch source can still resolve successfully when the destination is a non-streaming table (e.g. materialized view), because validateFlowStreamingness only rejects streaming sources for MVs—not batch sources for AutoCDC specifically.

Since this PR documents that AutoCDC is streaming-only (once = false, class-level comments), consider enforcing df.isStreaming here (or in validateFlowStreamingness with an AutoCDC-specific check), e.g.:

case acf: AutoCdcFlow =>
  if (!funcResult.dataFrame.get.isStreaming) {
    throw new AnalysisException(
      errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
      messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
    )
  }
  new AutoCdcMergeFlow(acf, funcResult)

Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a good question, but we don't actually need to do anything here because:

  1. It's not possible to create an MV with an AutoCDC flow input because (a) MVs must be defined with an inline flow function, (b) AutoCDC flows must be defined as standalone flows with a target, and (c) MVs are not allowed to have multiple input flows. That means that if an AutoCDC flow targets an MV, the MV necessarily has at least two flows and would be rejected as invalid.
  2. AutoCDC flows are unique in that users don't actually get to define their flow functions. SDP will define the AutoCDC flow function at flow registration time, and we will define it in a way that forces it to be streaming by construction (i.e., spark.read_stream(source)).

Eventually we will support AutoCDC once-flows, which would indeed be batch flows, but that's not supported today: once = false always holds by construction.

As a middle ground, though, I'll also introduce a test demonstrating that AutoCDC flows cannot write to MVs, in either the flow execution PR or the flow registration PR.

case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, funcResult)
}
}

private def transformUntypedFlowToResolvedFlow(
flow: UntypedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case _ if flow.once => new AppendOnceFlow(flow, funcResult)
case _ if funcResult.dataFrame.get.isStreaming =>
Expand All @@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
// then get their results overwritten.
val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 1
new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend)
case _: UnresolvedFlow => new CompleteFlow(flow, funcResult)
case _ => new CompleteFlow(flow, funcResult)
}
}
}
Loading