Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.util.Preconditions;

import io.debezium.relational.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -52,6 +55,12 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {

private boolean isBinlogSplitAssigned;

/** Whether the reader has reported holding the complete binlog split (FLINK-39775). */
private boolean binlogSplitMetaAssembled;

/** Checkpoint at which the metadata release was scheduled; released once it completes. */
@Nullable private Long checkpointIdToReleaseMeta;

private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

public MySqlHybridSplitAssigner(
Expand Down Expand Up @@ -161,20 +170,52 @@ public void addSplits(Collection<MySqlSplit> splits) {
} else {
// we don't store the split, but will re-create binlog split later
isBinlogSplitAssigned = false;
// re-creating the binlog split: the reader must re-assemble and re-report
// before the snapshot metadata can be released again
binlogSplitMetaAssembled = false;
checkpointIdToReleaseMeta = null;
}
}
snapshotSplitAssigner.addSplits(snapshotSplits);
}

@Override
public PendingSplitsState snapshotState(long checkpointId) {
// Schedule releasing the snapshot metadata once the binlog split is assigned and the reader
// has assembled it; the release happens in notifyCheckpointComplete. Skipped when
// newly-added-table scan is on, since that flow may still need the metadata.
if (isBinlogSplitAssigned
&& binlogSplitMetaAssembled
&& checkpointIdToReleaseMeta == null
&& !snapshotSplitAssigner.isSnapshotMetaReleased()
&& !sourceConfig.isScanNewlyAddedTableEnabled()) {
checkpointIdToReleaseMeta = checkpointId;
}
return new HybridPendingSplitsState(
snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
// Release the snapshot metadata only after the checkpoint covering the binlog split
// assignment completes. Doing it here (not in snapshotState) keeps the assignment
// checkpoint-covered, so addSplitsBack can never return the split to an emptied assigner.
if (checkpointIdToReleaseMeta != null
&& checkpointId >= checkpointIdToReleaseMeta
&& !snapshotSplitAssigner.isSnapshotMetaReleased()) {
snapshotSplitAssigner.releaseSnapshotMetadata();
}
}

/** Marks that the reader holds the complete binlog split, arming the metadata release. */
public void onBinlogSplitMetaAssembled() {
this.binlogSplitMetaAssembled = true;
}

/** Returns whether the snapshot split metadata has been released. */
public boolean isSnapshotMetaReleased() {
return snapshotSplitAssigner.isSnapshotMetaReleased();
}

@Override
Expand Down Expand Up @@ -205,6 +246,10 @@ public void close() {
// --------------------------------------------------------------------------------------------

private MySqlBinlogSplit createBinlogSplit() {
Preconditions.checkState(
!snapshotSplitAssigner.isSnapshotMetaReleased(),
"Snapshot metadata was already released; the binlog split must not be re-created "
+ "after that (FLINK-39775).");
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {

@Nullable private Long checkpointIdToFinish;

/** Whether the snapshot split metadata has been released (FLINK-39775). */
private boolean snapshotMetaReleased;

public MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
Expand Down Expand Up @@ -571,6 +574,29 @@ public Map<String, BinlogOffset> getSplitFinishedOffsets() {
return splitFinishedOffsets;
}

/**
* Releases the heavyweight snapshot split metadata (assigned splits, finished offsets, table
* schemas) after the binlog phase begins, so it is no longer held in the coordinator nor
* checkpointed (FLINK-39775). Keeps {@link #alreadyProcessedTables} and the assigner status so
* a restore does not re-discover tables. Only called once the binlog split is assigned and
* checkpoint-covered.
*/
public void releaseSnapshotMetadata() {
if (snapshotMetaReleased) {
return;
}
assignedSplits.clear();
splitFinishedOffsets.clear();
tableSchemas.clear();
snapshotMetaReleased = true;
LOG.info("Released snapshot split metadata after entering the binlog phase.");
}

/** Returns whether {@link #releaseSnapshotMetadata()} has already been performed. */
public boolean isSnapshotMetaReleased() {
return snapshotMetaReleased;
}

// -------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaAssembledEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
Expand Down Expand Up @@ -180,6 +181,13 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
"The enumerator receives notice from subtask {} for the binlog split assignment. ",
subtaskId);
binlogSplitTaskId = subtaskId;
} else if (sourceEvent instanceof BinlogSplitMetaAssembledEvent) {
LOG.info(
"The enumerator receives notice from subtask {} that the binlog split metadata has been fully assembled. ",
subtaskId);
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
((MySqlHybridSplitAssigner) splitAssigner).onBinlogSplitMetaAssembled();
}
}
}

Expand All @@ -191,6 +199,14 @@ public PendingSplitsState snapshotState(long checkpointId) {
@Override
public void notifyCheckpointComplete(long checkpointId) {
splitAssigner.notifyCheckpointComplete(checkpointId);
// Once the assigner has released the snapshot metadata, drop the enumerator's cached binlog
// split meta groups too: it is a second copy of the same finished-split infos
// (FLINK-39775).
if (binlogSplitMeta != null
&& splitAssigner instanceof MySqlHybridSplitAssigner
&& ((MySqlHybridSplitAssigner) splitAssigner).isSnapshotMetaReleased()) {
binlogSplitMeta = null;
}
// binlog split may be available after checkpoint complete
assignSplits();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.source.events;

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;

/**
* The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator}
* once it holds a complete {@link MySqlBinlogSplit}, signalling that all finished snapshot split
* metadata has been assembled by the reader. This lets the enumerator release the snapshot split
* metadata it retains (FLINK-39775). Idempotent: re-sent whenever a complete binlog split
* (re-)enters reading.
*/
public class BinlogSplitMetaAssembledEvent implements SourceEvent {

private static final long serialVersionUID = 1L;

public BinlogSplitMetaAssembledEvent() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaAssembledEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
Expand Down Expand Up @@ -285,6 +286,10 @@ private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlo
discoverTableSchemasForBinlogSplit(
binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
unfinishedSplits.add(mySqlBinlogSplit);
// binlog split is complete: tell the enumerator so it can release the snapshot
// metadata. Also fires on restore of a complete split, which is fine
// (FLINK-39775).
context.sendSourceEventToCoordinator(new BinlogSplitMetaAssembledEvent());
}
LOG.info(
"Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
Expand Down
Loading