[FLINK-39775][mysql] Release snapshot split metadata after entering the stream phase#4418
Open
spoorthibasu wants to merge 1 commit into
…he stream phase After the snapshot phase finishes and the binlog split is assigned, the source coordinator kept retaining the full snapshot split metadata (assigned splits, finished offsets, table schemas) indefinitely. snapshotState() rebuilt the complete SnapshotPendingSplitsState on every checkpoint and notifyCheckpointComplete() never dropped it, and the enumerator held a second copy in its binlog split meta cache. With many finished splits (for example ~300K for a 2.5B-row table at the default chunk size) this kept JobManager memory high and drove up direct memory during checkpoint serialization, eventually OOM-killing the container. The reader now tells the coordinator (BinlogSplitMetaAssembledEvent) once it holds the complete binlog split. The coordinator schedules the release in snapshotState() and performs it in notifyCheckpointComplete() after the covering checkpoint completes, so the binlog split assignment is always checkpoint-covered and can never be added back into an emptied assigner. Release is gated off when scan.newly-added-table is enabled, and the state/serializer format is unchanged (a released state is just empty maps).
TL;DR: Fix JobManager OOM with a large number of snapshot splits by releasing the snapshot split metadata after the source enters the binlog phase, once the binlog split is assigned and covered by a completed checkpoint.
Root Cause
After the snapshot phase finishes and the binlog split is assigned, the source coordinator keeps the full snapshot split metadata indefinitely.
- MySqlSnapshotSplitAssigner holds assignedSplits, splitFinishedOffsets, and tableSchemas in memory.
- snapshotState() rebuilds the complete SnapshotPendingSplitsState on every checkpoint, MySqlHybridSplitAssigner wraps it, and notifyCheckpointComplete() never drops it.
- MySqlSourceEnumerator keeps a second copy of the same finished-split metadata in its binlogSplitMeta cache.

With many finished splits (for example ~300K for a 2.5B-row table at the default chunk size of 8096), every post-snapshot checkpoint re-serializes the entire snapshot state. This keeps JobManager heap high, and because the large coordinator state is copied during checkpoint serialization and transfer, it also drives up direct/off-heap memory and can OOM-kill the container even when the heap itself does not exceed its limit.
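As a rough check of the split count quoted above, the following sketch divides the row count by the chunk size. It assumes evenly sized chunks, which real chunking by key range will only approximate, and the class and method names are hypothetical rather than part of the connector.

```java
final class SplitCountEstimate {
    // Ceiling division: number of chunks needed to cover rowCount rows,
    // assuming every chunk holds exactly chunkSize rows (a simplification).
    static long estimateSplits(long rowCount, long chunkSize) {
        return (rowCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long rows = 2_500_000_000L; // 2.5B-row table from the description
        long chunkSize = 8096L;     // default chunk size from the description
        System.out.println(estimateSplits(rows, chunkSize)); // prints 308795
    }
}
```

Each of those splits contributes an entry to the assigned-split and finished-offset maps, which is why the retained state grows with table size rather than with ongoing binlog traffic.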
Fix
Release the snapshot split metadata once it is no longer needed for correctness: after the binlog split has been created, its finished-split metadata has been fully transferred to and assembled by the reader, and the assignment is covered by a completed checkpoint.
Reader-side (completion signal)
- MySqlSourceReader sends a new BinlogSplitMetaAssembledEvent to the coordinator.

Coordinator-side (gated release)
- MySqlHybridSplitAssigner records that the reader has assembled the metadata.
- snapshotState() schedules the release at the current checkpoint id; the actual clearing happens in notifyCheckpointComplete() once that checkpoint completes.
- releaseSnapshotMetadata() clears assignedSplits, splitFinishedOffsets, and tableSchemas. alreadyProcessedTables, the assigner status, and the chunk splitter state are kept so a restore does not re-discover tables.
- MySqlSourceEnumerator clears its cached binlogSplitMeta at the same point.
- Release is gated off when scan.newly-added-table.enabled is set, since that flow may need the metadata again to extend the binlog split.

Why the release is safe
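The ordering this section relies on (assembled event, then schedule in snapshotState(), then clear in notifyCheckpointComplete()) can be modelled with a small standalone sketch. This is not the connector code: the class, its fields, and the single map standing in for the split metadata are hypothetical. Treating any checkpoint id at or above the scheduled one as "covering", and resetting the assembled flag on add-back, are assumptions of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the gated release; not the actual Flink CDC classes. */
final class SnapshotMetadataReleaseSketch {
    private static final long NOT_SCHEDULED = -1L;

    // Stands in for assignedSplits / splitFinishedOffsets / tableSchemas.
    private final Map<String, String> finishedOffsets = new HashMap<>();
    private final boolean newlyAddedTableEnabled;
    private boolean readerAssembled;
    private long releaseCheckpointId = NOT_SCHEDULED;
    private boolean released;

    SnapshotMetadataReleaseSketch(boolean newlyAddedTableEnabled) {
        this.newlyAddedTableEnabled = newlyAddedTableEnabled;
    }

    void addFinishedSplit(String splitId, String offset) {
        finishedOffsets.put(splitId, offset);
    }

    // Reader reported that it holds the complete binlog split.
    void onBinlogSplitMetaAssembled() {
        readerAssembled = true;
    }

    // Only schedules the release; nothing is cleared here.
    void snapshotState(long checkpointId) {
        if (readerAssembled && !newlyAddedTableEnabled && !released
                && releaseCheckpointId == NOT_SCHEDULED) {
            releaseCheckpointId = checkpointId;
        }
    }

    // Clears the metadata once the scheduled checkpoint (or a later one,
    // an assumption of this sketch) has completed.
    void notifyCheckpointComplete(long checkpointId) {
        if (releaseCheckpointId != NOT_SCHEDULED && checkpointId >= releaseCheckpointId) {
            finishedOffsets.clear();
            released = true;
            releaseCheckpointId = NOT_SCHEDULED;
        }
    }

    // Add-back before release resets the schedule so metadata stays available.
    void onBinlogSplitAddedBack() {
        readerAssembled = false; // assumption: wait for a fresh assembled event
        releaseCheckpointId = NOT_SCHEDULED;
    }

    int retainedEntries() {
        return finishedOffsets.size();
    }

    boolean isReleased() {
        return released;
    }
}
```

In this model, a checkpoint that completes before the assembled event arrives leaves the map untouched, and a snapshotState() call alone never clears anything. Only the completion notification for the scheduling checkpoint does.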
- Release runs only in notifyCheckpointComplete(), never in snapshotState(). Flink only re-delivers (addSplitsBack) split assignments that are not covered by the last completed checkpoint. Because the binlog split was assigned before the release-scheduling checkpoint, that assignment is checkpoint-covered by the time release runs, so the binlog split can never be added back into an emptied assigner. createBinlogSplit() also asserts the metadata has not been released, turning any future regression into a fast failure instead of silent data loss.
- On restore from a released state, isBinlogSplitAssigned is true and alreadyProcessedTables is non-empty, so the assigner does not re-discover tables and does not recreate the binlog split. The reader restores its own complete binlog split, requests no metadata, and re-sends the assembled event, which makes the release idempotent.
- The state/serializer format is unchanged: a released state is a SnapshotPendingSplitsState with empty maps, so it serializes under the existing version and stays compatible on restore.

Tests Added
MySqlSnapshotMetadataReleaseTest (new)
Release after the assembled event plus a completed checkpoint; no release without the event; no release before the checkpoint completes; no release when newly-added-table is enabled; add-back resets the schedule and the binlog split is recreated with full metadata.
PendingSplitsStateSerializerTest.testSerializeAndDeserializeReleasedHybridState
Round-trips the released ("light") hybrid state, confirming empty maps and isBinlogSplitAssigned survive serialization under the current version.
MySqlSourceReaderTest.testReaderSendsBinlogSplitMetaAssembledEventForCompleteSplit
The reader emits the assembled event when a complete binlog split enters reading.
MySqlSourceITCase
TaskManager and JobManager failover in the binlog phase, plus reads across multiple tables at higher parallelism, all pass with no data loss.
Notes