
[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1#56069

Open
AnishMahto wants to merge 9 commits into apache:master from AnishMahto:SPARK-56651-autocdc-python-api

Contributor

@AnishMahto AnishMahto commented May 22, 2026

Takeover of #56045. PR description is copied.

What changes were proposed in this pull request?

Adds create_auto_cdc_flow to the SDP Python API. For now, this will only support SCD Type 1. Parameters:

  • name: the name of the flow
  • target: the target table
  • source: the source dataset with the change events
  • keys: the unique key per row
  • sequence_by: a sequence id to establish time order
  • apply_as_deletes: a boolean expression indicating whether an event represents a delete
  • apply_as_truncates: a boolean expression indicating whether an event represents a truncation
  • column_list: a list of columns to include in the target table
  • except_column_list: a list of columns to exclude from the target table
  • stored_as_scd_type: the SCD type, must be 1
  • ignore_null_updates_column_list: a list of columns for which to ignore null values
  • ignore_null_updates_except_column_list: a list of columns for which not to ignore null values
  • source_code_location: the location in the Python source code that defines this flow

This PR introduces the PySpark API to register an AutoCDC flow within an SDP, and sends the registration requests to the Spark driver via Spark Connect protos.

This PR does not handle the reception of those Spark Connect protos; the pipelines handler in the Spark driver will simply throw an unsupported- or unrecognized-operation error.
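
To make the parameter list above concrete, here is a minimal, self-contained sketch of how a call might look. It does not use the real pyspark.pipelines module: `AutoCdcFlowSketch`, `REGISTRY`, the keyword-only signature, and the returned flow object are all assumptions made for illustration, and only the parameters that remain in the user-facing API after the later commits are included. The default flow name falling back to the target name follows the docstring quoted in the review thread.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class AutoCdcFlowSketch:
    """Hypothetical stand-in for the flow object the real API registers."""

    name: str
    target: str
    source: str
    keys: List[str]
    sequence_by: str
    apply_as_deletes: Optional[str] = None
    column_list: Optional[List[str]] = None
    except_column_list: Optional[List[str]] = None
    stored_as_scd_type: Optional[Union[int, str]] = None


# Hypothetical local registry; the PR's tests use a local graph registry instead.
REGISTRY: List[AutoCdcFlowSketch] = []


def create_auto_cdc_flow(
    *,
    target: str,
    source: str,
    keys: List[str],
    sequence_by: str,
    apply_as_deletes: Optional[str] = None,
    column_list: Optional[List[str]] = None,
    except_column_list: Optional[List[str]] = None,
    stored_as_scd_type: Optional[Union[int, str]] = None,
    name: Optional[str] = None,
) -> AutoCdcFlowSketch:
    """Sketch only: record the flow locally instead of sending a Connect proto."""
    flow = AutoCdcFlowSketch(
        # When no name is given, build a "default flow" named after the target.
        name=name if name is not None else target,
        target=target,
        source=source,
        keys=keys,
        sequence_by=sequence_by,
        apply_as_deletes=apply_as_deletes,
        column_list=column_list,
        except_column_list=except_column_list,
        stored_as_scd_type=stored_as_scd_type,
    )
    REGISTRY.append(flow)
    # Returning the flow is a convenience of this sketch, not a claim about the real API.
    return flow


# Table, column, and expression names below are invented for the example.
flow = create_auto_cdc_flow(
    target="customers",
    source="customers_cdc_feed",
    keys=["id"],
    sequence_by="ts",
    apply_as_deletes="op = 'DELETE'",
    except_column_list=["op", "ts"],
    stored_as_scd_type=1,
)
```

In the real API the arguments would be normalized to PySpark Columns and forwarded to the driver; the sketch keeps plain strings so it runs without Spark.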

Why are the changes needed?

See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/

Does this PR introduce any user-facing change?

Yes, it introduces a new method in the SDP Python API.

How was this patch tested?

Unit tests were added, using a local graph registry.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

anew and others added 5 commits May 21, 2026 18:12
- Remove spaces around = in keyword arguments (PEP 8)
- Fix type hint: List[Union[str, Column]] -> Union[List[str], List[Column]]
- Reorder imports and collapse unnecessary line continuations

Co-authored-by: Isaac
- Move inline imports to module level
- Fix assertNone -> assertIsNone
- Fix assertEqual(stored_as_scd_type, "1") -> assertIsNone for default case
- Add missing assertions for optional fields in test_create_auto_cdc_flow

Co-authored-by: Isaac
- Drop ignore_null_updates_column_list / ignore_null_updates_except_column_list
  from create_auto_cdc_flow until execution support lands; the proto fields
  remain so the server-side wiring is unchanged.
- Add a test exercising string-form arguments (keys=["id"], sequence_by="ts",
  apply_as_deletes=str, apply_as_truncates=str, column_list=[str]) to verify
  they normalize to PySpark Columns.
- Fix Pyspark -> PySpark casing in create_auto_cdc_flow docstring.
- Restore the prior import order in spark_connect_graph_element_registry.py
  so the diff vs master is limited to the substantive symbol additions.

Per AnishMahto's heads-up on apache#56045, apply_as_truncates is
unlikely to land for the 4.2 cut. Following the same principle applied to
ignore_null_updates_*, we drop the parameter from the user-facing Python API
now and re-add it once execution support is in. The proto field stays so the
server-side wiring is untouched.
Member

@szehon-ho szehon-ho left a comment


Thanks for taking over #56045 and scoping the API sensibly (dropping truncates / ignore-null updates until execution exists).

A few inline comments below — mainly around default flow_name on the Connect path, client-side validation consistency with other SDP APIs, and test/doc nits.

Note (not inline): PipelinesHandler still throws UnsupportedOperationException for AutoCdcFlowDetails, so Connect definition will fail until server support lands. Worth calling that out in the PR description so users know this PR is Python API + proto wiring only.

Comment thread python/pyspark/pipelines/spark_connect_graph_element_registry.py Outdated
Comment thread python/pyspark/pipelines/api.py Outdated
:param name: The name of the flow for this create_auto_cdc_flow command. When unspecified this \
will build a "default flow" with name equal to the target name.
"""
keys = _normalize_column_list(keys)
Member


Other SDP APIs validate inputs up front (create_streaming_table checks type(name) is not str, etc.). Consider similar checks here before building AutoCdcFlow:

  • target / source: str (same pattern as name in create_streaming_table)
  • keys: non-empty list; reject mixed [str, Column] if you want to match Union[List[str], List[Column]]
  • column_list vs except_column_list: error if both are set (doc already says only one is allowed)
  • _normalize_column_list: reject elements that are neither str nor Column with PySparkTypeError instead of passing them through

Happy to keep this minimal for the first cut, but matching existing SDP validation would give clearer errors at definition time.

Contributor Author


For consistency with other SDP APIs, I added type checks. These make sense at the Python API layer, since Python does not enforce static typing at runtime.

I am leaving logical validations to the Spark driver/pipelines handler, though, since those validations are independent of the client language (e.g., if we support other language clients in the future, the validation should be the same).

Comment thread python/pyspark/pipelines/tests/test_graph_element_registry.py
Comment thread python/pyspark/pipelines/api.py Outdated
Comment thread python/pyspark/pipelines/api.py
Comment thread python/pyspark/pipelines/tests/test_graph_element_registry.py
@AnishMahto AnishMahto changed the title from "Spark 56651 autocdc python api" to "[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1" on May 24, 2026
@AnishMahto AnishMahto requested a review from szehon-ho May 24, 2026 00:11
Member

@szehon-ho szehon-ho left a comment


Thanks for addressing the earlier feedback — default flow name at the API layer, type checks, and the expanded test coverage all look good.

A few minor nits inline (trailing whitespace, doc example indent, test comment typo, and two small test gaps). None are blocking from my side.

Scope note: this PR is Python API + Connect proto wiring only; PipelinesHandler still throws for AutoCdcFlowDetails, so end-to-end Connect definition will fail until server support lands. That's fine for this PR.

LGTM.

Comment thread python/pyspark/pipelines/api.py Outdated
Comment thread python/pyspark/pipelines/tests/test_graph_element_registry.py Outdated
Comment thread python/pyspark/pipelines/api.py Outdated
Comment thread python/pyspark/pipelines/tests/test_graph_element_registry.py
Comment thread python/pyspark/pipelines/tests/test_graph_element_registry.py
@AnishMahto
Contributor Author

I don't know whether this is intentional, but it looks like the docs build environment in CI doesn't have all of the transitive dependencies required by the PySpark Connect packages. For example, see this failure caused by the pyspark.sql.connect.functions.builtin import, which requires zstandard >= 0.25.0 to be installed: https://github.com/AnishMahto/spark/actions/runs/26351167405/job/77569752098.

In any case, as a workaround, I will lazily import pyspark.sql.connect.functions.builtin so that the docs build is unaffected.
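
For readers unfamiliar with the lazy-import workaround, the general pattern is to move the import from module level into the function that needs it, so that merely importing the module (as a docs build does) never triggers the dependency. The sketch below shows the pattern with a standard-library module standing in for pyspark.sql.connect.functions.builtin; `parse_ratio` is a hypothetical function and is not part of the PR.

```python
def parse_ratio(text: str):
    """Parse a ratio such as "3/6" into a reduced fraction."""
    # Function-local (lazy) import: resolved on first call, not when this
    # module is imported. In the PR the deferred module would be
    # pyspark.sql.connect.functions.builtin; fractions is only a stand-in.
    from fractions import Fraction

    return Fraction(text)


ratio = parse_ratio("3/6")
```

The trade-off is that a missing dependency surfaces at call time rather than at import time, which is acceptable here because the docs build only imports the module.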
