Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
Expand Down Expand Up @@ -134,7 +135,9 @@ public void submitSplit(MySqlSplit mySqlSplit) {
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit,
createEventFilter());
createEventFilterWithTimestamp(
currentBinlogSplit.getStartingOffset().getTimestampSec(),
currentBinlogSplit.getStartingOffset().getOffsetKind()));

executorService.submit(
() -> {
Expand Down Expand Up @@ -391,6 +394,36 @@ private Predicate<Event> createEventFilter() {
return event -> true;
}

private Predicate<Event> createEventFilterWithTimestamp(
long startTimestampSec, BinlogOffsetKind offsetKind) {
StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
// If startup mode is not TIMESTAMP, no filtering needed
if (!startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
return event -> true;
}
if (startupOptions.binlogOffset == null) {
throw new NullPointerException(
"The startup option was set to TIMESTAMP "
+ "but unable to find starting binlog offset. Please check if the timestamp is specified in "
+ "configuration. ");
}
if (offsetKind == BinlogOffsetKind.TIMESTAMP) {
return createEventFilter();
}
// If the offset kind is SPECIFIC, that means the job is restored from a savepoint which has
// a specific offset.
LOG.info(
"Creating an event filter that drops row mutation events occurring before the destination"
+ " timestamp in seconds {}",
startTimestampSec);
return event -> {
if (!EventType.isRowMutation(getEventType(event))) {
return true;
}
return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
};
}

public void stopBinlogReadTask() {
currentTaskRunning = false;
// Terminate the while loop in MySqlStreamingChangeEventSource's execute method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,9 +1137,9 @@ void testRestoreFromCheckpointWithTimestampStartingOffset() throws Exception {
header.setTimestamp(1L);
Event event = new Event(header, new WriteRowsEventData());

// Check if the filter works
// Check if the filter works, should be true
Predicate<Event> eventFilter = binlogReader.getBinlogSplitReadTask().getEventFilter();
assertThat(eventFilter.test(event)).isFalse();
assertThat(eventFilter.test(event)).isTrue();
}

@Test
Expand Down