[Flink-38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled #4246

Open
ThorneANN wants to merge 67 commits into apache:master from ThorneANN:FLINK-38911-binlog-added-new-table

Conversation

@ThorneANN
Contributor

This commit implements the binlog-only newly added table capture feature for MySQL CDC DataStream connector, allowing dynamic table discovery without snapshot phase.

Key changes:

- Add new config option 'scan.binlog.newly-added-table.enabled' in MySqlSourceOptions
- Add scanBinlogNewlyAddedTableEnabled field and getter in MySqlSourceConfig
- Implement table pattern conversion from Flink CDC style to Debezium style in MySqlSourceConfigFactory
- Add validation logic to ensure binlog-only mode works only with stream-only startup modes
- Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern
- Add logging in MySqlSnapshotSplitAssigner for binlog-only mode
- Expose scanBinlogNewlyAddedTableEnabled() API in MySqlSourceBuilder

The feature converts table patterns (e.g., "db.table_\.*") to Debezium regex style (e.g., "db\.table_.*")
and enables dynamic table discovery during binlog reading phase without triggering snapshots.
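
For illustration only, the conversion described above could be sketched as follows. This is not the code from the PR: the class name `PatternConversionSketch`, the method `toDebeziumStyle`, and the placeholder character are all hypothetical, and the steps are inferred from the single example in the description plus the "comma to pipe" and "escaped dot to placeholder" steps quoted later in the review.

```java
// A minimal sketch, assuming the conversion is a plain sequence of literal replacements.
final class PatternConversionSketch {

    // Hypothetical marker that is assumed never to appear in a table pattern.
    private static final String PLACEHOLDER = "\u0000";

    static String toDebeziumStyle(String tables) {
        return tables
                .replace(",", "|")            // list separator becomes regex alternation
                .replace("\\.", PLACEHOLDER)  // park Flink CDC style "\." (regex wildcard)
                .replace(".", "\\.")          // db/table delimiter becomes a literal dot
                .replace(PLACEHOLDER, ".");   // restore the wildcard as a plain "."
    }

    public static void main(String[] args) {
        // Prints db\.table_.*
        System.out.println(toDebeziumStyle("db.table_\\.*"));
    }
}
```

Note that the naive comma replacement in this sketch would also rewrite a comma inside a quantifier such as `{1,3}`, so a real implementation would need more care.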

Thorne and others added 3 commits January 29, 2026 20:15
…r DataStream API

This commit implements the binlog-only newly added table capture feature for MySQL CDC DataStream connector, allowing dynamic table discovery without snapshot phase.

Key changes:
- Add new config option 'scan.binlog.newly-added-table.enabled' in MySqlSourceOptions
- Add scanBinlogNewlyAddedTableEnabled field and getter in MySqlSourceConfig
- Implement table pattern conversion from Flink CDC style to Debezium style in MySqlSourceConfigFactory
- Add validation logic to ensure binlog-only mode works only with stream-only startup modes
- Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern
- Add logging in MySqlSnapshotSplitAssigner for binlog-only mode
- Expose scanBinlogNewlyAddedTableEnabled() API in MySqlSourceBuilder

The feature converts table patterns (e.g., "db.table_\.*") to Debezium regex style (e.g., "db\.table_.*")
and enables dynamic table discovery during binlog reading phase without triggering snapshots.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@github-actions github-actions bot added docs Improvements or additions to documentation mysql-cdc-connector labels Jan 30, 2026
@ThorneANN ThorneANN changed the title [Flink 38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled [Flink-38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled Feb 2, 2026
@ThorneANN
Contributor Author

@copilot review

@ThorneANN
Contributor Author

@Copilot review

Contributor

Copilot AI left a comment

Pull request overview

This pull request implements a new feature for the MySQL CDC DataStream connector to support capturing newly added tables in binlog-only mode without triggering a snapshot phase. The feature adds a new configuration option scan.binlog.newly-added-table.enabled that allows tables matching a pattern to be dynamically discovered and captured during the binlog reading phase.

Changes:

  • Added new configuration option scan.binlog.newly-added-table.enabled in MySqlSourceOptions with comprehensive documentation
  • Extended MySqlSource builder API and configuration classes to support the new feature
  • Added table pattern conversion logic from Flink CDC style to Debezium regex style
  • Added integration tests to validate binlog-only table capture functionality
  • Updated English and Chinese documentation with usage examples

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| MySqlSourceOptions.java | Added new experimental config option for binlog-only newly added table capture |
| MySqlSourceConfig.java | Added field and getter for scanBinlogNewlyAddedTableEnabled flag |
| MySqlSourceConfigFactory.java | Added pattern conversion logic and validation for mutually exclusive modes |
| MySqlSourceBuilder.java | Exposed scanBinlogNewlyAddedTableEnabled() API with detailed javadoc |
| MySqlTableSource.java | Threaded scanBinlogNewlyAddedTableEnabled through table source implementation |
| MySqlTableSourceFactory.java | Added config option to optional options and passed through to source |
| MySqlSnapshotSplitAssigner.java | Added early return in captureNewlyAddedTables when binlog-only mode is enabled |
| BinlogOnlyNewlyAddedTableITCase.java | Comprehensive integration tests for the new feature |
| binlog_test.sql | Test database schema documentation with setup comments |
| MySqlTableSourceFactoryTest.java | Updated test cases to include new config parameter |
| mysql-cdc.md (EN/ZH) | Added documentation for the new configuration option with pattern examples |

--
-- IMPORTANT: This SQL file defines the initial schema for reference and documentation.
-- The actual test creates tables dynamically during execution to validate binlog-only capture.
-- The initial_table is crea1ted in @BeforeEach to ensure binlog is active before CDC source starts.

Copilot AI Feb 11, 2026

There is a typo in the comment. "crea1ted" should be "created".

Suggested change
-- The initial_table is crea1ted in @BeforeEach to ensure binlog is active before CDC source starts.
-- The initial_table is created in @BeforeEach to ensure binlog is active before CDC source starts.

throw new IllegalArgumentException(
"Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
+ "'scan.newly-added-table.enabled' as they may cause duplicate data");
}

Copilot AI Feb 11, 2026

The PR description mentions that validation logic should ensure binlog-only mode works only with stream-only startup modes (latest, earliest, specific offset, timestamp). However, this validation is missing. When scanBinlogNewlyAddedTableEnabled is true but the startup mode is not stream-only (e.g., initial or snapshot), the behavior may be incorrect. Add validation to check if startupOptions.isStreamOnly() when scanBinlogNewlyAddedTableEnabled is enabled.

Suggested change
}
}
// Validate: binlog-only newly-added-table scan requires a stream-only startup mode
if (scanBinlogNewlyAddedTableEnabled && !startupOptions.isStreamOnly()) {
throw new IllegalArgumentException(
"'scan.binlog.newly-added-table.enabled' can only be enabled when using a "
+ "stream-only startup mode (latest-offset, earliest-offset, specific-offset, or timestamp).");
}
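
As a self-contained illustration of the two checks discussed in this thread, the sketch below combines the mutual-exclusion check with the suggested stream-only check. The `StartupMode` enum and the `validate` method are hypothetical stand-ins written for this example; they only mirror the `startupOptions.isStreamOnly()` call used in the suggestion and are not the connector's classes.

```java
// A minimal sketch, assuming startup modes can be reduced to a "stream only" flag.
final class BinlogOnlyValidationSketch {

    // Hypothetical stand-in for the connector's startup options.
    enum StartupMode {
        INITIAL(false),
        SNAPSHOT(false),
        LATEST_OFFSET(true),
        EARLIEST_OFFSET(true),
        SPECIFIC_OFFSET(true),
        TIMESTAMP(true);

        final boolean streamOnly;

        StartupMode(boolean streamOnly) {
            this.streamOnly = streamOnly;
        }
    }

    static void validate(
            boolean scanNewlyAddedTableEnabled,
            boolean scanBinlogNewlyAddedTableEnabled,
            StartupMode startupMode) {
        // The two capture modes are mutually exclusive.
        if (scanNewlyAddedTableEnabled && scanBinlogNewlyAddedTableEnabled) {
            throw new IllegalArgumentException(
                    "Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
                            + "'scan.newly-added-table.enabled'");
        }
        // Binlog-only capture skips snapshots, so it needs a stream-only startup mode.
        if (scanBinlogNewlyAddedTableEnabled && !startupMode.streamOnly) {
            throw new IllegalArgumentException(
                    "'scan.binlog.newly-added-table.enabled' requires a stream-only startup mode");
        }
    }

    public static void main(String[] args) {
        validate(false, true, StartupMode.LATEST_OFFSET); // accepted
        try {
            validate(false, true, StartupMode.INITIAL);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```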

Comment on lines 225 to 236
// Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
LOG.info(
"Binlog-only newly added table capture is enabled. "
+ "New tables matching the pattern will be automatically captured "
+ "in binlog phase without snapshot.");
// No action needed here, BinlogSplitReader will handle the auto-capture
return;
}

// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()

Copilot AI Feb 11, 2026

The PR description states "Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern", and the comment here claims "BinlogSplitReader will handle the auto-capture", but there are no changes to BinlogSplitReader.java in this PR. The existing logic in BinlogSplitReader.hasEnterPureBinlogPhase() (line 318-323) only checks isScanNewlyAddedTableEnabled(), not isScanBinlogNewlyAddedTableEnabled(). This means the feature will not actually work as described. BinlogSplitReader needs to be enhanced to check isScanBinlogNewlyAddedTableEnabled() and handle newly added tables accordingly.

Suggested change
// Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
LOG.info(
"Binlog-only newly added table capture is enabled. "
+ "New tables matching the pattern will be automatically captured "
+ "in binlog phase without snapshot.");
// No action needed here, BinlogSplitReader will handle the auto-capture
return;
}
// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()
// Enable newly-added table discovery when either snapshot-based or binlog-only
// newly-added table capture is configured.
final boolean scanNewlyAddedTablesEnabled =
sourceConfig.isScanNewlyAddedTableEnabled()
|| sourceConfig.isScanBinlogNewlyAddedTableEnabled();
if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
LOG.info(
"Binlog-only newly added table capture is enabled. "
+ "New tables matching the pattern will be automatically captured.");
}
// Don't scan newly added table in snapshot-only startup mode.
if (scanNewlyAddedTablesEnabled

LOG.debug("After replacing comma with pipe separator: {}", tables);

// Step 2: Replace escaped dot \. with placeholder
// In Flink CDC, \. means any character in regex, in Debezium it should be .

Copilot AI Feb 11, 2026

The comment is incorrect. In standard regex notation (including Flink CDC), the backslash escapes the dot: \. means a literal dot character, while . (unescaped) means any single character. The comment should say "In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any character)".

Suggested change
// In Flink CDC, \. means any character in regex, in Debezium it should be .
// In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any character).

Comment on lines +114 to +124
testDatabase.getDatabaseName() + ".user_.*",
"user_profiles",
"user_settings",
"user_logs");
}

@Test
void testBinlogOnlyCaptureWithDatabasePattern() throws Exception {
// Test with database.* pattern
testBinlogOnlyCaptureWithPattern(
testDatabase.getDatabaseName() + ".*", "product_inventory", "product_catalog");

Copilot AI Feb 11, 2026

There is an inconsistency between the test patterns and the documented pattern format. The javadoc in MySqlSourceBuilder (lines 231-235) shows patterns like "db\\.*" and "db\\.user_\\.*" (which become "db\.*" and "db\.user_\.*" as string values), but these tests use unescaped patterns like ".user_.*" and ".*". The pattern conversion function in MySqlSourceConfigFactory.convertToDebeziumStyle() expects escaped patterns (with backslash-dot), so these test patterns may not work correctly. Either the tests should use the documented format (e.g., "\\.user_\\.*"), or the conversion function needs to be updated to handle both escaped and unescaped patterns.
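
To make the concern concrete, the sketch below feeds an unescaped and an escaped pattern through a hypothetical conversion and checks the result against a table id with `java.util.regex.Pattern`. The `toDebeziumStyle` method here is an assumption written for this example (literal replacements inferred from the PR description), not the actual `convertToDebeziumStyle()` implementation, and `mydb` is a made-up database name.

```java
import java.util.regex.Pattern;

// A minimal sketch, assuming the conversion treats every bare "." as a literal dot.
final class UnescapedPatternSketch {

    private static final String PLACEHOLDER = "\u0000"; // hypothetical marker

    // Hypothetical conversion: "\." becomes ".", and a bare "." becomes "\.".
    static String toDebeziumStyle(String tables) {
        return tables
                .replace(",", "|")
                .replace("\\.", PLACEHOLDER)
                .replace(".", "\\.")
                .replace(PLACEHOLDER, ".");
    }

    static boolean matchesAfterConversion(String flinkCdcPattern, String tableId) {
        return Pattern.matches(toDebeziumStyle(flinkCdcPattern), tableId);
    }

    public static void main(String[] args) {
        // Unescaped test-style pattern converts to mydb\.user_\.* (literal dots only).
        System.out.println(matchesAfterConversion("mydb.user_.*", "mydb.user_logs"));   // false
        // Escaped documented-style pattern converts to mydb\.user_.* (wildcard suffix).
        System.out.println(matchesAfterConversion("mydb.user_\\.*", "mydb.user_logs")); // true
    }
}
```

Under this assumed conversion, the unescaped form silently stops matching the intended tables, which is the failure mode the comment warns about.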

@ThorneANN
Contributor Author

Please help retrigger CI, @lvyanquan.

@ThorneANN
Contributor Author

Could you help me retrigger CI once, @lvyanquan?

Comment on lines +225 to +234
// Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
LOG.info(
"Binlog-only newly added table capture is enabled. "
+ "New tables matching the pattern will be automatically captured "
+ "in binlog phase without snapshot.");
// No action needed here, BinlogSplitReader will handle the auto-capture
return;
}

Member

Seems this check is unnecessary since sourceConfig.isScanNewlyAddedTableEnabled and sourceConfig.isScanBinlogNewlyAddedTableEnabled will not be enabled simultaneously?

Contributor Author

yes

* @param tables Flink CDC style table pattern
* @return Debezium style table pattern
*/
private static String convertToDebeziumStyle(String tables) {
Member

Avoid rewriting TableIdRouter#convertTableListToRegExpPattern?

Contributor Author

Maybe that would be better.

+ "'db\\.*' (all tables in database 'db'), "
+ "'db\\.user_\\.*' (tables like 'user_orders', 'user_profiles'), "
+ "'db\\.order_[0-9]+' (tables like 'order_1', 'order_2'), "
+ "'db1\\.*,db2\\.user_\\.*' (all tables in 'db1' and 'user_*' tables in 'db2').");
Member

The example seems incorrect, should be db.\\.*, db.user_\\.* ...

Contributor Author

You are right, I forgot it. I will fix it soon.

Comment on lines +425 to +428
if (scanBinlogNewlyAddedTableEnabled) {
String originalPattern = String.join(",", tableList);
String debeziumPattern = convertToDebeziumStyle(originalPattern);
props.setProperty("table.include.list", debeziumPattern);
Member

@yuxiqian yuxiqian Mar 19, 2026

IIUC, the MySQL DataStream connector uses standard RegEx (. is the wildcard matcher and \. is the dot character). But after your modification, with binlog newly added table capturing enabled, . becomes the dot character and db/table delimiter while \. becomes the meta character, which is strange. Keeping the pattern behavior unchanged in binlog capturing mode would make more sense.
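
The standard RegEx semantics referred to here can be checked with plain `java.util.regex`. The sketch below is only an illustration; the class name and the table ids `db.orders` and `dbXorders` are made up for the example.

```java
import java.util.regex.Pattern;

// A minimal sketch of standard regex dot semantics: "." is a wildcard, "\." is a literal dot.
final class StandardRegexDotSketch {

    static boolean matches(String regex, String tableId) {
        return Pattern.matches(regex, tableId);
    }

    public static void main(String[] args) {
        // Unescaped dot matches any single character, including a real dot.
        System.out.println(matches("db.orders", "db.orders"));   // true
        System.out.println(matches("db.orders", "dbXorders"));   // true
        // Escaped dot matches only the dot character itself.
        System.out.println(matches("db\\.orders", "db.orders")); // true
        System.out.println(matches("db\\.orders", "dbXorders")); // false
    }
}
```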

Hisoka-X and others added 16 commits March 19, 2026 15:16
…partition and bucket. (apache#4298)

Signed-off-by: Pei Yu <125331682@qq.com>
…pache#4278)

When a table is excluded from configuration after a restart from savepoint,
the MySQL CDC source could get stuck in the INITIAL_ASSIGNING state. This
happened because table exclusion cleanup was only performed when
isAssigningFinished() was true, but the assigner couldn't finish because
excluded table splits were never reported as finished.

The fix separates two concerns in captureNewlyAddedTables():
- Adding new tables: should only happen when isAssigningFinished()
- Removing excluded tables: must happen regardless of assigner status

Added integration test TableExclusionDuringSnapshotIT that reproduces the
issue by using a blocking hook to take a savepoint during INITIAL_ASSIGNING
phase, then restarting with a table excluded from configuration.
…is not in the correct directory and should have a private instance (apache#4306)

Co-authored-by: Thorne <syyfffy@163.com>
…r DataStream API
@ThorneANN ThorneANN closed this Mar 19, 2026
Thorne and others added 10 commits March 19, 2026 15:31
…r DataStream API
…r DataStream API
…le' into FLINK-38911-binlog-added-new-table
@ThorneANN ThorneANN reopened this Mar 19, 2026
@yuxiqian
Member

Kind reminder: you may run git rebase master to keep the commit history clean.

@ThorneANN
Contributor Author

Kind reminder: you can run git rebase master to keep the commit history clean.

Yes, I have done it.
