[FLINK-38828][cdc-runtime] Add state persistence to PostTransformOperator for projection schema evolution #4280

Closed
VinaySagarGonabavi wants to merge 1 commit into apache:master from VinaySagarGonabavi:fix-projection-schema-evolution

@VinaySagarGonabavi

What is the purpose of the change

When a Flink CDC pipeline uses transform projections (e.g., *, UPPER(col) AS computed_col) and the pipeline is restarted from a savepoint/checkpoint with an updated projection (e.g., adding or removing a computed column), the PostTransformOperator does not detect the projection change. The downstream sink never receives schema change events for the new or removed columns, leaving the sink schema stale and out of sync with the actual projected data.

This PR adds state persistence to PostTransformOperator so that on restore, it can detect differences between the old (checkpointed) post-transform schema and the newly calculated one, and emit appropriate AddColumnEvent/DropColumnEvent before processing data.

Brief change log

  • PostTransformChangeInfo.java: Added Serializer inner class implementing SimpleVersionedSerializer<PostTransformChangeInfo> with backward-compatible deserialization using a magic marker pattern (__magic_post_transform__)

  • PostTransformOperator.java:

    • Added initializeState() / snapshotState() overrides using union list state (ListState<byte[]>)
    • On restore, recalculates post-transform schemas under the current projection rules, merging the output of all effective transformers with SchemaMergingUtils.strictlyMergeSchemas() (matching the processCreateTableEvent logic), and generates AddColumnEvent/DropColumnEvent for any differences from the checkpointed schema
    • Emits pending schema changes before the first DataChangeEvent per table
    • CreateTableEvent clears pending changes for the table (fresh schema supersedes old diff)
    • Moved invalidateCache(tableId) to before processing CreateTableEvent/SchemaChangeEvent (was after), ensuring fresh processors are created with the new schema
    • Modified open() with guards against double-initialization since initializeState() now runs before open()
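
The "emit pending changes before the first DataChangeEvent" and "CreateTableEvent clears pending changes" behavior described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class `PendingSchemaChanges`, its method names, and the use of plain strings in place of real event and TableId types are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the operator's per-table buffer of schema changes
// computed on restore. Events and table IDs are modeled as plain strings.
final class PendingSchemaChanges {
    private final Map<String, List<String>> pending = new HashMap<>();

    // Called once per table after restore, with the diff between old and new schema.
    void register(String tableId, List<String> changes) {
        if (!changes.isEmpty()) {
            pending.put(tableId, new ArrayList<>(changes));
        }
    }

    // A CreateTableEvent carries a fresh schema, so any stale diff is discarded.
    List<String> onCreateTable(String tableId) {
        pending.remove(tableId);
        List<String> out = new ArrayList<>();
        out.add("CreateTable(" + tableId + ")");
        return out;
    }

    // The first data event for a table flushes its pending changes ahead of the record.
    List<String> onDataChange(String tableId, String record) {
        List<String> out = new ArrayList<>();
        List<String> changes = pending.remove(tableId); // null once already flushed
        if (changes != null) {
            out.addAll(changes);
        }
        out.add(record);
        return out;
    }
}
```

In this sketch the per-record cost is the single `HashMap.remove()` lookup; after the first data event for a table the lookup simply returns null and only the record is forwarded.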

Verifying this change

This change adds 16 new tests:

Unit tests (PostTransformChangeInfoTest.java — 7 tests):

  • Serializer version number verification
  • Roundtrip serialization with simple schema
  • Roundtrip serialization with complex schema (TIMESTAMP, DECIMAL, BYTES)
  • Roundtrip with single-part TableId (no namespace/schema)
  • Backward-compatible deserialization of old format (no magic marker)
  • Backward-compatible deserialization of old format with complex schema
  • Equivalence between new and old format deserialization
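
For readers unfamiliar with the magic marker pattern that these serializer tests exercise, here is a rough, self-contained sketch. It does not use Flink's `SimpleVersionedSerializer` and does not reproduce the PR's actual byte layout; the class `MagicMarkerCodec`, the version number, and the simplified payload (a table ID plus column names) are assumptions for illustration. Only the marker string `__magic_post_transform__` comes from the PR description.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical codec: the new format is prefixed with a magic string and a version,
// while the old format starts directly with the table ID.
final class MagicMarkerCodec {
    static final String MAGIC = "__magic_post_transform__";
    static final int VERSION = 2; // assumed value, for illustration only

    static byte[] serialize(String tableId, List<String> columns) {
        return write(true, tableId, columns);
    }

    // Mimics what an older writer without the marker would have produced.
    static byte[] serializeLegacy(String tableId, List<String> columns) {
        return write(false, tableId, columns);
    }

    private static byte[] write(boolean withMarker, String tableId, List<String> columns) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (withMarker) {
                out.writeUTF(MAGIC);
                out.writeInt(VERSION);
            }
            out.writeUTF(tableId);
            out.writeInt(columns.size());
            for (String column : columns) {
                out.writeUTF(column);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns the table ID followed by the column names.
    static List<String> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String first = in.readUTF();
            String tableId;
            if (MAGIC.equals(first)) {
                int version = in.readInt();
                if (version != VERSION) {
                    throw new IOException("Unsupported version: " + version);
                }
                tableId = in.readUTF();
            } else {
                tableId = first; // old format: the first string is already the table ID
            }
            int count = in.readInt();
            List<String> result = new ArrayList<>();
            result.add(tableId);
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design relies on the marker being a value that can never appear as the first field of old data; in this sketch, a legacy table literally named `__magic_post_transform__` would be misread as the new format.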

Integration tests (TransformOperatorWithSchemaEvolveTest.java — 9 tests):

  • generateSchemaChangeEvents produces correct AddColumnEvent at LAST position
  • generateSchemaChangeEvents produces correct AddColumnEvent with AFTER position
  • generateSchemaChangeEvents produces correct AddColumnEvent at FIRST position
  • generateSchemaChangeEvents produces correct DropColumnEvent
  • Mixed add/drop column changes in a single diff
  • No-op when schemas are identical
  • Pending schema changes emitted before DataChangeEvent
  • CreateTableEvent clears pending changes
  • Multiple pending schema changes emitted in order
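
The schema diffing that the generateSchemaChangeEvents tests cover might look roughly like the sketch below. It is an illustration under simplifying assumptions, not the PR's implementation: schemas are reduced to ordered lists of column names, events are rendered as strings, retained columns are assumed to keep their relative order, and the class name `SchemaDiffSketch` and the rule for choosing FIRST, LAST, or AFTER are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical diff between a checkpointed column list and a recalculated one.
final class SchemaDiffSketch {
    static List<String> diff(List<String> oldCols, List<String> newCols) {
        Set<String> oldSet = new HashSet<>(oldCols);
        Set<String> newSet = new HashSet<>(newCols);
        List<String> events = new ArrayList<>();

        // Columns that disappeared from the projection become drop events.
        for (String column : oldCols) {
            if (!newSet.contains(column)) {
                events.add("DROP " + column);
            }
        }

        // Columns that are new become add events, positioned relative to the new schema.
        for (int i = 0; i < newCols.size(); i++) {
            String column = newCols.get(i);
            if (oldSet.contains(column)) {
                continue;
            }
            if (i == 0) {
                events.add("ADD " + column + " FIRST");
            } else if (i == newCols.size() - 1) {
                events.add("ADD " + column + " LAST");
            } else {
                events.add("ADD " + column + " AFTER " + newCols.get(i - 1));
            }
        }
        return events;
    }
}
```

For example, diffing `[id, name, legacy]` against `[id, upper_name, name, extra]` yields a drop of `legacy`, an add of `upper_name` after `id`, and an add of `extra` at the last position; identical inputs yield an empty list.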

Does this pull request potentially affect one of the following parts

  • Dependencies: no
  • The public API: no
  • The runtime per-record code path: yes (adds a single HashMap.remove() call per DataChangeEvent, which returns null after the first event per table, so the overhead is negligible)
  • Anything that affects deployment or recovery: yes (adds operator state for checkpoint/savepoint persistence)

Documentation

  • Does this pull request introduce a new feature? No (fixes existing behavior gap)
  • If yes, how is the feature documented? N/A

@yuxiqian
Member

yuxiqian commented Feb 28, 2026

Hi @VinaySagarGonabavi, thanks for your contribution. The Transform operator actually used to be stateful until #4056. The idea is that transform rules are themselves deterministic, so making the operator a pure mapping node makes more sense.

As for the problem mentioned above, the Transform operator should emit a CreateTableEvent with the newly transformed schema after restarting from a checkpoint. Should the SchemaOperator and SchemaCoordinator (which are stateful) take responsibility for handling these changes and emitting the correct schema change events downstream?

@VinaySagarGonabavi
Author

@yuxiqian Thanks for looking into the PR and explaining the design direction. I agree with the suggested approach. There is a follow-up PR (v2), #4299, that handles this in SchemaCoordinator and SchemaOperator. Please take a look and review it when you get a chance.

@VinaySagarGonabavi
Author

An alternative approach to this issue is implemented in #4299, based on the feedback in this PR.
