Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
*
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
*
* <p>Line 106-109, 631, 641-646, 1578-1583 : Add REPLACE INTO filtering in handleQueryEvent to
* prevent pt-table-checksum STATEMENT-format DML from being parsed as DDL. Sync from Debezium PR
* #7004 (DBZ-9428).
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
Expand All @@ -103,6 +107,11 @@ public class MySqlStreamingChangeEventSource

private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";

private static final String DML_INSERT_PREFIX = "INSERT ";
private static final String DML_UPDATE_PREFIX = "UPDATE ";
private static final String DML_DELETE_PREFIX = "DELETE ";
private static final String DML_REPLACE_PREFIX = "REPLACE ";

private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers =
new EnumMap<>(EventType.class);
private final BinaryLogClient client;
Expand Down Expand Up @@ -623,7 +632,7 @@ protected void handleQueryEvent(
return;
}

String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
String upperCasedStatementBegin = Strings.getBegin(sql, 8).toUpperCase();

if (upperCasedStatementBegin.startsWith("XA ")) {
// This is an XA transaction, and we currently ignore these and do nothing ...
Expand All @@ -633,13 +642,10 @@ protected void handleQueryEvent(
LOGGER.debug("DDL '{}' was filtered out of processing", sql);
return;
}
if (upperCasedStatementBegin.equals("INSERT ")
|| upperCasedStatementBegin.equals("UPDATE ")
|| upperCasedStatementBegin.equals("DELETE ")) {
if (isDmlStatement(upperCasedStatementBegin)) {
LOGGER.warn(
"Received DML '"
+ sql
+ "' for processing, binlog probably contains events generated with statement or mixed based replication format");
"Received DML '{}' for processing, binlog probably contains events generated with statement or mixed based replication format",
sql);
return;
}
if (sql.equalsIgnoreCase("ROLLBACK")) {
Expand Down Expand Up @@ -1573,6 +1579,13 @@ public void onEventDeserializationFailure(BinaryLogClient client, Exception ex)
}
}

private static boolean isDmlStatement(String upperCasedStatementBegin) {
return upperCasedStatementBegin.startsWith(DML_INSERT_PREFIX)
|| upperCasedStatementBegin.startsWith(DML_UPDATE_PREFIX)
|| upperCasedStatementBegin.startsWith(DML_DELETE_PREFIX)
|| upperCasedStatementBegin.startsWith(DML_REPLACE_PREFIX);
}

@FunctionalInterface
private interface TableIdProvider<E extends EventData> {
TableId getTableId(E data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public void after() throws Exception {
binaryLogClient.disconnect();
}
customerDatabase.dropDatabase();
customerDatabaseNoGtid.dropDatabase();
}

@Test
Expand Down Expand Up @@ -526,6 +527,68 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception {
reader.close();
}

@Test
void testDmlStatementFilteringWithStatementBinlogFormat() throws Exception {
customerDatabaseNoGtid.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(
MYSQL_CONTAINER_NOGTID,
customerDatabaseNoGtid,
StartupOptions.latest(),
new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);

// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = createBinlogReader(sourceConfig);
reader.submitSplit(split);

// Simulate pt-table-checksum: use an independent JDBC connection to set session-level
// binlog_format to STATEMENT, then execute DML statements that will appear as QueryEvents
// in binlog. These should be filtered by isDmlStatement() in handleQueryEvent().
String qualifiedTableName = customerDatabaseNoGtid.qualifiedTableName("customers");
try (Connection conn = customerDatabaseNoGtid.getJdbcConnection();
Statement stmt = conn.createStatement()) {
stmt.execute("SET SESSION binlog_format = 'STATEMENT'");
// REPLACE INTO - the key DML that was missing from the filter before DBZ-9428
stmt.execute(
"REPLACE INTO "
+ qualifiedTableName
+ " VALUES(103, 'user_3', 'Shanghai', '123567891234')");
// Other DML statements in STATEMENT mode
stmt.execute(
"INSERT INTO "
+ qualifiedTableName
+ " VALUES(9999, 'pt_checksum_user', 'TestCity', '000000000000')");
stmt.execute(
"UPDATE " + qualifiedTableName + " SET address = 'PtCity' WHERE id = 9999");
stmt.execute("DELETE FROM " + qualifiedTableName + " WHERE id = 9999");
// Restore ROW mode for subsequent normal DML
stmt.execute("SET SESSION binlog_format = 'ROW'");
}

// Execute a normal ROW-mode DML to verify the reader is still healthy
mySqlConnection.execute(
"INSERT INTO "
+ qualifiedTableName
+ " VALUES(2001, 'user_22', 'Shanghai', '123567891234')");
mySqlConnection.commit();

final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
// Only the ROW-mode INSERT should be captured; STATEMENT-mode DMLs are filtered out
String[] expected = new String[] {"+I[2001, user_22, Shanghai, 123567891234]"};
List<String> actual = readBinlogSplits(dataType, reader, expected.length);
assertEqualsInOrder(Arrays.asList(expected), actual);

reader.close();
}

@Test
void testReadBinlogFromEarliestOffset() throws Exception {
customerDatabase.createAndInitialize();
Expand Down
Loading