[Fix](StreamingJob) Optimize CDC consumption strategy #60181
base: master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Pull request overview
This PR optimizes the CDC (Change Data Capture) consumption strategy for streaming jobs by refactoring the data polling mechanism and improving heartbeat handling.
Changes:
- Refactored the SourceReader interface to split readSplitRecords into two methods: prepareAndSubmitSplit (for split preparation) and pollRecords (for data retrieval)
- Moved polling logic from individual readers to the PipelineCoordinator, introducing heartbeat-based synchronization to determine when to stop polling
- Reduced the Debezium heartbeat interval from 10 seconds to 3 seconds for faster offset updates and connection issue detection (see the sketch below)
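For reference, a minimal sketch of what the heartbeat change amounts to in Constants.java; the constant name and values come from this review, while the type and comment are assumptions:

    // Debezium heartbeat interval (heartbeat.interval.ms), lowered from 10000 ms to 3000 ms so
    // offsets advance and broken connections surface sooner, at the cost of a little more
    // heartbeat traffic.
    public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;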
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| Constants.java | Reduced DEBEZIUM_HEARTBEAT_INTERVAL_MS from 10000ms to 3000ms |
| SourceReader.java | Split readSplitRecords into prepareAndSubmitSplit and pollRecords methods |
| SplitReadResult.java | Removed recordIterator field as polling is now handled at coordinator level |
| JdbcIncrementalSourceReader.java | Implemented new split preparation/polling pattern, removed old pollUntilDataAvailable logic |
| MySqlSourceReader.java | Similar refactoring as JdbcIncrementalSourceReader for MySQL-specific implementation |
| PostgresSourceReader.java | Added heartbeat interval configuration for Postgres CDC |
| PipelineCoordinator.java | Major refactoring: added heartbeat-aware polling loops in fetchRecords and writeRecords, extracted helper methods for cleanup and offset extraction |
| StreamingMultiTblTask.java | Minor log message simplification |
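To make the PipelineCoordinator change easier to picture, here is a schematic sketch of a heartbeat-aware polling loop. It is not the PR's code: the method name, the isHeartbeatEvent and emit helpers, and the parameter list are all hypothetical; only the stopping conditions mirror what the review discusses below.

    // Schematic: poll until a heartbeat confirms a safe stopping point, a snapshot split has
    // drained its queue, or the heartbeat protection window expires.
    void pollUntilSafeStop(SourceReader sourceReader, Object splitState,
            boolean isSnapshotSplit, long maxIntervalMillis) throws Exception {
        long start = System.currentTimeMillis();
        boolean hasReceivedData = false;
        while (true) {
            Iterator<SourceRecord> it = sourceReader.pollRecords(splitState);
            while (it.hasNext()) {
                SourceRecord record = it.next();
                if (isHeartbeatEvent(record)) { // hypothetical helper
                    long elapsed = System.currentTimeMillis() - start;
                    if (hasReceivedData || elapsed > maxIntervalMillis) {
                        return; // offsets are current; stop polling
                    }
                } else {
                    hasReceivedData = true;
                    emit(record); // hypothetical sink call
                }
            }
            if (isSnapshotSplit && hasReceivedData) {
                return; // snapshot data is written to the Debezium queue all at once
            }
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
                return; // no heartbeat inside the protection window; force stop
            }
            Thread.sleep(100); // queue empty; back off briefly before polling again
        }
    }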
Comments suppressed due to low confidence (1)
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:355
- The closeJobStreamLoad method is not synchronized, which could lead to a race condition with getOrCreateBatchStreamLoad. If closeJobStreamLoad is called while another thread is accessing the batch stream load, the stream load could be removed and closed while being used, potentially causing NullPointerException or data loss. Consider synchronizing this method or using a more fine-grained locking mechanism.
public void closeJobStreamLoad(Long jobId) {
DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId);
if (batchStreamLoad != null) {
LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId);
batchStreamLoad.close();
batchStreamLoad = null;
}
}
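One possible shape of the suggested fix, sketched under the assumption that locking on the coordinator instance (the same monitor getOrCreateBatchStreamLoad already uses) is acceptable here; this is not the PR's code:

    // Sketch: make removal and close mutually exclusive with getOrCreateBatchStreamLoad.
    public synchronized void closeJobStreamLoad(Long jobId) {
        DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId);
        if (batchStreamLoad != null) {
            LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId);
            batchStreamLoad.close();
        }
    }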
if (this.currentSplit instanceof MySqlSnapshotSplit) {
    this.currentReader = getSnapshotSplitReader(baseReq);
} else if (this.currentSplit instanceof MySqlBinlogSplit) {
    this.currentReader = getBinlogSplitReader(baseReq);
}
this.currentReader.submitSplit(this.currentSplit);
Copilot AI · Jan 23, 2026
Potential resource leak: if prepareAndSubmitSplit is called on a SourceReader that already has a currentReader, the old reader will be overwritten without being closed. Since SourceReader instances are reused across multiple requests for the same jobId (as seen in Env.getOrCreateReader), this could happen if finishSplitRecords fails or if there's an exception before cleanup. Consider closing the existing currentReader before creating a new one, or add a check to ensure currentReader is null before proceeding.
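A minimal sketch of the suggested guard, placed at the top of prepareAndSubmitSplit; it assumes the reader type exposes a close() method and is otherwise hypothetical:

    // Sketch: release any previously created reader before it is overwritten.
    if (this.currentReader != null) {
        try {
            this.currentReader.close();
        } catch (Exception e) {
            LOG.warn("Failed to close previous reader before creating a new one", e);
        }
        this.currentReader = null;
    }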
TPC-H: Total hot run time: 30679 ms
TPC-DS: Total hot run time: 172675 ms
ClickBench: Total hot run time: 26.61 s
FE UT Coverage Report: Increment line coverage
FE Regression Coverage Report: Increment line coverage
run buildall
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.
public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
    Preconditions.checkState(this.currentReader != null, "currentReader is null");
    Preconditions.checkNotNull(splitState, "splitState is null");
    Preconditions.checkState(
            splitState instanceof SourceSplitState,
            "splitState type is invalid " + splitState.getClass());

    // Poll data from Debezium queue
    Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
    if (dataIt == null || !dataIt.hasNext()) {
        return Collections.emptyIterator(); // No data available
    }

    SourceRecords sourceRecords = dataIt.next();
    SplitRecords splitRecords =
            new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
    if (!sourceRecords.getSourceRecordList().isEmpty()) {
        LOG.info("{} Records received.", sourceRecords.getSourceRecordList().size());
    }

    // Return filtered iterator
    return new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
}
Copilot AI · Jan 23, 2026
Similar to MySqlSourceReader, if pollRecords is called multiple times on the same split, a new FilteredRecordIterator is created each time while reusing the underlying currentSplit and currentReader. This could lead to state inconsistencies if the iterator is not fully consumed before the next call to pollRecords.
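If this turns out to be a real hazard, one defensive option is to hand back the unfinished iterator instead of stacking a new one on the same split. The activeIterator field below is hypothetical and not part of this PR:

    // Sketch: reuse the outstanding iterator for the current split until it is drained.
    private Iterator<SourceRecord> activeIterator;

    public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
        if (activeIterator != null && activeIterator.hasNext()) {
            return activeIterator;
        }
        Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
        if (dataIt == null || !dataIt.hasNext()) {
            return Collections.emptyIterator();
        }
        SplitRecords splitRecords =
                new SplitRecords(this.currentSplit.splitId(), dataIt.next().iterator());
        activeIterator = new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
        return activeIterator;
    }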
if (this.currentSplit.isSnapshotSplit()) {
    this.currentReader = getSnapshotSplitReader(baseReq);
} else if (this.currentSplit.isStreamSplit()) {
    this.currentReader = getBinlogSplitReader(baseReq);
} else {
    LOG.info(
            "Continue read records with current split records, splitId: {}",
            currentSplitRecords.getSplitId());
    throw new IllegalStateException(
            "Unknown split type: " + this.currentSplit.getClass().getName());
}
Copilot AI · Jan 23, 2026
Similar to MySqlSourceReader, a new reader is created every time prepareAndSubmitSplit is called (lines 163 or 165), but the old currentReader is not closed before being replaced. This could lead to resource leaks. Consider closing the existing currentReader before creating a new one.
// 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait
// for timeout)
// snapshot split will be written to the debezium queue all at once.
if (isSnapshotSplit && hasData) {
    LOG.info(
            "Snapshot split finished, no more data available. Total elapsed: {} ms",
            elapsedTime);
    return true;
Copilot AI · Jan 23, 2026
The shouldStop logic for snapshot splits at lines 385-389 returns true when "isSnapshotSplit && hasData" without checking if there's more data in the iterator. This is based on the assumption that "snapshot split will be written to the debezium queue all at once" (line 384). However, the check happens when "!recordIterator.hasNext()" (line 121), which means we've already exhausted the current batch from the queue. For snapshot splits, this is correct behavior, but the comment at line 384 could be misleading if Debezium actually streams snapshot data in multiple batches. Consider verifying this assumption or adding more defensive checks.
if (elapsedTime > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
    LOG.warn(
            "Binlog split heartbeat wait timeout after {} ms, force stopping. Total elapsed: {} ms",
            elapsedTime - maxIntervalMillis,
            elapsedTime);
    return true;
Copilot AI · Jan 23, 2026
The heartbeat wait protection at line 418 uses "maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3" as the maximum wait time. With the new heartbeat interval of 3000ms (3 seconds), this gives 9 extra seconds beyond the timeout. However, this magic number "3" is not documented. Consider adding a constant or comment explaining why 3 heartbeat intervals are chosen as the maximum wait time for the heartbeat protection.
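A small sketch of the naming suggestion; the constant name is hypothetical:

    // Number of heartbeat intervals to wait past maxIntervalMillis before force-stopping a
    // binlog split; a margin of a few intervals tolerates an occasionally delayed heartbeat.
    private static final int HEARTBEAT_WAIT_INTERVALS = 3;

    long maxHeartbeatWaitMillis =
            maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * HEARTBEAT_WAIT_INTERVALS;
    if (elapsedTime > maxHeartbeatWaitMillis) {
        LOG.warn("Binlog split heartbeat wait timeout, force stopping after {} ms", elapsedTime);
        return true;
    }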
private synchronized DorisBatchStreamLoad getOrCreateBatchStreamLoad(
        WriteRecordRequest writeRecordRequest) {
    DorisBatchStreamLoad batchStreamLoad =
            batchStreamLoadMap.computeIfAbsent(
                    writeRecordRequest.getJobId(),
                    k -> {
                        LOG.info(
                                "Create DorisBatchStreamLoad for jobId={}",
                                writeRecordRequest.getJobId());
                        return new DorisBatchStreamLoad(
                                writeRecordRequest.getJobId(),
                                writeRecordRequest.getTargetDb());
                    });
    batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
    batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
    batchStreamLoad.setToken(writeRecordRequest.getToken());
    return batchStreamLoad;
Copilot AI · Jan 23, 2026
The method getOrCreateBatchStreamLoad has been synchronized, which is good for thread safety. However, after creating or retrieving the DorisBatchStreamLoad, the code sets mutable properties (currentTaskId, frontendAddress, token) on lines 443-445. If multiple threads call writeRecords concurrently for the same jobId, these properties could be overwritten by another thread between setting them and using them in the writeRecords method. Consider either: 1) keeping these as parameters passed through rather than set as instance state, or 2) documenting that writeRecords should not be called concurrently for the same jobId.
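A sketch of the first option: carrying the per-request fields with the call instead of mutating shared state. All names below are hypothetical and not the existing DorisBatchStreamLoad API:

    // Sketch: immutable per-request parameters, passed to the write path instead of being set on
    // the shared DorisBatchStreamLoad instance.
    final class StreamLoadRequestContext {
        final Long taskId;
        final String frontendAddress;
        final String token;

        StreamLoadRequestContext(Long taskId, String frontendAddress, String token) {
            this.taskId = taskId;
            this.frontendAddress = frontendAddress;
            this.token = token;
        }
    }

    // A write call would then look like (hypothetical signature):
    // batchStreamLoad.write(records, new StreamLoadRequestContext(taskId, frontendAddress, token));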
public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
    Map<String, Object> offsetMeta = baseReq.getMeta();
    if (offsetMeta == null || offsetMeta.isEmpty()) {
        throw new RuntimeException("miss meta offset");
    }
-   LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);
-
-   // If there is an active split being consumed, reuse it directly;
-   // Otherwise, create a new snapshot/stream split based on offset and start the reader.
-   SourceSplitBase split = null;
-   SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-   if (currentSplitRecords == null) {
-       Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
-       if (baseReq.isReload() || currentReader == null) {
-           LOG.info(
-                   "No current reader or reload {}, create new split reader for job {}",
-                   baseReq.isReload(),
-                   baseReq.getJobId());
-           // build split
-           Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
-           split = splitFlag.f0;
-           // closeBinlogReader();
-           currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
-           this.setCurrentSplitRecords(currentSplitRecords);
-           this.setCurrentSplit(split);
-       } else if (currentReader instanceof IncrementalSourceStreamFetcher) {
-           LOG.info("Continue poll records with current binlog reader");
-           // only for binlog reader
-           currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
-           split = this.getCurrentSplit();
-       } else {
-           throw new RuntimeException("Should not happen");
-       }
+   LOG.info("Job {} prepare and submit split with offset: {}", baseReq.getJobId(), offsetMeta);
+   // Build split
+   Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
+   this.currentSplit = splitFlag.f0;
+   LOG.info("Get a split: {}", this.currentSplit.toString());
+
+   // Create reader based on split type
+   if (this.currentSplit.isSnapshotSplit()) {
+       this.currentReader = getSnapshotSplitReader(baseReq);
+   } else if (this.currentSplit.isStreamSplit()) {
+       this.currentReader = getBinlogSplitReader(baseReq);
    } else {
-       LOG.info(
-               "Continue read records with current split records, splitId: {}",
-               currentSplitRecords.getSplitId());
+       throw new IllegalStateException(
+               "Unknown split type: " + this.currentSplit.getClass().getName());
    }

-   // build response with iterator
-   SplitReadResult result = new SplitReadResult();
-   SourceSplitState currentSplitState = null;
-   SourceSplitBase currentSplit = this.getCurrentSplit();
-   if (currentSplit.isSnapshotSplit()) {
-       currentSplitState = new SnapshotSplitState(currentSplit.asSnapshotSplit());
+   // Submit split
+   FetchTask<SourceSplitBase> splitFetchTask =
+           createFetchTaskFromSplit(baseReq, this.currentSplit);
+   this.currentReader.submitTask(splitFetchTask);
+   this.setCurrentFetchTask(splitFetchTask);
+
+   // Create split state
+   SourceSplitState currentSplitState;
+   if (this.currentSplit.isSnapshotSplit()) {
+       currentSplitState = new SnapshotSplitState(this.currentSplit.asSnapshotSplit());
    } else {
-       currentSplitState = new StreamSplitState(currentSplit.asStreamSplit());
+       currentSplitState = new StreamSplitState(this.currentSplit.asStreamSplit());
    }

-   Iterator<SourceRecord> filteredIterator =
-           new FilteredRecordIterator(currentSplitRecords, currentSplitState);
-
-   result.setRecordIterator(filteredIterator);
+   // Return result without iterator
+   SplitReadResult result = new SplitReadResult();
+   result.setSplit(this.currentSplit);
    result.setSplitState(currentSplitState);
-   result.setSplit(split);
    return result;
}

+@Override
+public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
+   Preconditions.checkState(this.currentReader != null, "currentReader is null");
+   Preconditions.checkNotNull(splitState, "splitState is null");
+   Preconditions.checkState(
+           splitState instanceof SourceSplitState,
+           "splitState type is invalid " + splitState.getClass());
+
+   // Poll data from Debezium queue
+   Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+   if (dataIt == null || !dataIt.hasNext()) {
+       return Collections.emptyIterator(); // No data available
+   }
+
+   SourceRecords sourceRecords = dataIt.next();
+   SplitRecords splitRecords =
+           new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
+   if (!sourceRecords.getSourceRecordList().isEmpty()) {
+       LOG.info("{} Records received.", sourceRecords.getSourceRecordList().size());
+   }
+
+   // Return filtered iterator
+   return new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
+}
Copilot AI · Jan 23, 2026
Similar to MySqlSourceReader, the SourceReader instance is shared per jobId, and currentReader/currentSplit are instance variables. Concurrent calls to prepareAndSubmitSplit or pollRecords for the same job could cause race conditions. Consider adding synchronization.
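A minimal sketch of the simplest form of that suggestion, assuming coarse per-reader locking is acceptable; the method bodies are elided:

    // Sketch: serialize access to the per-job reader state (currentReader / currentSplit).
    public synchronized SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
            throws Exception {
        // ... unchanged body ...
    }

    public synchronized Iterator<SourceRecord> pollRecords(Object splitState)
            throws InterruptedException {
        // ... unchanged body ...
    }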
if (hasReceivedData || timeoutReached) {
    LOG.info(
            "Heartbeat received after {} data records, stopping",
            recordResponse.getRecords().size());
    shouldStop = true;
    break;
}
Copilot AI · Jan 23, 2026
In the fetchRecords flow, if hasReceivedData is true (data was received) or timeoutReached is true, the loop stops immediately when a heartbeat is received. However, the condition at line 157 uses OR logic: "if (hasReceivedData || timeoutReached)". This means that even if we have received data but timeout hasn't been reached yet, we'll stop on the first heartbeat. This could result in returning prematurely when more data is still available in the queue. Consider using AND logic: "if (hasReceivedData && timeoutReached)" to only stop when both conditions are met.
sourceReader.pollRecords(readResult.getSplitState());

if (!recordIterator.hasNext()) {
    Thread.sleep(100);
Copilot AI · Jan 23, 2026
When an InterruptedException occurs during Thread.sleep(100), the exception is thrown but the thread's interrupted status is not preserved. This violates the standard practice for handling InterruptedException. Consider either catching and re-interrupting the thread, or propagating the InterruptedException properly. For example: catch (InterruptedException e) { Thread.currentThread().interrupt(); throw e; }
Suggested change:
-    Thread.sleep(100);
+    try {
+        Thread.sleep(100);
+    } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw e;
+    }
nextRecord = element;
return true;
Copilot AI · Jan 23, 2026
FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true), but PipelineCoordinator also checks for heartbeat events separately. This creates double processing of heartbeat events. The FilteredRecordIterator should either filter out heartbeat events completely (not return them) or PipelineCoordinator should not do separate heartbeat checking. Consider either:
- Removing lines 720-721 so heartbeat events are filtered out in the iterator (like watermark events), OR
- Updating PipelineCoordinator to not check for heartbeat events, since the iterator already handles offset updates
Suggested change:
-    nextRecord = element;
-    return true;
nextRecord = element;
return true;
Copilot AI · Jan 23, 2026
Similar to MySqlSourceReader, FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true at lines 597-598), but PipelineCoordinator also checks for heartbeat events separately. This creates double processing. The iterator should either filter out heartbeat events completely or PipelineCoordinator should not do separate heartbeat checking.
Suggested change:
-    nextRecord = element;
-    return true;
+    // Do not emit heartbeat events as records; only update state.
+    continue;
TPC-H: Total hot run time: 31211 ms
TPC-DS: Total hot run time: 172749 ms
ClickBench: Total hot run time: 27.19 s
FE Regression Coverage Report: Increment line coverage
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #58898 #59461
Optimize CDC consumption strategy
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)