Skip to content

[FLINK-39775][mysql] Release snapshot split metadata after entering the stream phase#4418

Open
spoorthibasu wants to merge 1 commit into
apache:masterfrom
spoorthibasu:FLINK-39775-release-snapshot-split-metadata
Open

[FLINK-39775][mysql] Release snapshot split metadata after entering the stream phase#4418
spoorthibasu wants to merge 1 commit into
apache:masterfrom
spoorthibasu:FLINK-39775-release-snapshot-split-metadata

Conversation

@spoorthibasu
Copy link
Copy Markdown

TL;DR: Fix JobManager OOM with a large number of snapshot splits by releasing the snapshot split metadata after the source enters the binlog phase, once the binlog split is assigned and covered by a completed checkpoint.

Root Cause

After the snapshot phase finishes and the binlog split is assigned, the source coordinator keeps the full snapshot split metadata indefinitely.

  • MySqlSnapshotSplitAssigner holds assignedSplits, splitFinishedOffsets, and tableSchemas in memory.
  • snapshotState() rebuilds the complete SnapshotPendingSplitsState on every checkpoint, MySqlHybridSplitAssigner wraps it, and notifyCheckpointComplete() never drops it.
  • MySqlSourceEnumerator keeps a second copy of the same finished-split metadata in its binlogSplitMeta cache.

With many finished splits (for example ~300K for a 2.5B-row table at the default chunk size of 8096), every post-snapshot checkpoint re-serializes the entire snapshot state. This keeps JobManager heap high, and because the large coordinator state is copied during checkpoint serialization and transfer, it also drives up direct/off-heap memory and can OOM-kill the container even when the heap itself does not exceed its limit.

Fix

Release the snapshot split metadata once it is no longer needed for correctness: after the binlog split has been created, its finished-split metadata fully transferred to and assembled by the reader, and that is covered by a completed checkpoint.

Reader-side (completion signal)

  • When a complete binlog split enters reading (whether its metadata arrived inline or was assembled over the divided meta-group requests), MySqlSourceReader sends a new BinlogSplitMetaAssembledEvent to the coordinator.
  • The event is idempotent: it is re-sent whenever a complete binlog split (re-)enters reading, including after a restore.

Coordinator-side (gated release)

  • On receiving the event, MySqlHybridSplitAssigner records that the reader has assembled the metadata.
  • snapshotState() schedules the release at the current checkpoint id; the actual clearing happens in notifyCheckpointComplete() once that checkpoint completes.
  • releaseSnapshotMetadata() clears assignedSplits, splitFinishedOffsets, and tableSchemas. alreadyProcessedTables, the assigner status, and the chunk splitter state are kept so a restore does not re-discover tables.
  • MySqlSourceEnumerator clears its cached binlogSplitMeta at the same point.
  • Release is skipped entirely when scan.newly-added-table.enabled is set, since that flow may need the metadata again to extend the binlog split.

Why the release is safe

  • The clearing happens only in notifyCheckpointComplete(), never in snapshotState(). Flink only re-delivers (addSplitsBack) split assignments that are not covered by the last completed checkpoint. Because the binlog split was assigned before the release-scheduling checkpoint, that assignment is checkpoint-covered by the time release runs, so the binlog split can never be added back into an emptied assigner. createBinlogSplit() also asserts the metadata has not been released, turning any future regression into a fast failure instead of silent data loss.
  • If the binlog split is added back before release (reader failover between scheduling and the checkpoint completing), the scheduled release is reset and the metadata is still intact, so the split is recreated normally.
  • On restore from a released ("light") state, the maps are empty, isBinlogSplitAssigned is true, and alreadyProcessedTables is non-empty, so the assigner does not re-discover tables and does not recreate the binlog split. The reader restores its own complete binlog split, requests no metadata, and re-sends the assembled event, which makes the release idempotent.
  • The state and serializer format are unchanged. A released state is a normal SnapshotPendingSplitsState with empty maps, so it serializes under the existing version and stays compatible on restore.

Tests Added

  • MySqlSnapshotMetadataReleaseTest (new)
    Release after the assembled event plus a completed checkpoint; no release without the event; no release before the checkpoint completes; no release when newly-added-table is enabled; add-back resets the schedule and the binlog split is recreated with full metadata.

  • PendingSplitsStateSerializerTest.testSerializeAndDeserializeReleasedHybridState
    Round-trips the released ("light") hybrid state, confirming empty maps and isBinlogSplitAssigned survive serialization under the current version.

  • MySqlSourceReaderTest.testReaderSendsBinlogSplitMetaAssembledEventForCompleteSplit
    The reader emits the assembled event when a complete binlog split enters reading.

  • MySqlSourceITCase
    TaskManager and JobManager failover in the binlog phase, plus reads across multiple tables at higher parallelism, all pass with no data loss.

Notes

  • The common path is unchanged until the source is well into the binlog phase; release only happens after the binlog split is assigned and one covering checkpoint completes.
  • No new configuration is introduced, and no state or serializer version bump is required.

@spoorthibasu spoorthibasu force-pushed the FLINK-39775-release-snapshot-split-metadata branch from 022e1bd to 2d0d625 Compare May 29, 2026 05:07
…he stream phase

After the snapshot phase finishes and the binlog split is assigned, the source coordinator kept retaining the full snapshot split metadata (assigned splits, finished offsets, table schemas) indefinitely. snapshotState() rebuilt the complete SnapshotPendingSplitsState on every checkpoint and notifyCheckpointComplete() never dropped it, and the enumerator held a second copy in its binlog split meta cache. With many finished splits (for example ~300K for a 2.5B-row table at the default chunk size) this kept JobManager memory high and drove up direct memory during checkpoint serialization, eventually OOM-killing the container.

The reader now tells the coordinator (BinlogSplitMetaAssembledEvent) once it holds the complete binlog split. The coordinator schedules the release in snapshotState() and performs it in notifyCheckpointComplete() after the covering checkpoint completes, so the binlog split assignment is always checkpoint-covered and can never be added back into an emptied assigner. Release is gated off when scan.newly-added-table is enabled, and the state/serializer format is unchanged (a released state is just empty maps).
@spoorthibasu spoorthibasu force-pushed the FLINK-39775-release-snapshot-split-metadata branch from 2d0d625 to a7df816 Compare May 29, 2026 05:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant