[FLINK-38828][cdc-common][cdc-runtime] Handle schema evolution upon projection updates via SchemaCoordinator #4299
Open
VinaySagarGonabavi wants to merge 1 commit into apache:master from
Conversation
…rojection updates via SchemaCoordinator
yuxiqian reviewed Mar 19, 2026
Comment on lines +291 to +313:
```java
// If it's a one-by-one routing rule, we can simply forward it to downstream
// sink. However, if the incoming event is a CreateTableEvent for an
// already-known evolved table (e.g. after restart with changed projections),
// we must compute the schema diff instead of forwarding the raw
// CreateTableEvent, which would fail at the sink.
if (event instanceof CreateTableEvent && currentEvolvedSchema != null) {
    CreateTableEvent createTableEvent = (CreateTableEvent) event;
    List<SchemaChangeEvent> diffEvents =
            SchemaMergingUtils.getSchemaDifference(
                    evolvedTableId,
                    currentEvolvedSchema,
                    createTableEvent.getSchema());
    rawSchemaChangeEvents.addAll(diffEvents);
    LOG.info(
            "Step 3.3 - It's a one-by-one routing but CreateTableEvent for existing table. Computed diff events: {}.",
            diffEvents);
} else {
    SchemaChangeEvent rawEvent = event.copy(evolvedTableId);
    rawSchemaChangeEvents.add(rawEvent);
    LOG.info(
            "Step 3.3 - It's a one-by-one routing and could be forwarded as {}.",
            rawEvent);
}
```
Member
Can you add E2e test cases in org.apache.flink.cdc.pipeline.tests.migration to cover these scenarios?
What is the purpose of the change
This is a v2 approach following feedback on PR #4280: schema evolution responsibility stays in SchemaCoordinator rather than adding state to PostTransformOperator, keeping the transform operator stateless as intended by PR #4056.

When a Flink CDC pipeline uses transform projections and is restarted from a savepoint/checkpoint with an updated projection, the downstream sink never receives schema change events for the new or removed columns, leaving the sink schema stale and out of sync with the actual projected data.
This PR fixes the issue by teaching SchemaCoordinator to detect schema differences when a CreateTableEvent arrives for an already-known table. Instead of treating the event as redundant, the coordinator computes the schema diff and emits the appropriate AddColumnEvent / DropColumnEvent / AlterColumnTypeEvent downstream. PostTransformOperator remains stateless: it is a pure mapping node.

Brief change log
- SchemaUtils.isSchemaChangeEventRedundant(): a CreateTableEvent is now redundant only if the table exists AND the schema is identical (was: just an existence check)
- SchemaCoordinator.deduceEvolvedSchemaChanges(): when a CreateTableEvent arrives for an existing table in 1:1 routing, computes the schema diff via SchemaMergingUtils.getSchemaDifference() and returns the diff events instead of forwarding the raw CreateTableEvent

Note: isSchemaChangeEventRedundant is also called by the Hudi and Paimon sink connectors. The behavioral change is a bugfix for those paths as well (they were silently dropping CreateTableEvents with changed schemas), but no connector-specific code was modified.

Verifying this change
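The redundancy rule being verified can be sketched as a self-contained check. This is an approximation only: a hypothetical RedundancySketch class with a plain column-to-type map in place of the real Schema class, while the actual logic lives in SchemaUtils.isSchemaChangeEventRedundant:

```java
import java.util.Map;
import java.util.Objects;

public class RedundancySketch {
    /**
     * A CreateTableEvent is redundant only when the table is already known
     * AND the incoming schema is identical to the known one. Before this
     * change, mere existence of the table made the event redundant, which
     * silently dropped schema updates after a projection change.
     */
    static boolean isCreateTableRedundant(
            Map<String, Map<String, String>> knownTables, // tableId -> (column -> type)
            String tableId,
            Map<String, String> incomingSchema) {
        Map<String, String> known = knownTables.get(tableId);
        return known != null && Objects.equals(known, incomingSchema);
    }
}
```

A CreateTableEvent with an extra column, a missing column, or a changed type therefore passes the redundancy filter and reaches the coordinator's diff logic, matching the scenarios covered by the tests below.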
3 new test methods across 3 files:

- SchemaUtilsTest: redundancy checks for CreateTableEvent: non-existent table, same schema, extra columns, type diff, fewer columns
- SchemaMergingUtilsTest: diff scenarios for projection changes: add column, drop column, type change, swap columns, identical schemas
- SchemaEvolveTest: end-to-end through SchemaOperator + SchemaCoordinator: add column, drop column, type change, identical schema (no-op)

Does this pull request potentially affect one of the following parts
Documentation