Skip to content

Conversation

@fightBoxing
Copy link

This PR adds CDC support for Flink Iceberg source, allowing users to read changelog data with proper RowKind (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER).

Key changes:

  • Add StreamingReadMode enum (APPEND_ONLY, CHANGELOG)
  • Add ChangelogDataIterator for iterating changelog scan tasks
  • Add RowDataChangelogScanTaskReader for reading changelog data
  • Add ChangelogScanSplit for CDC split handling
  • Add ChangelogRowDataReaderFunction for reader function support
  • Modify FlinkReadOptions/FlinkReadConf to support streaming-read-mode config
  • Modify ScanContext to support changelog scan mode
  • Modify FlinkSplitPlanner to plan changelog scan tasks
  • Modify ContinuousSplitPlannerImpl to support CDC mode
  • Modify IcebergSource to support streamingReadMode builder method
  • Modify IcebergTableSource to support CDC ChangelogMode
  • Add integration tests for CDC streaming read

Usage:

  • Java API: IcebergSource.forRowData().streamingReadMode(StreamingReadMode.CHANGELOG)
  • SQL: SELECT * FROM table /*+ OPTIONS('streaming-read-mode' = 'CHANGELOG') */

Supported Flink versions: v1.20, v2.0, v2.1

This PR adds CDC support for Flink Iceberg source, allowing users to read
changelog data with proper RowKind (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER).

Key changes:
- Add StreamingReadMode enum (APPEND_ONLY, CHANGELOG)
- Add ChangelogDataIterator for iterating changelog scan tasks
- Add RowDataChangelogScanTaskReader for reading changelog data
- Add ChangelogScanSplit for CDC split handling
- Add ChangelogRowDataReaderFunction for reader function support
- Modify FlinkReadOptions/FlinkReadConf to support streaming-read-mode config
- Modify ScanContext to support changelog scan mode
- Modify FlinkSplitPlanner to plan changelog scan tasks
- Modify ContinuousSplitPlannerImpl to support CDC mode
- Modify IcebergSource to support streamingReadMode builder method
- Modify IcebergTableSource to support CDC ChangelogMode
- Add integration tests for CDC streaming read

Usage:
- Java API: IcebergSource.forRowData().streamingReadMode(StreamingReadMode.CHANGELOG)
- SQL: SELECT * FROM table /*+ OPTIONS('streaming-read-mode' = 'CHANGELOG') */

Supported Flink versions: v1.20, v2.0, v2.1
@github-actions github-actions bot added the flink label Feb 10, 2026
rockyyin added 2 commits February 10, 2026 17:09
- Fix line length violations in FlinkSplitPlanner, IcebergSource, RowDataChangelogScanTaskReader
- Remove unused import FlinkReadConf in IcebergTableSource
- Fix indentation in ContinuousSplitPlannerImpl
- Remove trailing blank lines in ChangelogDataIterator
- Fix test formatting in TestIcebergSourceCdcStreaming
- Rewrite ChangelogRowDataReaderFunction to implement ReaderFunction<RowData>
  directly instead of extending DataIteratorReaderFunction, fixing:
  - batcher() method not accessible (private in parent class)
  - ChangelogDataIterator not compatible with DataIterator type hierarchy
- Add custom ChangelogBatchIterator using Pool and ArrayBatchRecords for
  efficient batch processing of changelog records
- Delegate normal (non-changelog) splits to RowDataReaderFunction
- Fix v2.0 API incompatibility: use correct Flink 2.0 imports
  (org.apache.flink.table.legacy.api.TableSchema)
- Fix v2.0 IcebergTableSource: use correct applyProjection(int[][], DataType)
  signature matching Flink 2.0 SupportsProjectionPushDown interface
- Add CDC getChangelogMode support to v2.0 and v2.1 IcebergTableSource
- Fix PreferUncheckedIoException: use UncheckedIOException instead of
  RuntimeException when wrapping IOException in ContinuousSplitPlannerImpl
- Remove unused 'limit' field from ChangelogRowDataReaderFunction
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant