Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -55,6 +59,8 @@
@PublicEvolving
public class SchemaUtils {

/** Logger for this class; used, e.g., to warn when a skipped duplicate column has a differing type. */
private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class);

/**
* create a list of {@link RecordData.FieldGetter} from given {@link Schema} to get Object from
* RecordData.
Expand Down Expand Up @@ -120,7 +126,36 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve

private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
Set<String> existingColumnNames =
columns.stream()
.map(Column::getName)
.collect(Collectors.toCollection(HashSet::new));
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
// Skip columns that already exist in the schema to handle duplicate AddColumnEvents
// (e.g., from gh-ost online schema migrations)
if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
Column incomingColumn = columnWithPosition.getAddColumn();
columns.stream()
.filter(c -> c.getName().equals(incomingColumn.getName()))
.findFirst()
.ifPresent(
existingColumn -> {
if (!existingColumn
.getType()
.equals(incomingColumn.getType())) {
Comment on lines +143 to +145
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if the existing column definition and the incoming definition are incompatible? Will there be any coercion or implicit casting?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No coercions or implicit casting are performed. When a duplicate column is detected by name, the existing column definition is preserved as-is and the incoming duplicate is skipped. If the types differ, a WARN log is emitted: "Skipping duplicate column '{}' for table {} but types differ: existing={}, incoming={}". This is intentional for the gh-ost use case. Duplicate AddColumnEvents from online schema migration tools should have matching types. A type mismatch indicates a potential upstream inconsistency that should be investigated, but we don't want to crash the pipeline over it.

// No coercion is performed; the existing column
// definition is preserved as-is.
LOG.warn(
"Skipping duplicate column '{}' for table {} but types differ: "
+ "existing={}, incoming={}",
incomingColumn.getName(),
event.tableId(),
existingColumn.getType(),
incomingColumn.getType());
}
});
continue;
}
Comment on lines +129 to +158
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyAddColumnEvent now silently skips adding a column when the name already exists. This can mask upstream inconsistencies (e.g., same column name but different type/comment/default), leaving the schema potentially out of sync with the source without any signal. Consider validating that the existing column definition matches the incoming addColumn (and throw or at least log/warn when it differs) so only true duplicates are treated as idempotent.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a LOG field to SchemaUtils. When skipping a duplicate column, it now compares the DataType of the existing and incoming columns and logs at WARN level if they differ.

switch (columnWithPosition.getPosition()) {
case FIRST:
{
Expand Down Expand Up @@ -165,10 +200,92 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
break;
}
}
existingColumnNames.add(columnWithPosition.getAddColumn().getName());
}
return oldSchema.copy(columns);
}

/**
 * Filters out redundant columns from an {@link AddColumnEvent} that already exist in the
 * current schema, and deduplicates columns within the same event. Returns {@link
 * Optional#empty()} if all columns are redundant.
 *
 * <p>This handles cases like gh-ost online schema migrations where duplicate ADD COLUMN events
 * may be emitted for the same column.
 *
 * <p><b>Note:</b> Duplicate detection is based on column name only; the first definition seen
 * for a name wins (a schema column, or the first occurrence within the event). If a duplicate
 * arrives with a different type than that first definition, a warning is logged but the
 * duplicate is still skipped and no coercion is attempted. This is the expected behavior for
 * online schema migration tools (gh-ost, pt-osc) where duplicate events are always exact
 * copies.
 *
 * @param currentSchema the schema whose columns are treated as already existing
 * @param event the add-column event to filter
 * @return the same {@code event} instance if nothing was filtered, a new event for the same
 *     table carrying only the non-redundant columns if some were filtered, or {@link
 *     Optional#empty()} if no column remains (including when the event adds no columns)
 */
