
[FLINK-38218] Fix MySQL CDC binlog split metadata split transmission #4087

Open
morozov wants to merge 6 commits into apache:master from morozov:FLINK-38218-fix-binlog-split-construction

Conversation

morozov commented Aug 8, 2025

The root cause is that the binlog split metadata transfer protocol relies on the order of finished snapshot split infos being stable and matching the order of split assignment (the infos of newly added/snapshotted tables are appended to the end of the list). However, when MySqlSnapshotSplitAssigner is restored from state, assignedSplits are reordered, which breaks this assumption.

Change summary

  1. Require assigned snapshot splits to be ordered. This isn't strictly necessary to fix the bug but follows directly from the JavaDoc I added to MySqlSnapshotSplitAssigner#assignedSplits: if the order is important, the type should guarantee that it's preserved. Note the changes in the deserialization code. Not using an ordered map there while the order is important may cause other hard-to-diagnose issues.
  2. Rely on the stable order of assigned splits. Instead of identifying duplicate received split infos by split ID, ignore the first N elements that we know we already have.
  3. Eliminate code duplication in the MySqlBinlogSplit constructors. There are currently two constructors where one doesn't call the other. The subsequent commit adds a check that needs to be enforced regardless of which constructor was used, so I'm combining them.
  4. Enforce no duplicate finished snapshot split infos in MySqlBinlogSplit. By design, a binlog split cannot contain duplicate finished snapshot split infos. If it does, it was constructed incorrectly; that's a bug, and we want to fail as early as possible.
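
A minimal sketch of the fail-fast check described in item 4, with hypothetical class and helper names (the actual check lives in the MySqlBinlogSplit constructor); the IllegalArgumentException matches what the new MySqlBinlogSplitTest expects:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateCheckSketch {

    // Hypothetical helper: fails fast if any finished snapshot split ID repeats.
    public static void checkNoDuplicateSplitIds(List<String> splitIds) {
        Set<String> seen = new HashSet<>();
        for (String id : splitIds) {
            if (!seen.add(id)) {
                throw new IllegalArgumentException(
                        "Duplicate finished snapshot split info: " + id);
            }
        }
    }

    public static void main(String[] args) {
        checkNoDuplicateSplitIds(List.of("split1", "split2", "split3")); // passes
        try {
            checkNoDuplicateSplitIds(List.of("split1", "split2", "split1"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Consolidating the two constructors (item 3) means a check like this runs on every construction path rather than only one of them.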

morozov marked this pull request as ready for review August 8, 2025 21:39
morozov commented Aug 8, 2025

I'm not sure how to test this. The issue is reproducible if a source is restarted mid-snapshot of a newly added table and requires consuming the changes in the new table from the binlog. Could maintainers recommend an existing test on top of which I could build this?

morozov force-pushed the FLINK-38218-fix-binlog-split-construction branch 2 times, most recently from 1069e6e to 9f16356, August 13, 2025 23:33
leonardBang self-requested a review August 27, 2025 03:21
lvyanquan self-assigned this Sep 12, 2025
morozov closed this Sep 26, 2025
leonardBang reopened this Nov 3, 2025
morozov commented Dec 10, 2025

@leonardBang could you restart the tests? The logs are no longer available.

leonardBang commented

Hey @morozov, it looks like Azure cannot re-trigger expired CI runs. Could you rebase your PR onto the latest master branch to trigger a new CI run?

morozov force-pushed the FLINK-38218-fix-binlog-split-construction branch from 9f16356 to ee9d6e1, December 10, 2025 06:20
morozov commented Dec 10, 2025

@leonardBang, after the rebase, the build is green.

morozov commented Jan 16, 2026

@leonardBang, @lvyanquan do you still want to get this issue fixed? I closed the PR due to the lack of interest, but you reopened it.

lvyanquan commented

Hi @morozov, apologies for the delayed response. As this change impacts a critical code path, we are proceeding with utmost caution. I will review and verify both the issue and the proposed fix this week.

Copilot AI left a comment

Pull request overview

This PR fixes a bug in MySQL CDC where binlog split metadata transmission could fail due to inconsistent ordering of finished snapshot split information. The root cause was that assignedSplits were being reordered during restoration from state, breaking the protocol's assumption that split infos maintain their assignment order.

Changes:

  • Enforces stable ordering of assigned snapshot splits using LinkedHashMap throughout the codebase
  • Replaces split ID-based duplicate detection with simpler position-based logic in metadata transmission
  • Adds validation to detect and fail fast on duplicate split IDs in binlog splits

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated no comments.

Summary per file:

  • MySqlSnapshotSplitAssigner.java: removes sorting of assignedSplits and adds documentation explaining order requirements; changes the assignedSplits type to LinkedHashMap
  • SnapshotPendingSplitsState.java: changes the assignedSplits field type to LinkedHashMap to preserve insertion order
  • PendingSplitsStateSerializer.java: updates deserialization to use LinkedHashMap instead of HashMap for assignedSplits
  • MySqlSourceReader.java: replaces split-ID-based duplicate filtering with position-based element skipping using modulo arithmetic
  • MySqlBinlogSplit.java: adds duplicate split ID validation in the constructor and consolidates two constructors to eliminate code duplication
  • MySqlHybridSplitAssigner.java: removes sorting when creating binlog splits to preserve the natural order from assignedSplits
  • MySqlSnapshotSplitAssignerTest.java: adds a parameterized test to verify finished split infos maintain assignment order across checkpoints; updates to use LinkedHashMap
  • MySqlBinlogSplitTest.java: adds a test to verify duplicate split IDs are rejected with IllegalArgumentException
  • MySqlHybridSplitAssignerTest.java: updates test setup to use LinkedHashMap for assignedSplits
  • PendingSplitsStateSerializerTest.java: updates test setup to use LinkedHashMap for assignedSplits


The review thread below refers to this change:

    // Before: sorted by split ID
    snapshotSplitAssigner.getAssignedSplits().values().stream()
            .sorted(Comparator.comparing(MySqlSplit::splitId))
            .collect(Collectors.toList());

    // After: insertion order preserved
    new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
lvyanquan commented Mar 18, 2026

Assuming the splits saved by the Enumerator are split3, split2, split1, the original code sorts them as split1, split2, split3.
If the Source Reader of a job has already obtained split3 and split2 according to the original sorting method, and the job is then upgraded, requesting the third split according to the latest implementation will request split3 again. Will there be compatibility issues with this?

morozov commented Mar 18, 2026

Assuming the shards saved by the Enumerator are split3, split2, split1, they will be sorted in the original way as split1, split2, split3.
If the Source Reader of a job has already obtained split1 and split2 [...]

Is this possible?

  1. Finished snapshot split infos are built in the order of split assignment:

     for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
         BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
         finishedSnapshotSplitInfos.add(

  2. They are sent to the source reader in the same order:

     binlogSplitMeta =
             Lists.partition(
                     finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());

This is true for both the pre-upgrade and post-upgrade versions.

So if before restoring the job from a checkpoint, the enumerator has the splits/infos in the order of split3, split2, split1, the source reader will have a prefix of this list (e.g. split3, split2) but not split1 and split2.

lvyanquan commented

Sorry, there was an error in my previous description. My concern is the inconsistency in ordering before and after the upgrade: only the pre-upgrade version has a fixed (sorted) sequence. Will this result in duplicate requests for splits?

morozov commented

requesting the third shard according to the latest implementation method will request split3 again. Will there be compatibility issues with this?

It will not request split3 again. The reader will know that there are 3 split infos in total and it has 2, so it needs to request the remaining one.

Let's assume that chunk-meta.group.size = 2.

  1. The source reader state is [split3, split2] (receivedMetaNum = 2)
  2. The next meta group ID is 1 (2 / 2 = 1):

     public static int getNextMetaGroupId(int receivedMetaNum, int metaGroupSize) {
         Preconditions.checkState(metaGroupSize > 0);
         return receivedMetaNum / metaGroupSize;
     }
  3. The splits on the enumerator are partitioned by 2:

     binlogSplitMeta =
             Lists.partition(
                     finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());

     [
       0: [split3, split2]
       1: [split1]
     ]
  4. The source reader requests group 1, the enumerator returns [split1]
  5. The expected number of already retrieved elements in the last group on the reader is 0 (2 % 2 = 0):

     int expectedNumberOfAlreadyRetrievedElements =
             binlogSplit.getFinishedSnapshotSplitInfos().size()
                     % sourceConfig.getSplitMetaGroupSize();

  6. The source reader discards the first 0 elements of the response, so [split1] remains [split1].
  7. The source reader appends [split1] to [split3, split2], so the resulting source reader state becomes [split3, split2, split1] – correct.

Let's assume that chunk-meta.group.size = 3.

  1. The source reader state is [split3, split2] (receivedMetaNum = 2)
  2. The next meta group ID is 0 (2 / 3 = 0)
  3. The splits on the enumerator are partitioned by 3:
    [
      0: [split3, split2, split1]
    ]
  4. The source reader requests group 0, the enumerator returns [split3, split2, split1]
  5. The expected number of already retrieved elements in the last group on the reader is 2 (2 % 3 = 2).
  6. The source reader discards the first 2 elements of the response, so [split3, split2, split1] becomes [split1].
  7. The source reader appends [split1] to [split3, split2], so the resulting source reader state becomes [split3, split2, split1] – correct.
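
The two walkthroughs above reduce to integer arithmetic. A self-contained sketch (getNextMetaGroupId follows the snippet quoted in the thread; the class name and the second helper are illustrative, and the explicit check stands in for Guava's Preconditions.checkState):

```java
public class MetaGroupSketch {

    // Next meta group the reader should request, as quoted from the source above.
    public static int getNextMetaGroupId(int receivedMetaNum, int metaGroupSize) {
        if (metaGroupSize <= 0) {
            throw new IllegalStateException("metaGroupSize must be positive");
        }
        return receivedMetaNum / metaGroupSize;
    }

    // Leading elements of the requested group the reader already holds and must skip.
    public static int alreadyRetrievedInLastGroup(int receivedMetaNum, int metaGroupSize) {
        return receivedMetaNum % metaGroupSize;
    }

    public static void main(String[] args) {
        int received = 2; // reader already has [split3, split2]

        // chunk-meta.group.size = 2: request group 1, skip 0 elements of [split1]
        System.out.println(getNextMetaGroupId(received, 2));          // 1
        System.out.println(alreadyRetrievedInLastGroup(received, 2)); // 0

        // chunk-meta.group.size = 3: request group 0, skip 2 elements of [split3, split2, split1]
        System.out.println(getNextMetaGroupId(received, 3));          // 0
        System.out.println(alreadyRetrievedInLastGroup(received, 3)); // 2
    }
}
```

In both configurations the reader ends up appending exactly [split1], which is why the position-based skip can replace the split-ID-based duplicate detection.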

lvyanquan commented

Thank you @morozov for the detailed explanation. However, I believe there's still a compatibility issue that wasn't fully addressed. Let me clarify my concern with a concrete job upgrade scenario.

The Scenario

Consider an existing running job that is being upgraded from the old version to the new version:

Before Upgrade (Old Version)

  1. Splits are assigned in order: split3 → split2 → split1

  2. Enumerator restores from checkpoint and sorts by split ID:

    // Old code sorts on restore
    .sorted(Entry.comparingByKey())

    Result: assignedSplits = [split1, split2, split3]

  3. Reader has already received partial splits based on the sorted order, e.g., [split1, split2]

  4. finishedSnapshotSplitInfos on Enumerator side is built from sorted order: [split1, split2, split3]

  5. If chunk-meta.group.size = 2, the partitions are:

    [
      0: [split1, split2]
      1: [split3]
    ]
    
  6. Reader has received group 0, next request will be for group 1 (split3)

After Upgrade (New Version)

  1. Job restarts from checkpoint and upgrades to new version

  2. Enumerator restores WITHOUT sorting:

    // New code does NOT sort on restore
    this.assignedSplits = assignedSplits;

    Result: assignedSplits = [split3, split2, split1] (preserves original insertion order)

  3. finishedSnapshotSplitInfos is now built from unsorted order: [split3, split2, split1]

  4. The partitions are now:

    [
      0: [split3, split2]
      1: [split1]
    ]
    
  5. Reader still has state [split1, split2] (unchanged from checkpoint)

  6. Reader requests group 1 (because receivedMetaNum = 2, and 2 / 2 = 1)

  7. Enumerator returns [split1] instead of the expected split3

  8. Reader calculates expectedNumberOfAlreadyRetrievedElements = 2 % 2 = 0

  9. Reader discards 0 elements and appends [split1] to [split1, split2]

  10. Result: Reader state becomes [split1, split2, split1], which contains a duplicate split1 and is missing split3

The Root Cause

The issue is that the order semantics changed between versions, but the state serializer version was NOT incremented:

private static final int VERSION = 5;  // Still 5, not incremented

The old version and new version interpret the same serialized state differently:

  • Old version: sorts on restore → order is by split ID
  • New version: does not sort on restore → order is by insertion time

This breaks the invariant that both Enumerator and Reader must use the same order.

Suggested Fix

  1. Increment the state serializer version:

    private static final int VERSION = 6;
  2. Add migration logic for version 5 and below:

    private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
            int version, int splitVersion, DataInputDeserializer in) throws IOException {
        // ... existing deserialization logic ...
        
        // For old versions, sort the assigned splits to maintain compatibility
        if (version < 6) {
            LinkedHashMap<String, MySqlSchemalessSnapshotSplit> sortedSplits =
                assignedSchemalessSnapshotSplits.entrySet().stream()
                    .sorted(Map.Entry.comparingByKey())
                    .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (o1, o2) -> o1,
                        LinkedHashMap::new));
            assignedSchemalessSnapshotSplits = sortedSplits;
        }
        // ...
    }

This ensures backward compatibility for existing jobs upgrading to the new version.
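
The migration idea in the suggested fix can be sketched as a self-contained example (split IDs and map values are illustrative; the point is that re-sorting into a LinkedHashMap reproduces the old sorted iteration order):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class MigrationSortSketch {

    // Re-sorts a split map by split ID; LinkedHashMap preserves the sorted order.
    public static LinkedHashMap<String, String> sortBySplitId(Map<String, String> assignedSplits) {
        return assignedSplits.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (o1, o2) -> o1,
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // Insertion order simulates a pre-upgrade checkpoint: split3, split2, split1.
        LinkedHashMap<String, String> splits = new LinkedHashMap<>();
        splits.put("split3", "c");
        splits.put("split2", "b");
        splits.put("split1", "a");

        // After migration, iteration order matches the old sorted semantics.
        System.out.println(sortBySplitId(splits).keySet()); // [split1, split2, split3]
    }
}
```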

morozov commented Mar 19, 2026

Before Upgrade (Old Version)

  1. Reader has already received partial splits based on the sorted order, e.g., [split1, split2]

This doesn't look right. The reader has received the splits in the same order as they were on the enumerator before the checkpoint, unsorted, so it will be [split3, split2].

After Upgrade (New Version)

  1. Reader still has state [split1, split2] (unchanged from checkpoint)

The same. How is this possible? The reader doesn't sort the elements.



5 participants