-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[FLINK-38830][runtime] Handle duplicate AddColumnEvent gracefully to prevent pipeline crashes #4279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
8962d4f
8013129
c58a959
6d43f8d
8a303d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,10 +37,14 @@ | |
| import org.apache.flink.cdc.common.types.TimestampType; | ||
| import org.apache.flink.cdc.common.types.ZonedTimestampType; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import javax.annotation.CheckReturnValue; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
|
|
@@ -55,6 +59,8 @@ | |
| @PublicEvolving | ||
| public class SchemaUtils { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); | ||
|
|
||
| /** | ||
| * create a list of {@link RecordData.FieldGetter} from given {@link Schema} to get Object from | ||
| * RecordData. | ||
|
|
@@ -120,7 +126,36 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve | |
|
|
||
| private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) { | ||
| LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns()); | ||
| Set<String> existingColumnNames = | ||
| columns.stream() | ||
| .map(Column::getName) | ||
| .collect(Collectors.toCollection(HashSet::new)); | ||
| for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { | ||
| // Skip columns that already exist in the schema to handle duplicate AddColumnEvents | ||
| // (e.g., from gh-ost online schema migrations) | ||
| if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) { | ||
| Column incomingColumn = columnWithPosition.getAddColumn(); | ||
| columns.stream() | ||
| .filter(c -> c.getName().equals(incomingColumn.getName())) | ||
| .findFirst() | ||
| .ifPresent( | ||
| existingColumn -> { | ||
| if (!existingColumn | ||
| .getType() | ||
| .equals(incomingColumn.getType())) { | ||
| // No coercion is performed; the existing column | ||
| // definition is preserved as-is. | ||
| LOG.warn( | ||
| "Skipping duplicate column '{}' for table {} but types differ: " | ||
| + "existing={}, incoming={}", | ||
| incomingColumn.getName(), | ||
| event.tableId(), | ||
| existingColumn.getType(), | ||
| incomingColumn.getType()); | ||
| } | ||
| }); | ||
| continue; | ||
| } | ||
|
Comment on lines
+129
to
+158
|
||
| switch (columnWithPosition.getPosition()) { | ||
| case FIRST: | ||
| { | ||
|
|
@@ -165,10 +200,92 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema | |
| break; | ||
| } | ||
| } | ||
| existingColumnNames.add(columnWithPosition.getAddColumn().getName()); | ||
| } | ||
| return oldSchema.copy(columns); | ||
| } | ||
|
|
||
| /** | ||
| * Filters out redundant columns from an {@link AddColumnEvent} that already exist in the | ||
| * current schema, and deduplicates columns within the same event. Returns {@link | ||
| * Optional#empty()} if all columns are redundant. | ||
| * | ||
| * <p>This handles cases like gh-ost online schema migrations where duplicate ADD COLUMN events | ||
| * may be emitted for the same column. | ||
| * | ||
| * <p><b>Note:</b> Duplicate detection is based on column name only. If a duplicate | ||
| * AddColumnEvent arrives with a different type for an existing column name, a warning will be | ||
| * logged but the column will still be skipped. This is the expected behavior for online schema | ||
| * migration tools (gh-ost, pt-osc) where duplicate events are always exact copies. | ||
| */ | ||
| public static Optional<AddColumnEvent> filterRedundantAddColumns( | ||
| Schema currentSchema, AddColumnEvent event) { | ||
| Map<String, Column> existingColumns = new HashMap<>(); | ||
| for (Column col : currentSchema.getColumns()) { | ||
| existingColumns.put(col.getName(), col); | ||
| } | ||
| Set<String> seenColumns = new HashSet<>(existingColumns.keySet()); | ||
| List<AddColumnEvent.ColumnWithPosition> nonRedundant = new ArrayList<>(); | ||
| List<String> filteredColumnNames = new ArrayList<>(); | ||
| for (AddColumnEvent.ColumnWithPosition cwp : event.getAddedColumns()) { | ||
| String colName = cwp.getAddColumn().getName(); | ||
| if (seenColumns.add(colName)) { | ||
| nonRedundant.add(cwp); | ||
| } else { | ||
| filteredColumnNames.add(colName); | ||
| Column existingCol = existingColumns.get(colName); | ||
| if (existingCol != null | ||
| && !existingCol.getType().equals(cwp.getAddColumn().getType())) { | ||
| LOG.warn( | ||
| "Skipping duplicate column '{}' for table {} but types differ: " | ||
| + "existing={}, incoming={}", | ||
| colName, | ||
| event.tableId(), | ||
| existingCol.getType(), | ||
| cwp.getAddColumn().getType()); | ||
| } | ||
| } | ||
| } | ||
| if (nonRedundant.isEmpty()) { | ||
| return Optional.empty(); | ||
| } | ||
| if (!filteredColumnNames.isEmpty()) { | ||
| LOG.debug( | ||
| "Filtered redundant columns {} from AddColumnEvent for table {}", | ||
| filteredColumnNames, | ||
| event.tableId()); | ||
| } | ||
| if (nonRedundant.size() == event.getAddedColumns().size()) { | ||
| return Optional.of(event); | ||
| } | ||
| return Optional.of(new AddColumnEvent(event.tableId(), nonRedundant)); | ||
| } | ||
|
|
||
| /** | ||
| * Filters duplicate {@link AddColumnEvent} columns that already exist in the given schema. For | ||
| * non-AddColumnEvent schema changes, the event is returned as-is. | ||
| * | ||
| * @param currentSchema the current schema to check against | ||
| * @param event the schema change event to filter | ||
| * @return the filtered event, or {@link Optional#empty()} if the event is fully redundant | ||
| */ | ||
| public static Optional<SchemaChangeEvent> filterDuplicateAddColumns( | ||
| Schema currentSchema, SchemaChangeEvent event) { | ||
| if (!(event instanceof AddColumnEvent)) { | ||
| return Optional.of(event); | ||
| } | ||
| Optional<AddColumnEvent> filtered = | ||
| filterRedundantAddColumns(currentSchema, (AddColumnEvent) event); | ||
| if (!filtered.isPresent()) { | ||
| LOG.debug( | ||
| "Skipping fully redundant AddColumnEvent for table {} " | ||
| + "- all columns already exist", | ||
| event.tableId()); | ||
| return Optional.empty(); | ||
| } | ||
| return Optional.of(filtered.get()); | ||
| } | ||
|
|
||
| private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) { | ||
| List<Column> columns = | ||
| oldSchema.getColumns().stream() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if the existing column definition and the incoming definition are incompatible? Will there be any coercion or implicit casting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No coercions or implicit casting are performed. When a duplicate column is detected by name, the existing column definition is preserved as-is and the incoming duplicate is skipped. If the types differ, a WARN log is emitted: "Skipping duplicate column '{}' for table {} but types differ: existing={}, incoming={}". This is intentional for the gh-ost use case. Duplicate AddColumnEvents from online schema migration tools should have matching types. A type mismatch indicates a potential upstream inconsistency that should be investigated, but we don't want to crash the pipeline over it.