public static Optional<AddColumnEvent> filterRedundantAddColumns(
        Schema currentSchema, AddColumnEvent event) {
    // First-seen definition per column name. Seeded with the schema columns and extended with
    // every column accepted from this event, so that type mismatches are also detected for
    // duplicates that only occur within the event itself.
    Map<String, Column> knownColumns = new HashMap<>();
    for (Column col : currentSchema.getColumns()) {
        knownColumns.put(col.getName(), col);
    }

    List<AddColumnEvent.ColumnWithPosition> nonRedundant = new ArrayList<>();
    List<String> filteredColumnNames = new ArrayList<>();
    for (AddColumnEvent.ColumnWithPosition cwp : event.getAddedColumns()) {
        Column incoming = cwp.getAddColumn();
        String colName = incoming.getName();
        // putIfAbsent returns null only when the name was not known yet, i.e. a new column.
        Column firstSeen = knownColumns.putIfAbsent(colName, incoming);
        if (firstSeen == null) {
            nonRedundant.add(cwp);
            continue;
        }
        filteredColumnNames.add(colName);
        if (!firstSeen.getType().equals(incoming.getType())) {
            LOG.warn(
                    "Skipping duplicate column '{}' for table {} but types differ: "
                            + "existing={}, incoming={}",
                    colName,
                    event.tableId(),
                    firstSeen.getType(),
                    incoming.getType());
        }
    }

    if (nonRedundant.isEmpty()) {
        return Optional.empty();
    }
    if (!filteredColumnNames.isEmpty()) {
        LOG.debug(
                "Filtered redundant columns {} from AddColumnEvent for table {}",
                filteredColumnNames,
                event.tableId());
    }
    // Nothing was dropped: hand back the original instance instead of an equal copy.
    if (nonRedundant.size() == event.getAddedColumns().size()) {
        return Optional.of(event);
    }
    return Optional.of(new AddColumnEvent(event.tableId(), nonRedundant));
}

/**
 * Drops columns from an {@link AddColumnEvent} that the given schema already contains. Any
 * other kind of schema change event passes through untouched.
 *
 * @param currentSchema the current schema to check against
 * @param event the schema change event to filter
 * @return the (possibly reduced) event, or {@link Optional#empty()} if the event is an {@link
 *     AddColumnEvent} with no non-redundant column left
 */
public static Optional<SchemaChangeEvent> filterDuplicateAddColumns(
        Schema currentSchema, SchemaChangeEvent event) {
    if (event instanceof AddColumnEvent) {
        Optional<AddColumnEvent> remaining =
                filterRedundantAddColumns(currentSchema, (AddColumnEvent) event);
        if (remaining.isPresent()) {
            return Optional.of(remaining.get());
        }
        LOG.debug(
                "Skipping fully redundant AddColumnEvent for table {} "
                        + "- all columns already exist",
                event.tableId());
        return Optional.empty();
    }
    // Only add-column events can be redundant here; everything else is forwarded as-is.
    return Optional.of(event);
}

private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
List<Column> columns =
oldSchema.getColumns().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */
class SchemaUtilsTest {
Expand Down Expand Up @@ -484,4 +486,231 @@ void testInferWiderSchema() {
.build()))
.isExactlyInstanceOf(IllegalStateException.class);
}

// ==================== Tests for duplicate AddColumnEvent handling ====================

/** An event whose columns all exist in the schema already is filtered out completely. */
@Test
void testFilterRedundantAddColumns_allDuplicates() {
    Schema currentSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();
    AddColumnEvent duplicateEvent =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, duplicateEvent);

    Assertions.assertThat(filtered).isEmpty();
}

/** When no added column exists in the schema yet, the event comes back with both columns. */
@Test
void testFilterRedundantAddColumns_noDuplicates() {
    Schema currentSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    AddColumnEvent freshEvent =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("email", DataTypes.STRING()))));

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, freshEvent);

    Assertions.assertThat(filtered).isPresent();
    AddColumnEvent survivor = filtered.get();
    Assertions.assertThat(survivor.getAddedColumns()).hasSize(2);
    Assertions.assertThat(survivor).isEqualTo(freshEvent);
}

/** Only the column missing from the schema ("age") survives; the existing "name" is dropped. */
@Test
void testFilterRedundantAddColumns_partialDuplicates() {
    Schema currentSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    AddColumnEvent mixedEvent =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, mixedEvent);

    Assertions.assertThat(filtered).isPresent();
    List<AddColumnEvent.ColumnWithPosition> survivors = filtered.get().getAddedColumns();
    Assertions.assertThat(survivors).hasSize(1);
    Assertions.assertThat(survivors.get(0).getAddColumn().getName()).isEqualTo("age");
}

