[FLINK-38830][runtime] Handle duplicate AddColumnEvent gracefully to prevent pipeline crashes #4279
Conversation
Pull request overview
Fixes Flink CDC pipeline crashes caused by duplicate AddColumnEvents (e.g., emitted by gh-ost/pt-osc during online schema migrations) by filtering redundant column-add operations and making add-column schema application idempotent.
Changes:
- Add `SchemaUtils.filterRedundantAddColumns(...)` to drop already-existing columns and intra-event duplicates (returning an empty `Optional` when the event is fully redundant).
- Make `SchemaUtils` add-column application skip already-existing column names instead of throwing.
- Apply redundant-add filtering in both `PreTransformOperator` and `PostTransformOperator`, and add unit/integration tests for duplicate scenarios.
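The filtering behavior described in the changes above can be sketched in isolation. The snippet below is a simplified stand-in, not the actual Flink CDC API: it uses plain strings for column names instead of `Schema` and `AddColumnEvent.ColumnWithPosition`, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class RedundantAddColumnFilter {

    /**
     * Drops columns that already exist in the current schema as well as
     * duplicates within the event itself. Returns Optional.empty() when
     * every requested column is redundant, so callers can treat the
     * whole event as a no-op.
     */
    static Optional<List<String>> filterRedundantAddColumns(
            Set<String> existingColumns, List<String> columnsToAdd) {
        Set<String> seen = new LinkedHashSet<>(existingColumns);
        List<String> kept = new ArrayList<>();
        for (String column : columnsToAdd) {
            if (seen.add(column)) { // add() is false if already in schema or earlier in this event
                kept.add(column);
            }
        }
        return kept.isEmpty() ? Optional.empty() : Optional.of(kept);
    }

    public static void main(String[] args) {
        Set<String> schema = new LinkedHashSet<>(List.of("id", "name", "age"));
        // "extras" is new; "age" already exists; the second "extras" is an intra-event duplicate.
        System.out.println(filterRedundantAddColumns(schema, List.of("age", "extras", "extras")));
        // A fully redundant event collapses to a no-op.
        System.out.println(filterRedundantAddColumns(schema, List.of("id", "name")));
    }
}
```

Both redundancy cases from the PR (column already in the schema, and column repeated within one event) reduce to a single `Set.add(...)` check per column.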
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java | Adds redundant AddColumn filtering utility; makes add-column application idempotent. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java | Filters redundant AddColumnEvents before schema application (but introduces an early-return risk). |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java | Filters redundant AddColumnEvents before applying schema changes in post-transform. |
| flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java | Adds unit tests for redundant filtering and idempotent add-column application. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java | Adds integration tests for duplicate/partial-duplicate add-column events through transform operators. |
Comments suppressed due to low confidence (2)
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:928
- This Javadoc says the duplicate event is "sent directly to the post-transform operator", but the test uses `evolveFromSource(...)`, which exercises both operators in the normal pipeline. Either adjust the wording to match what is actually being tested, or extend the harness/test case to inject schema-change events directly into `PostTransformOperator` if that behavior is important to cover.
/**
* This case tests that duplicate AddColumnEvents are handled gracefully by
* PostTransformOperator. When the same column is added twice, the second event should be
* filtered out even when sent directly to the post-transform operator.
*/
@Test
void testDuplicateAddColumnEventPostTransform() throws Exception {
TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
TransformWithSchemaEvolveTestCase.of(
tableId,
"*, id + age as computed",
"name <> 'Alice'",
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:852
- The PR description lists integration tests named `testDuplicateAddColumnEventsInPreTransform`/`testDuplicateAddColumnEventsInPostTransform`, but the actual test method name here is `testDuplicateAddColumnEventPreTransform`. If the listed names are meant to be accurate for verification automation or documentation, please align either the method names or the description.
@Test
void testDuplicateAddColumnEventPreTransform() throws Exception {
TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
TransformWithSchemaEvolveTestCase.of(
tableId,
                                null))))
                .expectNothingInPreTransformed()
                .expectNothingInPostTransformed()
                .runTests("duplicate add extras column should be filtered")
These duplicate-AddColumnEvent tests stop immediately after asserting the redundant event is filtered. To prevent regressions like losing internal transform processors/state, it would be useful to also process at least one subsequent DataChangeEvent after the duplicate schema event and assert the pipeline continues to transform records correctly (i.e., no operator crash / missing schema view).
Suggested change:

                .runTests("duplicate add extras column should be filtered")
                // After the duplicate AddColumnEvent has been filtered, process a subsequent
                // DataChangeEvent to verify that the pipeline continues to transform records
                // correctly (i.e., no operator crash or missing schema view).
                // The concrete DataChangeEvent and its expected transformed output should follow
                // the same construction patterns as other tests in this class.
                // For example (pseudocode, to be aligned with existing helpers in this test):
                // .evolveFromSource(
                //         someInsertOrUpdateDataChangeEventFor(tableId, id, name, age, extras))
                // .expectInPreTransformed(expectedPreTransformEvent)
                // .expectInPostTransformed(expectedPostTransformEvent)
                // .runTests("data change after duplicate add extras column is still processed")
In the new commit, both testDuplicateAddColumnEventPreTransform() and testDuplicateAddColumnEventPostTransform() send a DataChangeEvent after the duplicate AddColumnEvent and assert that it is processed correctly through the transform pipeline.
        // Filter out redundant AddColumnEvent columns that already exist in the schema
        // to handle duplicate events from tools like gh-ost online schema migrations
        if (event instanceof AddColumnEvent) {
            AddColumnEvent addColumnEvent = (AddColumnEvent) event;
            Schema currentSchema = tableChangeInfo.getSourceSchema();
            Optional<AddColumnEvent> filtered =
                    SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
            if (!filtered.isPresent()) {
                LOG.debug(
                        "Skipping fully redundant AddColumnEvent for table {} "
                                + "- all columns already exist",
                        tableId);
                return Optional.empty();
            }
            event = filtered.get();
        }
In processEvent, preTransformProcessorMap.remove(tableId) is executed for every SchemaChangeEvent. With the new early-return here, a fully-redundant AddColumnEvent will return Optional.empty() before cachePreTransformProcessor(...) is called, so the processor map entry is never rebuilt. This can cause subsequent DataChangeEvents for the table to fail the processor != null check and crash the pipeline. Ensure the processor is re-cached even when the AddColumnEvent is filtered to a no-op (or avoid removing the processor map entry in the redundant case).
Removed preTransformProcessorMap.remove(tableId) before cacheChangeSchema(). Now the processor stays intact on the filtered path and gets overwritten via cachePreTransformProcessor() on the non-filtered path
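The eviction-ordering bug and its fix can be modeled with a minimal sketch. All names below are illustrative stand-ins, not the actual Flink CDC operator API; the point is only that the per-table processor must not be evicted before we know whether the AddColumnEvent survives filtering.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ProcessorCacheOrdering {
    final Map<String, String> processorMap = new HashMap<>();
    final Map<String, Set<String>> schemas = new HashMap<>();

    Optional<Set<String>> processAddColumn(String tableId, Set<String> addedColumns) {
        Set<String> schema = schemas.computeIfAbsent(tableId, t -> new HashSet<>());
        // NOTE: no processorMap.remove(tableId) up front; the buggy version
        // evicted the processor unconditionally before this point.
        Set<String> kept = new HashSet<>(addedColumns);
        kept.removeAll(schema);
        if (kept.isEmpty()) {
            // Fully redundant: the early return leaves the cached processor
            // intact, so later DataChangeEvents for this table still find it.
            return Optional.empty();
        }
        schema.addAll(kept);
        // Non-filtered path: rebuild (overwrite) the processor for the new schema.
        processorMap.put(tableId, "processor@" + schema.size() + "cols");
        return Optional.of(kept);
    }

    public static void main(String[] args) {
        ProcessorCacheOrdering op = new ProcessorCacheOrdering();
        op.processAddColumn("t1", Set.of("extras")); // applied, processor cached
        op.processAddColumn("t1", Set.of("extras")); // duplicate, filtered to a no-op
        // The processor survives the filtered event, so a subsequent
        // DataChangeEvent would still find it.
        System.out.println(op.processorMap.containsKey("t1")); // true
    }
}
```

Had the eviction happened before the redundancy check, the second call would have left `processorMap` empty for `t1`, which is exactly the crash scenario the reviewer describes.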
        Set<String> existingColumnNames =
                columns.stream()
                        .map(Column::getName)
                        .collect(Collectors.toCollection(HashSet::new));
        for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
            // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
            // (e.g., from gh-ost online schema migrations)
            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
                continue;
            }
applyAddColumnEvent now silently skips adding a column when the name already exists. This can mask upstream inconsistencies (e.g., same column name but different type/comment/default), leaving the schema potentially out of sync with the source without any signal. Consider validating that the existing column definition matches the incoming addColumn (and throw or at least log/warn when it differs) so only true duplicates are treated as idempotent.
Added LOG field to SchemaUtils. When skipping a duplicate column, compares DataType of existing vs incoming column and logs at WARN level if they differ
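The resulting skip-with-warning behavior can be sketched as follows. This is a simplified stand-in using plain strings for column types; the real code compares Flink CDC `DataType`s and logs through an SLF4J `LOG` field rather than `System.err`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IdempotentAddColumn {

    /** Adds a column if absent; skips true duplicates silently and warns on type mismatch. */
    static void applyAddColumn(Map<String, String> schema, String name, String type) {
        String existingType = schema.get(name);
        if (existingType != null) {
            if (!existingType.equals(type)) {
                // Mismatched duplicate: keep the existing definition, but surface
                // the inconsistency instead of failing the pipeline.
                System.err.printf(
                        "Skipping duplicate column '%s' but types differ: existing=%s, incoming=%s%n",
                        name, existingType, type);
            }
            return; // duplicate (matching or not): skip, do not throw
        }
        schema.put(name, type);
    }

    public static void main(String[] args) {
        Map<String, String> schema = new LinkedHashMap<>(Map.of("id", "BIGINT"));
        applyAddColumn(schema, "extras", "STRING"); // added
        applyAddColumn(schema, "extras", "STRING"); // silently skipped (true duplicate)
        applyAddColumn(schema, "extras", "INT");    // skipped with a warning
        System.out.println(schema);
    }
}
```

The existing definition always wins; the warning is the only signal when the incoming duplicate disagrees, which matches the discussion further down in this thread.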
…dation, add type-mismatch warnings, extract shared filter helper
 * Shared utility methods for filtering schema change events in both {@link PreTransformOperator}
 * and {@link PostTransformOperator}.
 */
class TransformSchemaChangeUtils {
Is it worth creating a new Utils class for merely such a method? Putting it in SchemaUtils makes more sense.
++. Moved the method to SchemaUtils
     * @return the filtered event, or {@link Optional#empty()} if the event is fully redundant
     */
    static Optional<SchemaChangeEvent> filterDuplicateAddColumns(
            Schema currentSchema, SchemaChangeEvent event, Logger log) {
It's unusual to pass the Logger as an argument.
Avoided this pattern in the new commit
 * after duplicate filtering, with the computed column correctly evaluated.
 */
@Test
void testDuplicateAddColumnEventPreTransform() throws Exception {
Please also add an ITCase or E2e test in flink-cdc-composer.
                    if (!existingColumn
                            .getType()
                            .equals(incomingColumn.getType())) {
What will happen if the existing column definition and the incoming definition are incompatible? Will there be coercions or implicit casting?
No coercions or implicit casting are performed. When a duplicate column is detected by name, the existing column definition is preserved as-is and the incoming duplicate is skipped. If the types differ, a WARN log is emitted: "Skipping duplicate column '{}' for table {} but types differ: existing={}, incoming={}". This is intentional for the gh-ost use case. Duplicate AddColumnEvents from online schema migration tools should have matching types. A type mismatch indicates a potential upstream inconsistency that should be investigated, but we don't want to crash the pipeline over it.
… to SchemaUtils, document type-mismatch behavior
…iltering through pipeline
What is the purpose of the change

Flink CDC pipelines crash with `IllegalArgumentException: Field names must be unique` when duplicate `AddColumnEvent`s are received for the same column. This commonly occurs during online schema migrations with tools like gh-ost, which perform shadow-table operations that can produce duplicate schema change events.

Root cause: Neither `SchemaUtils.applySchemaChange()` nor the transform operators check whether a column already exists before applying an `AddColumnEvent`, causing the schema to contain duplicate field names and triggering the exception in `Schema.Builder`.

Related JIRA: FLINK-38830
Brief change log

This PR implements a two-layer defense strategy:

Proactive filtering (Layer 1): New `SchemaUtils.filterRedundantAddColumns()` utility method that filters out `AddColumnEvent.ColumnWithPosition` entries where the column already exists in the current schema, and also deduplicates columns within the same event. This is called in both `PreTransformOperator` and `PostTransformOperator` before applying schema changes, converting fully-redundant events to no-ops.

Idempotent safety net (Layer 2): `SchemaUtils.applyAddColumnEvent()` is made idempotent: if a column with the same name already exists, the add is silently skipped instead of throwing an exception. This catches any edge cases not handled by Layer 1.

Files changed:
- flink-cdc-common/.../utils/SchemaUtils.java: added `filterRedundantAddColumns()` and made `applyAddColumnEvent()` idempotent
- flink-cdc-runtime/.../operators/transform/PreTransformOperator.java: calls the filter before applying schema changes
- flink-cdc-runtime/.../operators/transform/PostTransformOperator.java: calls the filter before applying schema changes
- flink-cdc-common/.../utils/SchemaUtilsTest.java: 8 new unit tests covering all duplicate scenarios
- flink-cdc-runtime/.../transform/TransformOperatorWithSchemaEvolveTest.java: 2 new integration tests

Verifying this change
This change adds 10 new tests (917 total tests, 0 failures):

8 unit tests in `SchemaUtilsTest`:
- testFilterRedundantAddColumns_noDuplicates
- testFilterRedundantAddColumns_allDuplicates
- testFilterRedundantAddColumns_partialDuplicates
- testFilterRedundantAddColumns_emptyEvent
- testFilterRedundantAddColumns_withPositions
- testFilterRedundantAddColumns_intraEventDuplicates
- testApplyAddColumnEvent_idempotent
- testApplyAddColumnEvent_allDuplicates

2 integration tests in `TransformOperatorWithSchemaEvolveTest`:
- testDuplicateAddColumnEventsInPreTransform
- testDuplicateAddColumnEventsInPostTransform

Does this pull request potentially affect one of the following parts
- `@Public(Evolving)`: no (a `Set.contains()` check per column in `AddColumnEvent`s only)

Documentation