mysql,sqlmodel: support table route in mysql sink #5006
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

This PR makes the MySQL sink routing-aware: DDL execution and `needSwitchDB` use routed target schema names; DML builders and `RowChange` emit target-quoted table identifiers; the DDL rewriter fills the default schema before routing; `TableInfo` init and dispatcher caching were updated; unit and integration tests validate routed behavior.

Changes: Table Routing for MySQL Sink
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Code Review
This pull request ensures that table routing is correctly applied to MySQL sinks for both DML and DDL operations. It updates the SQL generation logic across several files to use target schema and table names (via GetTargetSchemaName and QuoteTargetString) instead of source names. Additionally, it introduces a new integration test suite and several unit tests to verify that routed tables are correctly handled during replication. I have no feedback to provide as there were no review comments.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/sink/sqlmodel/multi_row_test.go`:
- Around line 44-103: The test TestGenMultiRowSQLUsesRoutedTargetTable is
missing t.Parallel(); add t.Parallel() as the first statement in that function
so it runs concurrently with the other tests in multi_row_test.go, keeping test
behavior unchanged otherwise; locate the TestGenMultiRowSQLUsesRoutedTargetTable
function and insert the call at the top.
In `@tests/integration_tests/table_route/run.sh`:
- Around line 6-84: The script suffers from unquoted variable expansions which
can cause word-splitting/globbing issues; go through the run() function and
quote all shell variable expansions used in commands and args (e.g. "$WORK_DIR",
"$OUT_DIR", "$TEST_NAME", "$CUR", "$DOWN_TIDB_HOST", "$DOWN_TIDB_PORT",
"$UP_TIDB_HOST", "$UP_TIDB_PORT", "$CDC_BINARY", "$SINK_URI", "$KEYSPACE_NAME")
— update calls like rm -rf $WORK_DIR, mkdir -p $WORK_DIR, start_tidb_cluster
--workdir $WORK_DIR,
run_sql/run_sql_file/check_table_exists/check_table_not_exists/run_cdc_server/cdc_cli_changefeed
invocations and any literal path concatenations (e.g. $CUR/conf/changefeed.toml,
$CUR/data/prepare.sql) to use double-quoted expansions so arguments are passed
safely.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 13e6b16a-4f9e-4de3-943d-e295d7a16f94
📒 Files selected for processing (17)
- pkg/sink/mysql/helper.go
- pkg/sink/mysql/mysql_writer_ddl.go
- pkg/sink/mysql/mysql_writer_dml_active_active_test.go
- pkg/sink/mysql/mysql_writer_test.go
- pkg/sink/mysql/sql_builder.go
- pkg/sink/mysql/sql_builder_test.go
- pkg/sink/sqlmodel/multi_row.go
- pkg/sink/sqlmodel/multi_row_test.go
- pkg/sink/sqlmodel/multi_row_v1.go
- pkg/sink/sqlmodel/row_change.go
- pkg/sink/sqlmodel/row_change_test.go
- tests/integration_tests/run_light_it_in_ci.sh
- tests/integration_tests/table_route/README.md
- tests/integration_tests/table_route/conf/changefeed.toml
- tests/integration_tests/table_route/data/prepare.sql
- tests/integration_tests/table_route/data/test.sql
- tests/integration_tests/table_route/run.sh
Force-pushed from 72dddae to 8729a9d (Compare)
♻️ Duplicate comments (1)
tests/integration_tests/table_route/run.sh (1)
6-50: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win: Quote variable expansions in command arguments to avoid word-splitting/globbing bugs.

Unquoted expansions are still present across the run path (source, path concatenations, host/port args, workdir args, trap/check_logs). This can break when values contain spaces or glob chars.

Suggested minimal hardening patch:

```diff
-source $CUR/../_utils/test_prepare
-WORK_DIR=$OUT_DIR/$TEST_NAME
+source "$CUR/../_utils/test_prepare"
+WORK_DIR="$OUT_DIR/$TEST_NAME"
@@
-    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+    rm -rf "$WORK_DIR" && mkdir -p "$WORK_DIR"
@@
-    start_tidb_cluster --workdir $WORK_DIR
+    start_tidb_cluster --workdir "$WORK_DIR"
@@
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --cluster-id "$KEYSPACE_NAME"
+    run_cdc_server --workdir "$WORK_DIR" --binary "$CDC_BINARY" --cluster-id "$KEYSPACE_NAME"
@@
-    run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql_file "$CUR/data/test.sql" "${UP_TIDB_HOST}" "${UP_TIDB_PORT}"
@@
-    check_table_exists target_db.finish_mark_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
-    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 120
+    check_table_exists target_db.finish_mark_routed "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}" 90
+    check_sync_diff "$WORK_DIR" "$CUR/conf/diff_config.toml" 120
@@
-    check_table_not_exists source_db.users ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_table_not_exists source_db.orders ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_table_not_exists target_db.temp_table_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_table_not_exists target_db.multi_rename_a_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_table_not_exists target_db.multi_rename_b_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_table_not_exists target_db.to_be_dropped_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    run_sql "SHOW CREATE VIEW target_db.user_order_view_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_not_exists source_db.users "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
+    check_table_not_exists source_db.orders "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
+    check_table_not_exists target_db.temp_table_routed "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
+    check_table_not_exists target_db.multi_rename_a_routed "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
+    check_table_not_exists target_db.multi_rename_b_routed "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
+    check_table_not_exists target_db.to_be_dropped_routed "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
+    run_sql "SHOW CREATE VIEW target_db.user_order_view_routed" "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
@@
-    check_table_not_exists target_db.transient_view_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_not_exists target_db.transient_view_routed "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}"
@@
-    run_sql "DROP DATABASE source_db" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
-    check_db_not_exists target_db ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    run_sql "DROP DATABASE source_db" "${UP_TIDB_HOST}" "${UP_TIDB_PORT}"
+    check_db_not_exists target_db "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}" 90
@@
-trap 'stop_test $WORK_DIR' EXIT
+trap 'stop_test "$WORK_DIR"' EXIT
@@
-check_logs $WORK_DIR
+check_logs "$WORK_DIR"
```

```bash
#!/bin/bash
# Verify remaining unquoted variable expansions in this script (read-only).
# Expected result after fix: no SC2086 findings for this file.
shellcheck -x tests/integration_tests/table_route/run.sh
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration_tests/table_route/run.sh` around lines 6 - 50, The script leaves many unquoted variable expansions which can cause word-splitting/globbing bugs; update all command arguments and path concatenations to quote variables (e.g., "$CUR", "$OUT_DIR", "$TEST_NAME", "$WORK_DIR", "$CDC_BINARY", "$SINK_TYPE", "$KEYSPACE_NAME", "$DOWN_TIDB_HOST", "$DOWN_TIDB_PORT", "$UP_TIDB_HOST", "$UP_TIDB_PORT", "$SINK_URI") used in run(), the trap/stop_test call, start_tidb_cluster, run_cdc_server, cdc_cli_changefeed create, run_sql_file, run_sql, check_* and check_logs so every invocation uses quoted expansions like "$VAR" to avoid SC2086-style issues.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@tests/integration_tests/table_route/run.sh`:
- Around line 6-50: The script leaves many unquoted variable expansions which
can cause word-splitting/globbing bugs; update all command arguments and path
concatenations to quote variables (e.g., "$CUR", "$OUT_DIR", "$TEST_NAME",
"$WORK_DIR", "$CDC_BINARY", "$SINK_TYPE", "$KEYSPACE_NAME", "$DOWN_TIDB_HOST",
"$DOWN_TIDB_PORT", "$UP_TIDB_HOST", "$UP_TIDB_PORT", "$SINK_URI") used in run(),
the trap/stop_test call, start_tidb_cluster, run_cdc_server, cdc_cli_changefeed
create, run_sql_file, run_sql, check_* and check_logs so every invocation uses
quoted expansions like "$VAR" to avoid SC2086-style issues.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e6191306-fa31-4e36-ab31-2c57e90b2dfe
📒 Files selected for processing (20)
- downstreamadapter/routing/router_supported_ddl_test.go
- pkg/common/event/ddl_event.go
- pkg/common/event/ddl_event_test.go
- pkg/sink/mysql/helper.go
- pkg/sink/mysql/mysql_writer_ddl.go
- pkg/sink/mysql/mysql_writer_dml_active_active_test.go
- pkg/sink/mysql/mysql_writer_test.go
- pkg/sink/mysql/sql_builder.go
- pkg/sink/mysql/sql_builder_test.go
- pkg/sink/sqlmodel/multi_row.go
- pkg/sink/sqlmodel/multi_row_test.go
- pkg/sink/sqlmodel/multi_row_v1.go
- pkg/sink/sqlmodel/row_change.go
- pkg/sink/sqlmodel/row_change_test.go
- tests/integration_tests/run_light_it_in_ci.sh
- tests/integration_tests/table_route/README.md
- tests/integration_tests/table_route/conf/changefeed.toml
- tests/integration_tests/table_route/conf/diff_config.toml
- tests/integration_tests/table_route/data/test.sql
- tests/integration_tests/table_route/run.sh
💤 Files with no reviewable changes (1)
- pkg/common/event/ddl_event_test.go
✅ Files skipped from review due to trivial changes (2)
- tests/integration_tests/table_route/conf/diff_config.toml
- tests/integration_tests/table_route/conf/changefeed.toml
🚧 Files skipped from review as they are similar to previous changes (10)
- pkg/sink/mysql/mysql_writer_ddl.go
- tests/integration_tests/run_light_it_in_ci.sh
- pkg/sink/mysql/helper.go
- tests/integration_tests/table_route/README.md
- pkg/sink/mysql/sql_builder_test.go
- pkg/sink/mysql/mysql_writer_dml_active_active_test.go
- tests/integration_tests/table_route/data/test.sql
- pkg/sink/mysql/mysql_writer_test.go
- pkg/sink/sqlmodel/multi_row_test.go
- pkg/sink/sqlmodel/row_change_test.go
/test all
/test all
/test all
/test all
/retest
/test all
/test all
🧹 Nitpick comments (1)
downstreamadapter/eventcollector/dispatcher_stat.go (1)
463-472: 💤 Low value | Optional: hoist the cached-id lookup for clarity.

Minor readability nit only — the two-step `expectedTableID` computation is fine, but reading "use span id, then maybe override from cache" backwards is slightly awkward. A small refactor could fold the cache into the conditional directly:

♻️ Suggested refactor (no behavior change)

```diff
-	expectedTableID := tableSpan.TableID
-	current := d.tableInfo.Load()
-	if current != nil {
-		expectedTableID = current.(*common.TableInfo).TableName.TableID
-	}
-	if ddl.TableInfo.TableName.TableID != expectedTableID {
-		return
-	}
+	expectedTableID := tableSpan.TableID
+	if cached, ok := d.tableInfo.Load().(*common.TableInfo); ok && cached != nil {
+		// For partition tables, span TableID is physical; cache holds the logical id.
+		expectedTableID = cached.TableName.TableID
+	}
+	if ddl.TableInfo.TableName.TableID != expectedTableID {
+		return
+	}
```

Feel free to skip if you prefer the current form.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/eventcollector/dispatcher_stat.go` around lines 463 - 472, Hoist the cached-table-id lookup for clarity by first loading d.tableInfo.Load() into a local variable (e.g., cached := d.tableInfo.Load()), then set expectedTableID to the cached id when cached != nil (casting to *common.TableInfo and reading TableName.TableID), otherwise fall back to tableSpan.TableID; finally compare ddl.TableInfo.TableName.TableID to expectedTableID as before. Update references to expectedTableID, tableSpan.TableID, d.tableInfo.Load(), and common.TableInfo to reflect this reordered logic without changing behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 463-472: Hoist the cached-table-id lookup for clarity by first
loading d.tableInfo.Load() into a local variable (e.g., cached :=
d.tableInfo.Load()), then set expectedTableID to the cached id when cached !=
nil (casting to *common.TableInfo and reading TableName.TableID), otherwise fall
back to tableSpan.TableID; finally compare ddl.TableInfo.TableName.TableID to
expectedTableID as before. Update references to expectedTableID,
tableSpan.TableID, d.tableInfo.Load(), and common.TableInfo to reflect this
reordered logic without changing behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6b15bd65-7674-4f51-be23-8552570a7506
📒 Files selected for processing (10)
- downstreamadapter/eventcollector/dispatcher_stat.go
- downstreamadapter/eventcollector/dispatcher_stat_test.go
- downstreamadapter/routing/ddl_query_rewriter.go
- downstreamadapter/routing/router_apply_test.go
- downstreamadapter/routing/router_supported_ddl_test.go
- pkg/sink/sqlmodel/multi_row_test.go
- tests/integration_tests/table_route/conf/changefeed.toml
- tests/integration_tests/table_route/conf/diff_config.toml
- tests/integration_tests/table_route/data/test.sql
- tests/integration_tests/table_route/run.sh
🚧 Files skipped from review as they are similar to previous changes (3)
- tests/integration_tests/table_route/conf/diff_config.toml
- pkg/sink/sqlmodel/multi_row_test.go
- tests/integration_tests/table_route/run.sh
/test all
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/common/event/dml_event.go (1)
295-301: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift: Unresolved crash path: partition DDL + routing version mismatch will `log.Panic` in production.

The TODO correctly identifies the scenario: when a partition DDL (e.g. `TRUNCATE PARTITION`, `EXCHANGE PARTITION`) races with the dispatcher's cached routed `TableInfo`, `originVersion != routedVersion` can be legitimately true, and `log.Panic` at line 298 will terminate the process. This PR enables table routing in the MySQL sink; any changefeed that combines routing with partition DDLs on the replicated tables would hit this path. The same version-mismatch guard in the remote-rows path (lines 315–323) has no TODO and would face the same risk.
Before this change ships, the expected behavior for this case needs to be defined — either:
- Accept that the stale local event is compatible with the new schema (e.g., columns didn't change, only partition layout did) and proceed without panicking; or
- Discard the event and signal a re-sync.
Do you want me to draft a fix that differentiates DDL-driven version bumps (where the column schema is still compatible) from genuine schema incompatibilities, so only the latter panics?
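One way the suggested compatibility check could look, as a hedged sketch with stand-in types (the real `TableInfo` column metadata is richer than this, and `areColumnSchemasCompatible` is a hypothetical helper name from the review comment, not an existing ticdc API):

```go
package main

import "fmt"

// columnInfo is a minimal stand-in for the column metadata carried by the
// real TableInfo; the fields here are illustrative assumptions.
type columnInfo struct {
	Name string
	Type string
}

// areColumnSchemasCompatible reports whether two column sets match by name
// and type, ignoring partition layout. A version bump that changed only
// partitioning would pass this check, letting the caller proceed instead of
// panicking; a genuine column change would fail it and could trigger a
// re-sync rather than a process crash.
func areColumnSchemasCompatible(a, b []columnInfo) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i].Name != b[i].Name || a[i].Type != b[i].Type {
			return false
		}
	}
	return true
}

func main() {
	origin := []columnInfo{{"id", "bigint"}, {"name", "varchar"}}
	routed := []columnInfo{{"id", "bigint"}, {"name", "varchar"}}
	fmt.Println(areColumnSchemasCompatible(origin, routed)) // partition-only change: compatible
}
```

The same predicate could then gate both the local-rows and remote-rows version-mismatch guards so the two paths behave consistently.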
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/common/event/dml_event.go` around lines 295 - 301, The log.Panic on a routed TableInfo version mismatch must be replaced with compatibility-aware handling: in the block where originVersion != routedVersion (around the routed table info check in dml_event.go), do not panic unconditionally—compare the origin TableInfo and routed TableInfo column schemas (implement or call a helper like areColumnSchemasCompatible(originTableInfo, routedTableInfo)); if schemas are compatible (e.g., only partition/metadata changed) proceed processing the local row using the routed TableInfo and log a warning; if incompatible, drop the event and surface a sync-needed error (or keep the existing panic only for confirmed incompatible schema changes) so consumers can trigger a re-sync. Also align behavior with the remote-rows path (the guard around lines 315–323) to ensure consistent handling of version mismatches.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@pkg/common/event/dml_event.go`:
- Around line 295-301: The log.Panic on a routed TableInfo version mismatch must
be replaced with compatibility-aware handling: in the block where originVersion
!= routedVersion (around the routed table info check in dml_event.go), do not
panic unconditionally—compare the origin TableInfo and routed TableInfo column
schemas (implement or call a helper like
areColumnSchemasCompatible(originTableInfo, routedTableInfo)); if schemas are
compatible (e.g., only partition/metadata changed) proceed processing the local
row using the routed TableInfo and log a warning; if incompatible, drop the
event and surface a sync-needed error (or keep the existing panic only for
confirmed incompatible schema changes) so consumers can trigger a re-sync. Also
align behavior with the remote-rows path (the guard around lines 315–323) to
ensure consistent handling of version mismatches.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6c0eaaa4-def4-4a8a-8451-be89b07e3c0b
📒 Files selected for processing (10)
- downstreamadapter/eventcollector/dispatcher_stat.go
- pkg/common/event/active_active_test.go
- pkg/common/event/ddl_event.go
- pkg/common/event/ddl_event_test.go
- pkg/common/event/dml_event.go
- pkg/common/event/handshake_event.go
- pkg/common/event/util.go
- pkg/common/table_info.go
- pkg/common/table_info_test.go
- pkg/sink/mysql/sql_builder_test.go
💤 Files with no reviewable changes (5)
- pkg/common/event/ddl_event.go
- pkg/common/event/active_active_test.go
- pkg/common/event/util.go
- pkg/common/event/handshake_event.go
- pkg/common/event/ddl_event_test.go
✅ Files skipped from review due to trivial changes (1)
- pkg/common/table_info_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/sink/mysql/sql_builder_test.go
- downstreamadapter/eventcollector/dispatcher_stat.go
/test all
/test all
/retest
/test all
/retest
/test pull-cdc-kafka-integration-light
/test pull-cdc-kafka-integration-light
/retest
/test all
/test pull-cdc-kafka-integration-light
/retest
/test all
/retest
2 similar comments
/retest
/retest
What problem does this PR solve?
Issue Number: close #4818
This PR makes the MySQL sink honor table route rules when generating and
executing downstream SQL. Before this change, parts of the MySQL sink still used
source schema/table names directly, so a routed changefeed could generate DML or
execute DDL against the source database/table instead of the routed downstream
target.
What is changed and how it works?
Use routed schema names when executing DDL in the MySQL sink.
- `needSwitchDB` checks `DDLEvent.GetTargetSchemaName()`.
- `execDDL` runs `USE <target schema>` when the DDL needs a default database.

Use routed table names when generating MySQL DML SQL.
- DML SQL is generated from the routed target table name stored in `TableInfo`.
- `sqlmodel` single-row and multi-row insert/delete/update builders use `QuoteTargetString()`, so the same row model can generate SQL for routed targets.
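A minimal sketch of both routed-SQL steps described above, with illustrative helper names (`quoteName`, `useStmt`, and `quoteTargetTable` are not the real ticdc APIs; the real builders operate on the routed names carried by `TableInfo`):

```go
package main

import (
	"fmt"
	"strings"
)

// quoteName backtick-quotes one MySQL identifier, escaping embedded backticks.
func quoteName(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// quoteTargetTable renders the fully qualified, quoted routed identifier,
// analogous in spirit to what a QuoteTargetString-style helper produces.
func quoteTargetTable(schema, table string) string {
	return quoteName(schema) + "." + quoteName(table)
}

// useStmt builds the USE statement execDDL would need when a DDL contains
// unqualified table names and therefore requires a default database; the
// routed target schema, not the source schema, must be used here.
func useStmt(targetSchema string) string {
	return "USE " + quoteName(targetSchema)
}

func main() {
	fmt.Println(useStmt("target_db"))
	fmt.Println("INSERT INTO " + quoteTargetTable("target_db", "users_routed") + " (id) VALUES (1)")
}
```

The backtick-escaping in `quoteName` is what keeps routed names with unusual characters from producing invalid or injectable SQL.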
Keep routed metadata usable across DDL and DML paths.
- The DDL rewriter fills in the default schema before routing, so unqualified table names in DDL can be routed correctly.
- `TableInfo` clones initialize their private SQL fields before being used by downstream sinks.
- Dispatcher caching skips unrelated DDL table info, while still advancing the delivered table info version for DDLs sent to that dispatcher.
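To see why the rewriter must fill the default schema before routing, consider this sketch (the function names and the rule map are illustrative assumptions, not the real rewriter): without the fill step, an unqualified `users` would never match a rule keyed on `source_db.users`.

```go
package main

import (
	"fmt"
	"strings"
)

// fillDefaultSchema qualifies an unqualified table reference with the DDL's
// default schema; route rules match on schema.table, so this must run first.
func fillDefaultSchema(table, defaultSchema string) string {
	if strings.Contains(table, ".") {
		return table // already qualified, leave as-is
	}
	return defaultSchema + "." + table
}

// routeTable applies a source->target rule after the schema is filled in;
// with no matching rule, the source name is kept unchanged.
func routeTable(qualified string, rules map[string]string) string {
	if target, ok := rules[qualified]; ok {
		return target
	}
	return qualified
}

func main() {
	rules := map[string]string{"source_db.users": "target_db.users_routed"}
	fmt.Println(routeTable(fillDefaultSchema("users", "source_db"), rules))
	fmt.Println(routeTable(fillDefaultSchema("other_db.t", "source_db"), rules))
}
```

Doing the fill before routing also means the rule table only ever needs fully qualified keys.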
Add a MySQL-only table route integration test.
- Routes `source_db.*` to `target_db.*_routed` and `source_extra_db.*` to `target_extra_db.*_routed`.
- Covers partition DDL, cross-database DDL, and drop database.
- Verifies that when the source databases are dropped upstream, the routed databases disappear downstream.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No compatibility break is expected. Without table route rules, target schema and
table names fall back to the original source names.
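The fallback behavior described above can be sketched as follows (the struct and function names are illustrative assumptions, not the real ticdc API):

```go
package main

import "fmt"

// tableName is a stand-in for routed name metadata: TargetSchema is empty
// when no table route rule applied to this table.
type tableName struct {
	SourceSchema string
	TargetSchema string
}

// targetSchemaName returns the routed schema if a rule matched, otherwise
// the original source schema, so unrouted changefeeds behave exactly as
// before this change.
func targetSchemaName(t tableName) string {
	if t.TargetSchema != "" {
		return t.TargetSchema
	}
	return t.SourceSchema
}

func main() {
	fmt.Println(targetSchemaName(tableName{SourceSchema: "source_db"}))
	fmt.Println(targetSchemaName(tableName{SourceSchema: "source_db", TargetSchema: "target_db"}))
}
```

Because the routed name is resolved once and stored, the DML hot path reads a precomputed string instead of re-applying route rules per row.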
No meaningful performance regression is expected. The MySQL sink uses target
names already carried by routed events/table info instead of applying routing in
the DML hot path.
Do you need to update user documentation, design documentation or monitoring documentation?
No. This PR wires existing table route behavior into the MySQL sink and adds
integration coverage; it does not introduce new user-facing configuration or
monitoring items.
Release note