[FLINK-38218] Fix MySQL CDC binlog split metadata split transmission #4087
morozov wants to merge 6 commits into apache:master
Conversation
I'm not sure how to test this. The issue is reproducible if a source is restarted mid-snapshot of a newly added table and requires consuming the changes in the new table from the binlog. Could maintainers recommend an existing test on top of which I could build this?
morozov force-pushed from 1069e6e to 9f16356
@leonardBang could you restart the tests? The logs are no longer available.
Hey @morozov, it looks like Azure cannot re-trigger an expired CI run. Could you rebase your PR onto the latest master branch to trigger a new CI run?
morozov force-pushed from 9f16356 to ee9d6e1
@leonardBang, after the rebase, the build is green.
@leonardBang, @lvyanquan, do you still want to get this issue fixed? I closed the PR due to the lack of interest, but you reopened it.
Hi @morozov, apologies for the delayed response. As this change impacts a critical code path, we are proceeding with utmost caution. I will review and verify both the issue and the proposed fix this week.
Pull request overview
This PR fixes a bug in MySQL CDC where binlog split metadata transmission could fail due to inconsistent ordering of finished snapshot split information. The root cause was that assignedSplits were being reordered during restoration from state, breaking the protocol's assumption that split infos maintain their assignment order.
Changes:
- Enforces stable ordering of assigned snapshot splits using LinkedHashMap throughout the codebase
- Replaces split ID-based duplicate detection with simpler position-based logic in metadata transmission
- Adds validation to detect and fail fast on duplicate split IDs in binlog splits
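The first bullet hinges on a basic property of the JDK collections: `LinkedHashMap` guarantees insertion-order iteration, while `HashMap` guarantees no order at all. A minimal standalone illustration of the property the fix relies on (the split names are just examples, not real assigner state):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderingDemo {
    public static void main(String[] args) {
        // HashMap makes no iteration-order guarantee...
        Map<String, Integer> hash = new HashMap<>();
        // ...whereas LinkedHashMap iterates in insertion order, which is the
        // property the PR relies on for assignedSplits.
        Map<String, Integer> linked = new LinkedHashMap<>();
        for (String id : new String[] {"split3", "split2", "split1"}) {
            hash.put(id, 0);
            linked.put(id, 0);
        }
        System.out.println(linked.keySet()); // [split3, split2, split1]
        System.out.println(hash.keySet());   // some unspecified order
    }
}
```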
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated no comments.
Summary per file:
| File | Description |
|---|---|
| MySqlSnapshotSplitAssigner.java | Removes sorting of assignedSplits and adds documentation explaining order requirements; changes assignedSplits type to LinkedHashMap |
| SnapshotPendingSplitsState.java | Changes assignedSplits field type to LinkedHashMap to preserve insertion order |
| PendingSplitsStateSerializer.java | Updates deserialization to use LinkedHashMap instead of HashMap for assignedSplits |
| MySqlSourceReader.java | Replaces split ID-based duplicate filtering with position-based element skipping using modulo arithmetic |
| MySqlBinlogSplit.java | Adds duplicate split ID validation in constructor and consolidates two constructors to eliminate code duplication |
| MySqlHybridSplitAssigner.java | Removes sorting when creating binlog splits to preserve natural order from assignedSplits |
| MySqlSnapshotSplitAssignerTest.java | Adds parameterized test to verify finished split infos maintain assignment order across checkpoints; updates to use LinkedHashMap |
| MySqlBinlogSplitTest.java | Adds test to verify duplicate split IDs are rejected with IllegalArgumentException |
| MySqlHybridSplitAssignerTest.java | Updates test setup to use LinkedHashMap for assignedSplits |
| PendingSplitsStateSerializerTest.java | Updates test setup to use LinkedHashMap for assignedSplits |
...l-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
```diff
-snapshotSplitAssigner.getAssignedSplits().values().stream()
-        .sorted(Comparator.comparing(MySqlSplit::splitId))
-        .collect(Collectors.toList());
+new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
```
Assuming the splits saved by the Enumerator are split3, split2, split1, the original code would sort them as split1, split2, split3.
If the Source Reader of a job has already obtained split3 and split2 according to the original sorting, and the job is then upgraded, requesting the third split under the latest implementation would request split3 again. Will there be compatibility issues with this?
> Assuming the shards saved by the Enumerator are split3, split2, split1, they will be sorted in the original way as split1, split2, split3.
> If the Source Reader of a job has already obtained split1 and split2 [...]
Is this possible?
- Finished snapshot split infos are built in the order of split assignment.
- They are sent to the source reader in the same order.

This is true for both the pre-upgrade and post-upgrade versions.
So if, before restoring the job from a checkpoint, the enumerator has the splits/infos in the order split3, split2, split1, the source reader will have a prefix of this list (e.g. split3, split2) but not split1 and split2.
Sorry, there was an error in my previous description. My concern is the inconsistency between the order before and after the upgrade, since the order was fixed only before. Will this result in duplicate requests for splits?
> requesting the third shard according to the latest implementation method will request split3 again. Will there be compatibility issues with this?
It will not request split3 again. The reader will know that there are 3 split infos in total and it has 2, so it needs to request the remaining one.
Let's assume that chunk-meta.group.size = 2.

- The source reader state is [split3, split2] (receivedMetaNum = 2).
- The next meta group ID is 1 (2 / 2 = 1).
- The splits on the enumerator are partitioned by 2: [0: [split3, split2], 1: [split1]].
- The source reader requests group 1; the enumerator returns [split1].
- The expected number of already retrieved elements in the last group on the reader is 0 (2 % 2 = 0).
- The source reader discards the first 0 elements of the response, so [split1] remains [split1].
- The source reader appends [split1] to [split3, split2], so the resulting source reader state becomes [split3, split2, split1] – correct.
Let's assume that chunk-meta.group.size = 3.

- The source reader state is [split3, split2] (receivedMetaNum = 2).
- The next meta group ID is 0 (2 / 3 = 0).
- The splits on the enumerator are partitioned by 3: [0: [split3, split2, split1]].
- The source reader requests group 0; the enumerator returns [split3, split2, split1].
- The expected number of already retrieved elements in the last group on the reader is 2 (2 % 3 = 2).
- The source reader discards the first 2 elements of the response, so [split3, split2, split1] becomes [split1].
- The source reader appends [split1] to [split3, split2], so the resulting source reader state becomes [split3, split2, split1] – correct.
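The two walkthroughs above reduce to one division (which group to request) and one modulo (how many leading elements of the response to skip). A self-contained sketch of that position-based skipping; the method and class names are illustrative, not the actual MySqlSourceReader code:

```java
import java.util.List;

public class MetaGroupSkipping {
    // Hypothetical helper mirroring the position-based logic described above:
    // given the group returned by the enumerator, drop the prefix the reader
    // already holds from the last partially-received group.
    static <T> List<T> elementsToAppend(List<T> group, int receivedMetaNum, int groupSize) {
        // Elements of the last group the reader has already retrieved.
        int alreadyRetrieved = receivedMetaNum % groupSize;
        // Keep only the elements the reader does not yet have.
        return group.subList(Math.min(alreadyRetrieved, group.size()), group.size());
    }

    public static void main(String[] args) {
        // chunk-meta.group.size = 3: reader has 2 infos, requests group 0 again.
        System.out.println(elementsToAppend(List.of("split3", "split2", "split1"), 2, 3)); // [split1]
        // chunk-meta.group.size = 2: reader has 2 infos, requests fresh group 1.
        System.out.println(elementsToAppend(List.of("split1"), 2, 2)); // [split1]
    }
}
```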
Thank you @morozov for the detailed explanation. However, I believe there's still a compatibility issue that wasn't fully addressed. Let me clarify my concern with a concrete job upgrade scenario.
The Scenario

Consider an existing running job that is being upgraded from the old version to the new version.

Before Upgrade (Old Version)

1. Splits are assigned in order: split3 → split2 → split1.
2. The Enumerator restores from a checkpoint and sorts by split ID (the old code sorts on restore: `.sorted(Entry.comparingByKey())`), so `assignedSplits = [split1, split2, split3]`.
3. The Reader has already received partial splits based on the sorted order, e.g., [split1, split2].
4. finishedSnapshotSplitInfos on the Enumerator side is built from the sorted order: [split1, split2, split3].
5. If chunk-meta.group.size = 2, the partitions are: [0: [split1, split2], 1: [split3]].
6. The Reader has received group 0; the next request will be for group 1 (split3).
After Upgrade (New Version)

1. The job restarts from the checkpoint and upgrades to the new version.
2. The Enumerator restores WITHOUT sorting (the new code: `this.assignedSplits = assignedSplits;`), so assignedSplits = [split3, split2, split1] (preserving the original insertion order).
3. finishedSnapshotSplitInfos is now built from the unsorted order: [split3, split2, split1].
4. The partitions are now: [0: [split3, split2], 1: [split1]].
5. The Reader still has state [split1, split2] (unchanged from the checkpoint).
6. The Reader requests group 1 (because receivedMetaNum = 2, and 2 / 2 = 1).
7. The Enumerator returns [split1] instead of the expected split3.
8. The Reader calculates expectedNumberOfAlreadyRetrievedElements = 2 % 2 = 0.
9. The Reader discards 0 elements and appends [split1] to [split1, split2].
10. Result: the Reader state becomes [split1, split2, split1], which contains a duplicate split1 and is missing split3.
The Root Cause

The issue is that the order semantics changed between versions, but the state serializer version was NOT incremented:

```java
private static final int VERSION = 5; // Still 5, not incremented
```

The old version and the new version interpret the same serialized state differently:

- Old version: sorts on restore → order is by split ID
- New version: does not sort on restore → order is by insertion time

This breaks the invariant that both the Enumerator and the Reader must use the same order.
Suggested Fix

1. Increment the state serializer version:

```java
private static final int VERSION = 6;
```

2. Add migration logic for version 5 and below:

```java
private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
        int version, int splitVersion, DataInputDeserializer in) throws IOException {
    // ... existing deserialization logic ...

    // For old versions, sort the assigned splits to maintain compatibility
    if (version < 6) {
        LinkedHashMap<String, MySqlSchemalessSnapshotSplit> sortedSplits =
                assignedSchemalessSnapshotSplits.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        Map.Entry::getValue,
                                        (o1, o2) -> o1,
                                        LinkedHashMap::new));
        assignedSchemalessSnapshotSplits = sortedSplits;
    }
    // ...
}
```

This ensures backward compatibility for existing jobs upgrading to the new version.
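The migration step above depends on collecting the sorted entries into a `LinkedHashMap` so that the sorted order survives as the map's iteration order. A minimal standalone check of that collector pattern, with made-up keys and values in place of the real split state:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SortOnRestoreDemo {
    public static void main(String[] args) {
        // Simulates restored state whose insertion order is split3, split2, split1.
        Map<String, String> restored = new LinkedHashMap<>();
        restored.put("split3", "c");
        restored.put("split2", "b");
        restored.put("split1", "a");

        // The proposed "version < 6" migration: sort by key into a LinkedHashMap
        // so iteration order matches what the old (sorting) code produced.
        LinkedHashMap<String, String> sorted =
                restored.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        Map.Entry::getValue,
                                        (o1, o2) -> o1,
                                        LinkedHashMap::new));

        System.out.println(sorted.keySet()); // [split1, split2, split3]
    }
}
```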
> Before Upgrade (Old Version)
> - Reader has already received partial splits based on the sorted order, e.g., [split1, split2]

This doesn't look right. The reader has received the splits in the same order as they were on the enumerator before the checkpoint, unsorted, so it will be [split3, split2].

> After Upgrade (New Version)
> - Reader still has state [split1, split2] (unchanged from checkpoint)

The same. How is this possible? The reader doesn't sort the elements.
The root cause is that the binlog split metadata transfer protocol relies on the order of finished snapshot split infos being stable and corresponding to the order of split assignment (the infos of newly added/snapshotted tables are appended to the end of the list). However, when MySqlSnapshotSplitAssigner is restored from state, assignedSplits are reordered, which breaks this assumption.

Change summary

- Changed the type of MySqlSnapshotSplitAssigner#assignedSplits to an ordered map. If the order is important, the type should guarantee that it's preserved. Note the changes in the deserialization code: not using an ordered map there while the order is important may cause other hard-to-diagnose issues.
- Added duplicate split ID validation to MySqlBinlogSplit. By design, a binlog split cannot contain duplicate finished snapshot split infos. If it does, it indicates that it was constructed incorrectly; that is a bug, and we want to fail as early as possible.
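The second bullet's fail-fast check can be sketched in a few lines. This is an illustrative stand-in, not the actual MySqlBinlogSplit constructor code; the record and method names are made up for the example:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateSplitCheck {
    // Illustrative stand-in for a finished snapshot split info; only the id matters here.
    record SplitInfo(String splitId) {}

    // Fail fast if the same split id appears twice, in the spirit of the
    // validation this PR adds to MySqlBinlogSplit (sketch, not actual code).
    static void validateNoDuplicates(List<SplitInfo> infos) {
        Set<String> seen = new HashSet<>();
        for (SplitInfo info : infos) {
            if (!seen.add(info.splitId())) {
                throw new IllegalArgumentException("Duplicate split id: " + info.splitId());
            }
        }
    }

    public static void main(String[] args) {
        // A well-formed list passes silently.
        validateNoDuplicates(List.of(new SplitInfo("split1"), new SplitInfo("split2")));
        // A list with a repeated id fails immediately.
        try {
            validateNoDuplicates(List.of(new SplitInfo("split1"), new SplitInfo("split1")));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Duplicate split id: split1
        }
    }
}
```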