/** An event that adds no columns at all is reported as empty. */
@Test
void testFilterRedundantAddColumns_emptyAddedColumns() {
    Schema currentSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    AddColumnEvent emptyEvent =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"), Collections.emptyList());

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, emptyEvent);

    Assertions.assertThat(filtered).isEmpty();
}

/** Applying an event that re-adds "name" and adds "age" keeps "name" once and appends "age". */
@Test
void testApplyAddColumnEvent_idempotent() {
    Schema before =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    List<AddColumnEvent.ColumnWithPosition> additions =
            new ArrayList<>(
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));
    AddColumnEvent event =
            new AddColumnEvent(TableId.parse("default.default.table1"), additions);

    Schema after = SchemaUtils.applySchemaChangeEvent(before, event);

    Schema expected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();
    Assertions.assertThat(after).isEqualTo(expected);
}

/** Applying an event whose columns all exist already leaves the schema unchanged. */
@Test
void testApplyAddColumnEvent_allDuplicates() {
    Schema before =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();
    List<AddColumnEvent.ColumnWithPosition> additions =
            new ArrayList<>(
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));
    AddColumnEvent event =
            new AddColumnEvent(TableId.parse("default.default.table1"), additions);

    Schema after = SchemaUtils.applySchemaChangeEvent(before, event);

    Schema expected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();
    Assertions.assertThat(after).isEqualTo(expected);
}

/** The surviving column keeps its original position (AFTER) and its reference column name. */
@Test
void testFilterRedundantAddColumns_withPositions() {
    Schema currentSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    AddColumnEvent.ColumnWithPosition duplicateName =
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("name", DataTypes.STRING()),
                    AddColumnEvent.ColumnPosition.AFTER,
                    "id");
    AddColumnEvent.ColumnWithPosition newAge =
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("age", DataTypes.INT()),
                    AddColumnEvent.ColumnPosition.AFTER,
                    "name");
    AddColumnEvent positionedEvent =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(duplicateName, newAge));

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, positionedEvent);

    Assertions.assertThat(filtered).isPresent();
    List<AddColumnEvent.ColumnWithPosition> survivors = filtered.get().getAddedColumns();
    Assertions.assertThat(survivors).hasSize(1);

    AddColumnEvent.ColumnWithPosition survivor = survivors.get(0);
    Assertions.assertThat(survivor.getAddColumn().getName()).isEqualTo("age");
    Assertions.assertThat(survivor.getPosition())
            .isEqualTo(AddColumnEvent.ColumnPosition.AFTER);
    Assertions.assertThat(survivor.getExistedColumnName()).isEqualTo("name");
}

/** A column listed twice within one event is kept only once. */
@Test
void testFilterRedundantAddColumns_intraEventDuplicates() {
    Schema currentSchema = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();

    List<AddColumnEvent.ColumnWithPosition> repeated = new ArrayList<>();
    repeated.add(
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("name", DataTypes.STRING())));
    repeated.add(
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("name", DataTypes.STRING())));
    AddColumnEvent repeatedEvent =
            new AddColumnEvent(TableId.tableId("default", "schema", "table"), repeated);

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(currentSchema, repeatedEvent);

    Assertions.assertThat(filtered).isPresent();
    List<AddColumnEvent.ColumnWithPosition> survivors = filtered.get().getAddedColumns();
    Assertions.assertThat(survivors).hasSize(1);
    Assertions.assertThat(survivors.get(0).getAddColumn().getName()).isEqualTo("name");
}

/** Applying an event that lists "name" twice adds the column to the schema only once. */
@Test
void testApplyAddColumnEvent_duplicateWithinSameEvent() {
    Schema before = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();
    List<AddColumnEvent.ColumnWithPosition> additions =
            new ArrayList<>(
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING()))));
    AddColumnEvent event =
            new AddColumnEvent(TableId.parse("default.default.table1"), additions);

    Schema after = SchemaUtils.applySchemaChangeEvent(before, event);

    Schema expected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    Assertions.assertThat(after).isEqualTo(expected);
}
}
Loading