
[FLINK-38830][runtime] Handle duplicate AddColumnEvent gracefully to prevent pipeline crashes#4279

Open
VinaySagarGonabavi wants to merge 5 commits into apache:master from VinaySagarGonabavi:fix-duplicate-add-column-events

@VinaySagarGonabavi VinaySagarGonabavi commented Feb 23, 2026

What is the purpose of the change

Flink CDC pipelines crash with IllegalArgumentException: Field names must be unique when duplicate
AddColumnEvents are received for the same column. This commonly occurs with online schema migration
tools such as gh-ost, whose shadow-table operations can produce duplicate schema change events.

Root cause: Neither SchemaUtils.applySchemaChange() nor the transform operators check whether a
column already exists before applying an AddColumnEvent, causing the schema to contain duplicate field
names and triggering the exception in Schema.Builder.

Related JIRA: FLINK-38830

Brief change log

This PR implements a two-layer defense strategy:

  1. Proactive filtering (Layer 1): New SchemaUtils.filterRedundantAddColumns() utility method that
    filters out AddColumnEvent.ColumnWithPosition entries where the column already exists in the current
    schema, and also deduplicates columns within the same event. This is called in both
    PreTransformOperator and PostTransformOperator before applying schema changes, converting
    fully-redundant events to no-ops.

  2. Idempotent safety net (Layer 2): SchemaUtils.applyAddColumnEvent() is made idempotent — if a
    column with the same name already exists, the add is silently skipped instead of throwing an exception.
    This catches any edge cases not handled by Layer 1.
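The Layer 1 filter described above can be sketched roughly as follows. This is a simplified stand-in, not the PR's code: it works on plain column-name strings rather than Flink CDC's Schema and AddColumnEvent types, and the class name RedundantAddColumnFilter and the method signature are assumptions made for illustration. How the real method treats an event with no columns is not stated in the description; this sketch returns empty in that case.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, dependency-free model of the Layer 1 filter (names are illustrative). */
final class RedundantAddColumnFilter {

    private RedundantAddColumnFilter() {}

    /**
     * Returns the column names that still need to be added, or Optional.empty()
     * when nothing is left to add (a fully redundant event).
     */
    static Optional<List<String>> filterRedundantAddColumns(
            List<String> existingColumns, List<String> requestedColumns) {
        // Names seen so far: the current schema first, then every accepted addition.
        Set<String> seen = new LinkedHashSet<>(existingColumns);
        List<String> remaining = new ArrayList<>();
        for (String name : requestedColumns) {
            // Set.add returns false if the name is already present, which covers both
            // "already in the schema" and "repeated within the same event".
            if (seen.add(name)) {
                remaining.add(name);
            }
        }
        return remaining.isEmpty() ? Optional.empty() : Optional.of(remaining);
    }
}
```

With an existing schema of [id, name] and an event requesting [name, age, age], the sketch keeps only age; if the event requests only columns that already exist, the result is empty and the caller can treat the event as a no-op.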

Files changed:

  • flink-cdc-common/.../utils/SchemaUtils.java — Added filterRedundantAddColumns() and made applyAddColumnEvent() idempotent
  • flink-cdc-runtime/.../operators/transform/PreTransformOperator.java — Calls filter before applying schema changes
  • flink-cdc-runtime/.../operators/transform/PostTransformOperator.java — Calls filter before applying schema changes
  • flink-cdc-common/.../utils/SchemaUtilsTest.java — 8 new unit tests covering all duplicate scenarios
  • flink-cdc-runtime/.../transform/TransformOperatorWithSchemaEvolveTest.java — 2 new integration tests

Verifying this change

This change adds 10 new tests (917 total tests, 0 failures):

  • 8 unit tests in SchemaUtilsTest:

    • testFilterRedundantAddColumns_noDuplicates
    • testFilterRedundantAddColumns_allDuplicates
    • testFilterRedundantAddColumns_partialDuplicates
    • testFilterRedundantAddColumns_emptyEvent
    • testFilterRedundantAddColumns_withPositions
    • testFilterRedundantAddColumns_intraEventDuplicates
    • testApplyAddColumnEvent_idempotent
    • testApplyAddColumnEvent_allDuplicates
  • 2 integration tests in TransformOperatorWithSchemaEvolveTest:

    • testDuplicateAddColumnEventsInPreTransform
    • testDuplicateAddColumnEventsInPostTransform

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (minimal — one Set.contains() check per column in AddColumnEvents only)
  • Anything that affects deployment or recovery: no

Documentation

  • Does this pull request introduce a new feature? no (bug fix)
  • If yes, how is the feature documented? N/A

Contributor

Copilot AI left a comment

Pull request overview

Fixes Flink CDC pipeline crashes caused by duplicate AddColumnEvents (e.g., emitted by gh-ost/pt-osc during online schema migrations) by filtering redundant column-add operations and making add-column schema application idempotent.

Changes:

  • Add SchemaUtils.filterRedundantAddColumns(...) to drop already-existing columns and intra-event duplicates (returning empty when fully redundant).
  • Make SchemaUtils add-column application skip already-existing column names instead of throwing.
  • Apply redundant-add filtering in both PreTransformOperator and PostTransformOperator, and add unit/integration tests for duplicate scenarios.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Summary per file:

  • flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java: Adds redundant AddColumn filtering utility; makes add-column application idempotent.
  • flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java: Filters redundant AddColumnEvents before schema application (but introduces an early-return risk).
  • flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: Filters redundant AddColumnEvents before applying schema changes in post-transform.
  • flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java: Adds unit tests for redundant filtering and idempotent add-column application.
  • flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java: Adds integration tests for duplicate/partial-duplicate add-column events through transform operators.

Comments suppressed due to low confidence (2)

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:928

  • This Javadoc says the duplicate event is "sent directly to the post-transform operator", but the test uses evolveFromSource(...), which exercises both operators in the normal pipeline. Either adjust the wording to match what is actually being tested, or extend the harness/test case to inject schema-change events directly into PostTransformOperator if that behavior is important to cover.
    /**
     * This case tests that duplicate AddColumnEvents are handled gracefully by
     * PostTransformOperator. When the same column is added twice, the second event should be
     * filtered out even when sent directly to the post-transform operator.
     */
    @Test
    void testDuplicateAddColumnEventPostTransform() throws Exception {
        TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
        TransformWithSchemaEvolveTestCase.of(
                        tableId,
                        "*, id + age as computed",
                        "name <> 'Alice'",

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:852

  • The PR description lists integration tests named testDuplicateAddColumnEventsInPreTransform / testDuplicateAddColumnEventsInPostTransform, but the actual test method name here is testDuplicateAddColumnEventPreTransform. If the listed names are meant to be accurate for verification automation or documentation, please align either the method names or the description.
    @Test
    void testDuplicateAddColumnEventPreTransform() throws Exception {
        TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
        TransformWithSchemaEvolveTestCase.of(
                        tableId,

null))))
.expectNothingInPreTransformed()
.expectNothingInPostTransformed()
.runTests("duplicate add extras column should be filtered")
Copilot AI Feb 28, 2026

These duplicate-AddColumnEvent tests stop immediately after asserting the redundant event is filtered. To prevent regressions like losing internal transform processors/state, it would be useful to also process at least one subsequent DataChangeEvent after the duplicate schema event and assert the pipeline continues to transform records correctly (i.e., no operator crash / missing schema view).

Suggested change
- .runTests("duplicate add extras column should be filtered")
+ .runTests("duplicate add extras column should be filtered")
+ // After the duplicate AddColumnEvent has been filtered, process a subsequent
+ // DataChangeEvent to verify that the pipeline continues to transform records
+ // correctly (i.e., no operator crash or missing schema view).
+ // The concrete DataChangeEvent and its expected transformed output should follow
+ // the same construction patterns as other tests in this class.
+ // For example (pseudocode, to be aligned with existing helpers in this test):
+ // .evolveFromSource(
+ //         someInsertOrUpdateDataChangeEventFor(tableId, id, name, age, extras))
+ // .expectInPreTransformed(expectedPreTransformEvent)
+ // .expectInPostTransformed(expectedPostTransformEvent)
+ // .runTests("data change after duplicate add extras column is still processed")

Author

In the new commit, both testDuplicateAddColumnEventPreTransform() and testDuplicateAddColumnEventPostTransform() send a DataChangeEvent after the duplicate AddColumnEvent and assert that it is processed correctly through the transform pipeline.

Comment on lines +224 to +239
// Filter out redundant AddColumnEvent columns that already exist in the schema
// to handle duplicate events from tools like gh-ost online schema migrations
if (event instanceof AddColumnEvent) {
    AddColumnEvent addColumnEvent = (AddColumnEvent) event;
    Schema currentSchema = tableChangeInfo.getSourceSchema();
    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
    if (!filtered.isPresent()) {
        LOG.debug(
                "Skipping fully redundant AddColumnEvent for table {} "
                        + "- all columns already exist",
                tableId);
        return Optional.empty();
    }
    event = filtered.get();
}
Copilot AI Feb 28, 2026

In processEvent, preTransformProcessorMap.remove(tableId) is executed for every SchemaChangeEvent. With the new early-return here, a fully-redundant AddColumnEvent will return Optional.empty() before cachePreTransformProcessor(...) is called, so the processor map entry is never rebuilt. This can cause subsequent DataChangeEvents for the table to fail the processor != null check and crash the pipeline. Ensure the processor is re-cached even when the AddColumnEvent is filtered to a no-op (or avoid removing the processor map entry in the redundant case).

Author

Removed the preTransformProcessorMap.remove(tableId) call that ran before cacheChangeSchema(). Now the processor stays intact on the filtered path and gets overwritten via cachePreTransformProcessor() on the non-filtered path.
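The ordering issue raised in this thread can be illustrated with a small model. It is only a sketch under stated assumptions: ProcessorCacheSketch, its string-keyed map, and its method names are hypothetical stand-ins for the operator's per-table processor cache, not the actual PreTransformOperator code. It shows the behavior the reply describes: the cached entry is left alone when the event is filtered to a no-op and overwritten otherwise.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of a per-table processor cache (all names are illustrative). */
final class ProcessorCacheSketch {

    // Table id -> cached processor; plain strings stand in for the real processor objects.
    private final Map<String, String> processorByTable = new HashMap<>();

    void cacheProcessor(String tableId, String processor) {
        processorByTable.put(tableId, processor);
    }

    String processorFor(String tableId) {
        return processorByTable.get(tableId);
    }

    /**
     * Handles a schema change. Nothing is removed from the cache up front, so the
     * early return for a fully redundant event cannot leave the table without a processor.
     */
    Optional<String> onSchemaChange(
            String tableId, boolean fullyRedundant, String rebuiltProcessor) {
        if (fullyRedundant) {
            // Filtered path: keep the existing processor and emit nothing downstream.
            return Optional.empty();
        }
        // Non-filtered path: overwrite the cached processor with the rebuilt one.
        processorByTable.put(tableId, rebuiltProcessor);
        return Optional.of("schema change forwarded for " + tableId);
    }
}
```

Had the entry been removed before the redundancy check, the filtered path would return with no processor cached, which is the failure mode the review comment warns about for subsequent DataChangeEvents.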

Comment on lines +123 to +132
Set<String> existingColumnNames =
        columns.stream()
                .map(Column::getName)
                .collect(Collectors.toCollection(HashSet::new));
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
    // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
    // (e.g., from gh-ost online schema migrations)
    if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
        continue;
    }
Copilot AI Feb 28, 2026

applyAddColumnEvent now silently skips adding a column when the name already exists. This can mask upstream inconsistencies (e.g., same column name but different type/comment/default), leaving the schema potentially out of sync with the source without any signal. Consider validating that the existing column definition matches the incoming addColumn (and throw or at least log/warn when it differs) so only true duplicates are treated as idempotent.

Author

Added a LOG field to SchemaUtils. When skipping a duplicate column, the code now compares the DataType of the existing column with that of the incoming column and logs at WARN level if they differ.

…dation, add type-mismatch warnings, extract shared filter helper
@yuxiqian yuxiqian self-requested a review March 10, 2026 03:46
* Shared utility methods for filtering schema change events in both {@link PreTransformOperator}
* and {@link PostTransformOperator}.
*/
class TransformSchemaChangeUtils {
Member

Is it worth creating a new Utils class for merely such a method? Putting it in SchemaUtils makes more sense.

Author

++. Moved the method to SchemaUtils

* @return the filtered event, or {@link Optional#empty()} if the event is fully redundant
*/
static Optional<SchemaChangeEvent> filterDuplicateAddColumns(
Schema currentSchema, SchemaChangeEvent event, Logger log) {
Member

It's unusual to pass the Logger as argument.

Author

Avoided this pattern in the new commit

* after duplicate filtering, with the computed column correctly evaluated.
*/
@Test
void testDuplicateAddColumnEventPreTransform() throws Exception {
Member

Please also add an ITCase or E2e test in flink-cdc-composer.

Author

New ITCase is added

Comment on lines +143 to +145
if (!existingColumn
.getType()
.equals(incomingColumn.getType())) {
Member

What will happen if the existing column definition and the incoming definition are incompatible? Will there be coercions or implicit casting?

Author

No coercions or implicit casting are performed. When a duplicate column is detected by name, the existing column definition is preserved as-is and the incoming duplicate is skipped. If the types differ, a WARN log is emitted: "Skipping duplicate column '{}' for table {} but types differ: existing={}, incoming={}". This is intentional for the gh-ost use case. Duplicate AddColumnEvents from online schema migration tools should have matching types. A type mismatch indicates a potential upstream inconsistency that should be investigated, but we don't want to crash the pipeline over it.
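The skip-and-warn behavior described in this reply might look roughly like the sketch below. It is an illustration only: IdempotentAddColumnSketch is a hypothetical class, columns are modeled as a name-to-type map of strings instead of Flink CDC's Column and DataType, and warnings are collected in a list rather than written through the LOG field the author mentions, so that the example has no logging dependency.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of an idempotent add-column step (names are illustrative). */
final class IdempotentAddColumnSketch {

    /** Stand-in for WARN log output. */
    final List<String> warnings = new ArrayList<>();

    /**
     * Returns a new schema (column name -> type) with the incoming columns appended.
     * A column whose name already exists is skipped; the existing definition wins.
     */
    Map<String, String> apply(Map<String, String> existing, Map<String, String> incoming) {
        Map<String, String> result = new LinkedHashMap<>(existing);
        for (Map.Entry<String, String> column : incoming.entrySet()) {
            String existingType = result.get(column.getKey());
            if (existingType == null) {
                // New column: append it.
                result.put(column.getKey(), column.getValue());
            } else if (!existingType.equals(column.getValue())) {
                // Duplicate name with a different type: keep the existing type, record a warning.
                warnings.add(
                        "Skipping duplicate column '" + column.getKey()
                                + "' but types differ: existing=" + existingType
                                + ", incoming=" + column.getValue());
            }
            // Duplicate name with the same type: silently skipped.
        }
        return result;
    }
}
```

No conversion is attempted on a mismatch: the existing type stays in the schema and the warning is the only signal that the upstream definition disagrees.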
