Skip to content

[SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses#56042

Open
AnishMahto wants to merge 9 commits into
apache:masterfrom
AnishMahto:SPARK-56956-introduce-flow-data-classes
Open

[SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses#56042
AnishMahto wants to merge 9 commits into
apache:masterfrom
AnishMahto:SPARK-56956-introduce-flow-data-classes

Conversation

@AnishMahto
Copy link
Copy Markdown
Contributor

@AnishMahto AnishMahto commented May 21, 2026

Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7


What changes were proposed in this pull request?

Introduce dataclass for unresolved AutoCDC flow (AutoCdcFlow) and resolved AutoCDC flow (AutoCdcMergeFlow). Add wiring to analyze an AutoCdcFlow to an AutoCdcMergeFlow.

A small refactor was additionally made on the UnresolvedFlow and ResolvedFlow class hierarchy.

Why are the changes needed?

Support AutoCDC flow registration and analysis. AutoCDC flow execution will be supported in a future PR. Previously, an UnresolvedFlow additionally always represented an untyped-flow; a flow where do not yet know its execution-type, i.e streaming, append-once, etc.

AutoCdcFlow is a specialized flow with support for only streaming flows, hence it represents a flow whose execution-type we know at construction. It is still unresolved at registration time, and needs to go through resolution to determine its position in the DAG and its input/output schemas.

Hence we introduce the intermediary child UntypedFlow for UnresolvedFlow, which all previous flows are classified as during registration. An AutoCdcFlow directly implements UnresolvedFlow (skipping `UntypedFlow in its inheritance chain) because it is not untyped.

Does this PR introduce any user-facing change?

No, the AutoCDC feature is not released anywhere yet.

How was this patch tested?

ConnectValidPipelineSuite and AutoCdcFlowSuite

Was this patch authored or co-authored using generative AI tooling?

Co-authored.

Generated-by: Claude-Opus-4.7-thinking-xhigh

@AnishMahto
Copy link
Copy Markdown
Contributor Author

AnishMahto commented May 22, 2026

@szehon-ho

This is actually a fairly small change btw, 600 LOC is just tests. The only real logic added here is some validation on construction of an AutoCdcMergeFlow.

Comment on lines -116 to -134

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver

microbatch.schema.fieldNames
.find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
.foreach { conflictingColumnName =>
throw new AnalysisException(
errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
messageParameters = Map(
"caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
"columnName" -> conflictingColumnName,
"schemaName" -> "microbatch",
"reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
)
)
}
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic and corresponding test were removed because by construction its never possible now, given how AutoCdcMergeFlow validates requireReservedPrefixAbsentInSourceColumns.

@AnishMahto AnishMahto force-pushed the SPARK-56956-introduce-flow-data-classes branch from 5d4b9f9 to bc0c1d8 Compare May 22, 2026 20:52
Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice foundation PR — the UnresolvedFlow / UntypedFlow split and moving reserved-prefix validation to AutoCdcMergeFlow construction look solid. A few inline comments below (streaming enforcement, SCD2 error type, typo, duplicate test, isAutoCdcFlow clarity). None blocking if streaming validation is intentionally deferred.

flow: UnresolvedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UntypedFlow resolution uses funcResult.dataFrame.get.isStreaming to choose StreamingFlow vs CompleteFlow, but AutoCdcFlow always becomes AutoCdcMergeFlow regardless of whether the source is streaming.

That means an AutoCdcFlow with a batch source can still resolve successfully when the destination is a non-streaming table (e.g. materialized view), because validateFlowStreamingness only rejects streaming sources for MVs—not batch sources for AutoCDC specifically.

Since this PR documents that AutoCDC is streaming-only (once = false, class-level comments), consider enforcing df.isStreaming here (or in validateFlowStreamingness with an AutoCDC-specific check), e.g.:

case acf: AutoCdcFlow =>
  if (!funcResult.dataFrame.get.isStreaming) {
    throw new AnalysisException(
      errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
      messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
    )
  }
  new AutoCdcMergeFlow(acf, funcResult)

Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a good question, but we don't actually need to do anything here because:

  1. It's not possible to create an MV with an AutoCDC flow input because (a) MV's must be defined with an inline flow function, (b) AutoCDC flows must be defined as standalone flows with a target, and (c) MVs are not allowed to have multiple input flows. That means if an AutoCDC flow targets an MV, the MV necessarily has at least 2 flows, and would be invalidated
  2. AutoCDC flows are unique in that users don't actually get to define their flow functions. SDP will define the AutoCDC flow function at flow registration time, and we will define it in such a way that forces it to be streaming by construction (i.e spark.read_stream(source))

Eventually we will support AutoCDC once flows which would indeed be batch flows, but that's not supported today - once = false always by construction.

As a middle ground though I'll also introduce a test demonstrating that AutoCDC flows cannot write to MVs in either the flow execution or flow registration PR.

Comment thread sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala Outdated
Comment thread sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala Outdated
@AnishMahto AnishMahto requested a review from szehon-ho May 22, 2026 22:44
Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice foundation PR — the UnresolvedFlow / UntypedFlow split and moving reserved-prefix validation to AutoCdcMergeFlow construction look solid. A few inline nits below; all are non-blocking and can be tracked in the execution/registration follow-ups.

* columns that the AutoCDC MERGE engine projects onto the target table. Downstream
* dependencies in the pipeline see this augmented schema.
*/
override val schema: StructType = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AutoCdcMergeFlow overrides schema to the augmented target-facing schema (column selection + CDC metadata), but still inherits ResolvedFlow.load() which returns the raw CDF df. If any code path reads this flow as an Input and assumes load() matches schema, that could be surprising.

Probably fine if AutoCDC outputs are always materialized to tables first, but worth confirming when the execution PR wires this up — either override load() or document that the augmented schema is for inference/planning only. Non-blocking.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great callout.

Since AutoCDC flows must write to a streaming table (MV, temp view, persisted view are all invalid targets), AutoCdcMergeFlow.load isn't actually ever called.

But the comment is right that without an override, the inherited AutoCdcMergeFlow.load implementation is incorrect. Regardless of whether its called at runtime or not today, we should make sure the contract is correct.

Added a meaningful override and also left a comment explaining this. I also added tests to demonstrate that AutoCDC flows cannot write to MV/persisted view/temp view.

As a side note, I think this is a great example of why good inheritance is difficult to get right and more often than not adds unnecessary coupling 😛.

@AnishMahto AnishMahto force-pushed the SPARK-56956-introduce-flow-data-classes branch from d8410ec to ccd031f Compare May 23, 2026 22:37
Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM — solid foundation PR for AutoCDC flow registration and analysis. The UnresolvedFlow / UntypedFlow split, validation at AutoCdcMergeFlow construction, and graph-level guardrails look good. Execution and registration wiring can follow in later PRs.

// Temporary views' flows are generally allowed to be either streaming or batch.
resolvedFlow match {
case _: AutoCdcMergeFlow =>
// The exception is AutoCDC flows, which require require a streaming-table sink to
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: duplicated word — should be "which require a streaming-table sink" (drop the extra "require").

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants