Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase {
private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");

private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1";

protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);

-- table has auto increment primary key for pt-osc testing
-- Fixture table for the online-schema-change (pt-osc / gh-ost) integration tests.
-- Unlike the other fixtures, the primary key is AUTO_INCREMENT so inserts can use
-- server-assigned ids.
CREATE TABLE customers_auto_id (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, -- surrogate key assigned by MySQL
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);

-- Seed 21 rows; the id column is omitted so MySQL assigns AUTO_INCREMENT values
-- (1..21), exactly as VALUES (default, ...) would.
INSERT INTO customers_auto_id (name, address, phone_number)
VALUES ('user_1', 'Shanghai', '123567891234'),
       ('user_2', 'Shanghai', '123567891234'),
       ('user_3', 'Shanghai', '123567891234'),
       ('user_4', 'Shanghai', '123567891234'),
       ('user_5', 'Shanghai', '123567891234'),
       ('user_6', 'Shanghai', '123567891234'),
       ('user_7', 'Shanghai', '123567891234'),
       ('user_8', 'Shanghai', '123567891234'),
       ('user_9', 'Shanghai', '123567891234'),
       ('user_10', 'Shanghai', '123567891234'),
       ('user_11', 'Shanghai', '123567891234'),
       ('user_12', 'Shanghai', '123567891234'),
       ('user_13', 'Shanghai', '123567891234'),
       ('user_14', 'Shanghai', '123567891234'),
       ('user_15', 'Shanghai', '123567891234'),
       ('user_16', 'Shanghai', '123567891234'),
       ('user_17', 'Shanghai', '123567891234'),
       ('user_18', 'Shanghai', '123567891234'),
       ('user_19', 'Shanghai', '123567891234'),
       ('user_20', 'Shanghai', '123567891234'),
       ('user_21', 'Shanghai', '123567891234');
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.OnlineSchemaChangeUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
new StoppableChangeEventSourceContext();
private final boolean isParsingOnLineSchemaChanges;
private final boolean isBackfillSkipped;
private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;

private static final long READER_CLOSE_TIMEOUT = 30L;

Expand All @@ -114,6 +116,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
this.isParsingOnLineSchemaChanges =
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
this.pendingSchemaChangeEvents = new HashMap<>();
}

public void submitSplit(MySqlSplit mySqlSplit) {
Expand Down Expand Up @@ -181,8 +184,35 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
if (oscRecord.isPresent()) {
sourceRecords.add(oscRecord.get());
continue;
TableId tableId = RecordUtils.getTableId(oscRecord.get());
if (tableId != null) {
LOG.info(
"Received the start event of online schema change: {}. Save it for later.",
oscRecord.get());
pendingSchemaChangeEvents
.computeIfAbsent(tableId.toString(), k -> new ArrayList<>())
.add(oscRecord.get());
continue;
}
}

Optional<String> finishedTables =
OnlineSchemaChangeUtils.parseOnLineSchemaRenameEvent(event.getRecord());
if (finishedTables.isPresent()) {
TableId tableId = RecordUtils.getTableId(event.getRecord());
String finishedTableId = tableId.catalog() + "." + finishedTables.get();
LOG.info(
"Received the ending event of table {}. Emit corresponding DDL event now.",
finishedTableId);

if (pendingSchemaChangeEvents.containsKey(finishedTableId)) {
sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId));
} else {
LOG.error(
"Error: met an unexpected osc finish event. Current pending events: {}, Record: {}",
pendingSchemaChangeEvents,
event);
}
}
}
if (shouldEmit(event.getRecord())) {
Expand Down Expand Up @@ -228,11 +258,11 @@ public void close() {
}

private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) {
if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
if (OnlineSchemaChangeUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
// This is a gh-ost initialized schema change event and should be emitted if the
// peeled tableId matches the predicate.
TableId originalTableId = RecordUtils.getTableId(sourceRecord);
TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
TableId peeledTableId = OnlineSchemaChangeUtils.peelTableId(originalTableId);
if (capturedTableFilter.test(peeledTableId)) {
return Optional.of(
RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.source.utils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;

/** Utility class for handling gh-ost/pt-osc online schema change events. */
public class OnlineSchemaChangeUtils {

    private static final Logger LOG = LoggerFactory.getLogger(OnlineSchemaChangeUtils.class);

    /**
     * Shared JSON mapper for decoding Debezium history records. {@link ObjectMapper} is
     * thread-safe once configured, so a single cached instance avoids re-allocating one for
     * every binlog schema change event.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private OnlineSchemaChangeUtils() {
        // Static utility class; no instances.
    }

    /**
     * Pattern matching gh-ost shadow table ({@code _<name>_gho}) and pt-osc new table ({@code
     * _<name>_new}), which carry the actual ALTER DDL during an online schema change.
     */
    private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");

    /**
     * Pattern matching gh-ost delete table ({@code _<name>_del}) and pt-osc old table ({@code
     * _<name>_old}), which are the temporary backup tables created during an online schema
     * change.
     */
    private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$");

    /**
     * Checks whether the given source record is a gh-ost/pt-osc initiated schema change event by
     * inspecting the ALTER DDL statement targeting a shadow/new table.
     *
     * <p>There will be these schema change events generated in total during one transaction.
     *
     * <p>gh-ost:
     *
     * <pre>
     * DROP TABLE IF EXISTS `db`.`_tb1_gho`
     * DROP TABLE IF EXISTS `db`.`_tb1_del`
     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
     * create /* gh-ost *&#47; table `db`.`_tb1_ghc` ...
     * create /* gh-ost *&#47; table `db`.`_tb1_gho` like `db`.`tb1`
     * alter /* gh-ost *&#47; table `db`.`_tb1_gho` add column c varchar(255)
     * alter /* gh-ost *&#47; table `db`.`_tb1_gho` AUTO_INCREMENT=N (only present when the table has an AUTO_INCREMENT column)
     * create /* gh-ost *&#47; table `db`.`_tb1_del` ...
     * DROP TABLE IF EXISTS `db`.`_tb1_del`
     * rename /* gh-ost *&#47; table `db`.`tb1` to `db`.`_tb1_del`
     * rename /* gh-ost *&#47; table `db`.`_tb1_gho` to `db`.`tb1`
     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
     * DROP TABLE IF EXISTS `db`.`_tb1_del`
     * </pre>
     *
     * <p>pt-osc:
     *
     * <pre>
     * CREATE TABLE `db`.`_test_tb1_new`
     * ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
     * CREATE TRIGGER `pt_osc_db_test_tb1_del`...
     * CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
     * CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
     * ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change *&#47;
     * RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO `db`.`test_tb1`
     * DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server *&#47;
     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
     * </pre>
     *
     * <p>Among all these, only the ALTER statement targeting the {@code _gho}/{@code _new} table is
     * stored temporarily, and emitted when the subsequent RENAME TABLE event arrives.
     */
    public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
        if (!RecordUtils.isSchemaChangeEvent(record)) {
            return false;
        }
        Struct value = (Struct) record.value();
        String ddl = extractLowerCaseDdl(value);
        if (!ddl.startsWith("alter")) {
            return false;
        }
        // The ALTER is only interesting when it targets a shadow (_gho) or new (_new) table.
        String tableName = value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
        return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
    }

    /**
     * Parses a gh-ost/pt-osc RENAME TABLE event and returns the original (user-facing) table name
     * if the event represents the completion of an online schema change.
     *
     * @return the original table name if the record is an OSC completion rename, or {@link
     *     Optional#empty()} otherwise.
     */
    public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) {
        if (!RecordUtils.isSchemaChangeEvent(record)) {
            return Optional.empty();
        }
        Struct value = (Struct) record.value();
        String ddl = extractLowerCaseDdl(value);

        // gh-ost emits "rename /* gh-ost */ table ..."; pt-osc emits "RENAME TABLE ..."
        // (lower-cased above).
        if (!ddl.startsWith("rename table") && !ddl.startsWith("rename /* gh-ost */ table")) {
            return Optional.empty();
        }
        LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl);

        String rawTableNames = value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
        List<String> tableNames = Arrays.asList(rawTableNames.split(","));
        // An OSC completion rename always involves exactly two tables.
        if (tableNames.size() != 2) {
            LOG.info("Table name {} is malformed, skip it.", rawTableNames);
            return Optional.empty();
        }

        // The OSC tools rename `tb` to a mangled `_tb_del`/`_tb_old`, so the mangled
        // (longer) name is the rename target and the shorter one is the original table.
        String renamedFromTableName =
                Collections.min(tableNames, Comparator.comparingInt(String::length));
        String renamedToTableName =
                Collections.max(tableNames, Comparator.comparingInt(String::length));

        LOG.info("Determined the shorter TableId {} is the renaming source.", renamedFromTableName);
        LOG.info("Determined the longer TableId {} is the renaming target.", renamedToTableName);

        if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
            LOG.info(
                    "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.",
                    renamedToTableName,
                    renamedFromTableName);
            return Optional.of(renamedFromTableName);
        }

        LOG.info("Renamed to TableId {} does not match any RegEx pattern, skip it.", renamedToTableName);
        return Optional.empty();
    }

    /**
     * Extracts the lower-cased DDL statement embedded in a schema change record's history field.
     *
     * <p>Uses {@link com.fasterxml.jackson.databind.JsonNode#path}-style navigation so an absent
     * {@code ddl} field yields an empty string instead of a {@link NullPointerException} (the
     * previous {@code get(...)} call could NPE on malformed history records).
     *
     * @param value the record's value {@link Struct}, expected to carry {@code
     *     HISTORY_RECORD_FIELD}
     * @return the lower-cased DDL, or {@code ""} when the history record is absent or unparseable
     */
    private static String extractLowerCaseDdl(Struct value) {
        try {
            return MAPPER.readTree(value.getString(HISTORY_RECORD_FIELD))
                    .path(HistoryRecord.Fields.DDL_STATEMENTS)
                    .asText()
                    .toLowerCase();
        } catch (JsonProcessingException e) {
            LOG.warn("Failed to parse schema change event {}", value, e);
            return "";
        }
    }

    /**
     * Peels out a gh-ost/pt-osc mangled {@link TableId} back to the original user-facing one.
     *
     * <p>For example, {@code _customers_gho} → {@code customers}, {@code _orders_new} → {@code
     * orders}. A {@link TableId} that does not match the mangled pattern is returned unchanged.
     */
    public static TableId peelTableId(TableId tableId) {
        Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
        if (matchingResult.matches()) {
            return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
        }
        return tableId;
    }
}
Loading
Loading