[FLINK-38828][cdc-runtime] Add state persistence to PostTransformOperator for projection schema evolution#4280
Conversation
…ator for projection schema evolution
|
Hi @VinaySagarGonabavi, thanks for your contribution. Actually Transform operator used to be stateful until #4056. The idea is transform rules itself is deterministic and making it a pure mapping node makes more sense. As for the problem mentioned above, Transform operator should emit |
@yuxiqian Thanks for looking into the PR, and explaining the design direction. I agree with the approach suggested. There is a follow up PR (v2) with handling in |
|
An alternate approach is made for the issue in #4299 based on feedback in this PR |
What is the purpose of the change
When a Flink CDC pipeline uses transform projections (e.g.,
*, UPPER(col) AS computed_col) and the pipeline is restarted from a savepoint/checkpoint with an updated projection (e.g., adding or removing a computed column), thePostTransformOperatordoes not detect the projection change. The downstream sink never receives schema change events for the new or removed columns, leaving the sink schema stale and out of sync with the actual projected data.This PR adds state persistence to
PostTransformOperatorso that on restore, it can detect differences between the old (checkpointed) post-transform schema and the newly calculated one, and emit appropriateAddColumnEvent/DropColumnEventbefore processing data.Brief change log
PostTransformChangeInfo.java: AddedSerializerinner class implementingSimpleVersionedSerializer<PostTransformChangeInfo>with backward-compatible deserialization using a magic marker pattern (__magic_post_transform__)PostTransformOperator.java:initializeState()/snapshotState()overrides using union list state (ListState<byte[]>)SchemaMergingUtils.strictlyMergeSchemas()(matchingprocessCreateTableEventlogic) and generatesAddColumnEvent/DropColumnEventfor detected differencesDataChangeEventper tableCreateTableEventclears pending changes for the table (fresh schema supersedes old diff)invalidateCache(tableId)to before processingCreateTableEvent/SchemaChangeEvent(was after), ensuring fresh processors are created with the new schemaopen()with guards against double-initialization sinceinitializeState()now runs beforeopen()Verifying this change
This change added 16 new tests:
Unit tests (
PostTransformChangeInfoTest.java— 7 tests):Integration tests (
TransformOperatorWithSchemaEvolveTest.java— 9 tests):generateSchemaChangeEventsproduces correctAddColumnEventat LAST positiongenerateSchemaChangeEventsproduces correctAddColumnEventwith AFTER positiongenerateSchemaChangeEventsproduces correctAddColumnEventat FIRST positiongenerateSchemaChangeEventsproduces correctDropColumnEventDataChangeEventCreateTableEventclears pending changesDoes this pull request potentially affect one of the following parts
HashMap.remove()call perDataChangeEvent; returnsnullafter the first event per table — negligible overhead)Documentation