[SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses#56042
Conversation
02f656a to
6df7b64
Compare
|
This is actually a fairly small change btw, 600 LOC is just tests. The only real logic added here is some validation on construction of an |
|
|
||
| private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = { | ||
| val microbatchSqlConf = microbatch.sparkSession.sessionState.conf | ||
| val resolver = microbatchSqlConf.resolver | ||
|
|
||
| microbatch.schema.fieldNames | ||
| .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) | ||
| .foreach { conflictingColumnName => | ||
| throw new AnalysisException( | ||
| errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", | ||
| messageParameters = Map( | ||
| "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), | ||
| "columnName" -> conflictingColumnName, | ||
| "schemaName" -> "microbatch", | ||
| "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName | ||
| ) | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
This logic and corresponding test were removed because by construction its never possible now, given how AutoCdcMergeFlow validates requireReservedPrefixAbsentInSourceColumns.
5d4b9f9 to
bc0c1d8
Compare
szehon-ho
left a comment
There was a problem hiding this comment.
Nice foundation PR — the UnresolvedFlow / UntypedFlow split and moving reserved-prefix validation to AutoCdcMergeFlow construction look solid. A few inline comments below (streaming enforcement, SCD2 error type, typo, duplicate test, isAutoCdcFlow clarity). None blocking if streaming validation is intentionally deferred.
| flow: UnresolvedFlow, | ||
| funcResult: FlowFunctionResult): ResolvedFlow = { | ||
| flow match { | ||
| case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult) |
There was a problem hiding this comment.
UntypedFlow resolution uses funcResult.dataFrame.get.isStreaming to choose StreamingFlow vs CompleteFlow, but AutoCdcFlow always becomes AutoCdcMergeFlow regardless of whether the source is streaming.
That means an AutoCdcFlow with a batch source can still resolve successfully when the destination is a non-streaming table (e.g. materialized view), because validateFlowStreamingness only rejects streaming sources for MVs—not batch sources for AutoCDC specifically.
Since this PR documents that AutoCDC is streaming-only (once = false, class-level comments), consider enforcing df.isStreaming here (or in validateFlowStreamingness with an AutoCDC-specific check), e.g.:
case acf: AutoCdcFlow =>
if (!funcResult.dataFrame.get.isStreaming) {
throw new AnalysisException(
errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
)
}
new AutoCdcMergeFlow(acf, funcResult)Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone.
There was a problem hiding this comment.
So this is a good question, but we don't actually need to do anything here because:
- It's not possible to create an MV with an AutoCDC flow input because (a) MV's must be defined with an inline flow function, (b) AutoCDC flows must be defined as standalone flows with a target, and (c) MVs are not allowed to have multiple input flows. That means if an AutoCDC flow targets an MV, the MV necessarily has at least 2 flows, and would be invalidated
- AutoCDC flows are unique in that users don't actually get to define their flow functions. SDP will define the AutoCDC flow function at flow registration time, and we will define it in such a way that forces it to be streaming by construction (i.e
spark.read_stream(source))
Eventually we will support AutoCDC once flows which would indeed be batch flows, but that's not supported today - once = false always by construction.
As a middle ground though I'll also introduce a test demonstrating that AutoCDC flows cannot write to MVs in either the flow execution or flow registration PR.
szehon-ho
left a comment
There was a problem hiding this comment.
Nice foundation PR — the UnresolvedFlow / UntypedFlow split and moving reserved-prefix validation to AutoCdcMergeFlow construction look solid. A few inline nits below; all are non-blocking and can be tracked in the execution/registration follow-ups.
| * columns that the AutoCDC MERGE engine projects onto the target table. Downstream | ||
| * dependencies in the pipeline see this augmented schema. | ||
| */ | ||
| override val schema: StructType = { |
There was a problem hiding this comment.
AutoCdcMergeFlow overrides schema to the augmented target-facing schema (column selection + CDC metadata), but still inherits ResolvedFlow.load() which returns the raw CDF df. If any code path reads this flow as an Input and assumes load() matches schema, that could be surprising.
Probably fine if AutoCDC outputs are always materialized to tables first, but worth confirming when the execution PR wires this up — either override load() or document that the augmented schema is for inference/planning only. Non-blocking.
There was a problem hiding this comment.
This is a great callout.
Since AutoCDC flows must write to a streaming table (MV, temp view, persisted view are all invalid targets), AutoCdcMergeFlow.load isn't actually ever called.
But the comment is right that without an override, the inherited AutoCdcMergeFlow.load implementation is incorrect. Regardless of whether its called at runtime or not today, we should make sure the contract is correct.
Added a meaningful override and also left a comment explaining this. I also added tests to demonstrate that AutoCDC flows cannot write to MV/persisted view/temp view.
As a side note, I think this is a great example of why good inheritance is difficult to get right and more often than not adds unnecessary coupling 😛.
d8410ec to
ccd031f
Compare
szehon-ho
left a comment
There was a problem hiding this comment.
LGTM — solid foundation PR for AutoCDC flow registration and analysis. The UnresolvedFlow / UntypedFlow split, validation at AutoCdcMergeFlow construction, and graph-level guardrails look good. Execution and registration wiring can follow in later PRs.
| // Temporary views' flows are generally allowed to be either streaming or batch. | ||
| resolvedFlow match { | ||
| case _: AutoCdcMergeFlow => | ||
| // The exception is AutoCDC flows, which require require a streaming-table sink to |
There was a problem hiding this comment.
nit: duplicated word — should be "which require a streaming-table sink" (drop the extra "require").
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
What changes were proposed in this pull request?
Introduce dataclass for unresolved AutoCDC flow (
AutoCdcFlow) and resolved AutoCDC flow (AutoCdcMergeFlow). Add wiring to analyze anAutoCdcFlowto anAutoCdcMergeFlow.A small refactor was additionally made on the
UnresolvedFlowandResolvedFlowclass hierarchy.Why are the changes needed?
Support AutoCDC flow registration and analysis. AutoCDC flow execution will be supported in a future PR. Previously, an
UnresolvedFlowadditionally always represented an untyped-flow; a flow where do not yet know its execution-type, i.e streaming, append-once, etc.AutoCdcFlowis a specialized flow with support for only streaming flows, hence it represents a flow whose execution-type we know at construction. It is still unresolved at registration time, and needs to go through resolution to determine its position in the DAG and its input/output schemas.Hence we introduce the intermediary child
UntypedFlowforUnresolvedFlow, which all previous flows are classified as during registration. AnAutoCdcFlowdirectly implementsUnresolvedFlow(skipping `UntypedFlow in its inheritance chain) because it is not untyped.Does this PR introduce any user-facing change?
No, the AutoCDC feature is not released anywhere yet.
How was this patch tested?
ConnectValidPipelineSuiteandAutoCdcFlowSuiteWas this patch authored or co-authored using generative AI tooling?
Co-authored.
Generated-by: Claude-Opus-4.7-thinking-xhigh