@JNSimba
Member

@JNSimba JNSimba commented Jan 23, 2026

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #58898 #59461

Optimize CDC consumption strategy

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (ideally including the specific error message) and how it was fixed.
  2. Which behaviors were modified: what the previous behavior was, what it is now, why it was changed, and what impact the change might have.
  3. What features were added, and why.
  4. Which code was refactored, and why.
  5. Which functions were optimized, and what differs before and after the optimization.

@JNSimba
Member Author

JNSimba commented Jan 23, 2026

run buildall

Copilot AI left a comment

Pull request overview

This PR optimizes the CDC (Change Data Capture) consumption strategy for streaming jobs by refactoring the data polling mechanism and improving heartbeat handling.

Changes:

  • Refactored the SourceReader interface to split readSplitRecords into two methods: prepareAndSubmitSplit (for split preparation) and pollRecords (for data retrieval)
  • Moved polling logic from individual readers to the PipelineCoordinator, introducing heartbeat-based synchronization to determine when to stop polling
  • Reduced the Debezium heartbeat interval from 10 seconds to 3 seconds for faster offset updates and connection issue detection
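The split of `readSplitRecords` into a preparation phase and a polling phase can be sketched as follows. This is a minimal illustration of the two-phase shape described above, not the PR's actual signatures; `SplitSourceReader`, `InMemoryReader`, and `drain` are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;

interface SplitSourceReader<T> {
    // Phase 1: build the split and submit it to the underlying fetch task.
    Object prepareAndSubmitSplit(String splitId) throws Exception;

    // Phase 2: called repeatedly by the coordinator, which decides when to
    // stop (e.g. on heartbeat or timeout) rather than the reader itself.
    Iterator<T> pollRecords(Object splitState) throws InterruptedException;
}

public class TwoPhaseReadDemo {
    // In-memory stand-in: one split whose records arrive in a single poll.
    static class InMemoryReader implements SplitSourceReader<String> {
        private List<String> pending = List.of();

        @Override
        public Object prepareAndSubmitSplit(String splitId) {
            pending = List.of(splitId + "-r1", splitId + "-r2");
            return splitId; // stand-in for the real split-state object
        }

        @Override
        public Iterator<String> pollRecords(Object splitState) {
            Iterator<String> it = pending.iterator();
            pending = List.of(); // queue drained; the next poll is empty
            return it;
        }
    }

    // Coordinator-side loop in miniature: prepare once, then poll.
    public static int drain(SplitSourceReader<String> reader) {
        try {
            Object state = reader.prepareAndSubmitSplit("split-0");
            int count = 0;
            Iterator<String> it = reader.pollRecords(state);
            while (it.hasNext()) {
                it.next();
                count++;
            }
            return count;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(drain(new InMemoryReader())); // prints 2
    }
}
```

The key consequence is that the polling loop (and its stop conditions) now lives in one place, the coordinator, instead of being duplicated in each reader.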

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.

Summary per file:

  • Constants.java: Reduced DEBEZIUM_HEARTBEAT_INTERVAL_MS from 10000 ms to 3000 ms
  • SourceReader.java: Split readSplitRecords into prepareAndSubmitSplit and pollRecords
  • SplitReadResult.java: Removed the recordIterator field, as polling is now handled at the coordinator level
  • JdbcIncrementalSourceReader.java: Implemented the new split preparation/polling pattern and removed the old pollUntilDataAvailable logic
  • MySqlSourceReader.java: Same refactoring as JdbcIncrementalSourceReader, for the MySQL-specific implementation
  • PostgresSourceReader.java: Added heartbeat interval configuration for Postgres CDC
  • PipelineCoordinator.java: Major refactoring: added heartbeat-aware polling loops in fetchRecords and writeRecords; extracted helper methods for cleanup and offset extraction
  • StreamingMultiTblTask.java: Minor log message simplification
Comments suppressed due to low confidence (1)

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:355

  • The closeJobStreamLoad method is not synchronized, which could lead to a race condition with getOrCreateBatchStreamLoad. If closeJobStreamLoad is called while another thread is accessing the batch stream load, the stream load could be removed and closed while being used, potentially causing NullPointerException or data loss. Consider synchronizing this method or using a more fine-grained locking mechanism.
    public void closeJobStreamLoad(Long jobId) {
        DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId);
        if (batchStreamLoad != null) {
            LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId);
            batchStreamLoad.close();
            batchStreamLoad = null;
        }
    }
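A minimal sketch of the synchronization the reviewer suggests, assuming a plain map guarded by the registry's monitor; the names `JobLoadRegistry`, `BatchLoad`, `getOrCreate`, and `closeJob` are hypothetical, not from the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class JobLoadRegistry {
    // Stand-in for DorisBatchStreamLoad; only tracks whether close() ran.
    static class BatchLoad {
        volatile boolean closed = false;
        void close() { closed = true; }
    }

    private final Map<Long, BatchLoad> jobLoads = new HashMap<>();

    // Both paths synchronize on the same monitor, so a load can no longer
    // be removed and closed while another thread is mid-way through
    // creating or fetching it.
    public synchronized BatchLoad getOrCreate(Long jobId) {
        return jobLoads.computeIfAbsent(jobId, k -> new BatchLoad());
    }

    public synchronized boolean closeJob(Long jobId) {
        BatchLoad load = jobLoads.remove(jobId);
        if (load != null) {
            load.close();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        JobLoadRegistry registry = new JobLoadRegistry();
        BatchLoad load = registry.getOrCreate(1L);
        System.out.println(registry.closeJob(1L) && load.closed); // true
        System.out.println(registry.closeJob(1L)); // false: already removed
    }
}
```

Synchronizing the close path only prevents the remove/close race; callers that hold a reference across calls would still need per-load coordination, which is the fine-grained option the comment alludes to.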


Comment on lines 182 to 187
if (this.currentSplit instanceof MySqlSnapshotSplit) {
    this.currentReader = getSnapshotSplitReader(baseReq);
} else if (this.currentSplit instanceof MySqlBinlogSplit) {
    this.currentReader = getBinlogSplitReader(baseReq);
}
this.currentReader.submitSplit(this.currentSplit);
Copilot AI Jan 23, 2026

Potential resource leak: if prepareAndSubmitSplit is called on a SourceReader that already has a currentReader, the old reader will be overwritten without being closed. Since SourceReader instances are reused across multiple requests for the same jobId (as seen in Env.getOrCreateReader), this could happen if finishSplitRecords fails or if there's an exception before cleanup. Consider closing the existing currentReader before creating a new one, or add a check to ensure currentReader is null before proceeding.
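One way to implement the suggested fix, sketched with hypothetical names (`ReaderSlot`, `replaceReader`); only the close-before-replace pattern is the point.

```java
public class ReaderSlot {
    // Stand-in for the real split reader; only tracks whether close() ran.
    static class Reader implements AutoCloseable {
        boolean closed = false;
        @Override
        public void close() { closed = true; }
    }

    private Reader currentReader;

    // Close any previous reader before installing the new one, so a reused
    // per-job SourceReader cannot leak the old fetcher's resources.
    public Reader replaceReader(Reader next) {
        if (currentReader != null) {
            currentReader.close();
        }
        currentReader = next;
        return currentReader;
    }

    public static void main(String[] args) {
        ReaderSlot slot = new ReaderSlot();
        Reader first = slot.replaceReader(new Reader());
        slot.replaceReader(new Reader());
        System.out.println(first.closed); // true: old reader was closed
    }
}
```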

@doris-robot

TPC-H: Total hot run time: 30679 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 6bfc454ee6d045f4fc0697234b62203eeb073a80, data reload: false

------ Round 1 ----------------------------------
q1	17596	4798	4843	4798
q2	2043	321	199	199
q3	10230	1304	710	710
q4	10192	790	295	295
q5	7540	2072	1819	1819
q6	197	176	139	139
q7	857	687	587	587
q8	9249	1284	1081	1081
q9	4905	4542	4495	4495
q10	6784	1637	1290	1290
q11	524	286	281	281
q12	336	372	213	213
q13	17776	3830	3069	3069
q14	236	227	214	214
q15	604	522	516	516
q16	650	637	574	574
q17	640	803	472	472
q18	6687	6480	6214	6214
q19	1221	968	636	636
q20	394	332	225	225
q21	2560	1935	1904	1904
q22	1016	989	948	948
Total cold run time: 102237 ms
Total hot run time: 30679 ms

----- Round 2, with runtime_filter_mode=off -----
q1	4750	4702	4683	4683
q2	324	390	322	322
q3	2119	2639	2265	2265
q4	1312	1720	1323	1323
q5	4041	3945	3986	3945
q6	210	174	130	130
q7	1908	1856	1660	1660
q8	2771	2445	2468	2445
q9	7455	7109	7198	7109
q10	2585	2750	2397	2397
q11	617	480	469	469
q12	736	820	643	643
q13	3656	4088	3392	3392
q14	312	331	286	286
q15	624	509	502	502
q16	627	682	644	644
q17	1126	1264	1368	1264
q18	8275	8033	7678	7678
q19	872	815	819	815
q20	1965	2037	1945	1945
q21	4880	4500	4103	4103
q22	1092	1028	974	974
Total cold run time: 52257 ms
Total hot run time: 48994 ms

@doris-robot

TPC-DS: Total hot run time: 172675 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit 6bfc454ee6d045f4fc0697234b62203eeb073a80, data reload: false

query5	4439	643	510	510
query6	333	232	209	209
query7	4224	459	259	259
query8	355	264	242	242
query9	8700	2858	2820	2820
query10	463	340	278	278
query11	15151	15122	15029	15029
query12	175	119	117	117
query13	1246	466	337	337
query14	5566	3033	2746	2746
query14_1	2684	2639	2651	2639
query15	201	192	175	175
query16	967	507	473	473
query17	973	677	561	561
query18	2440	440	335	335
query19	191	177	156	156
query20	126	119	118	118
query21	224	147	121	121
query22	3919	4198	3789	3789
query23	16140	15956	15347	15347
query23_1	15526	15471	15464	15464
query24	7078	1530	1164	1164
query24_1	1171	1148	1174	1148
query25	545	455	403	403
query26	1238	273	154	154
query27	2764	433	278	278
query28	4574	2146	2129	2129
query29	791	557	455	455
query30	315	248	210	210
query31	778	621	546	546
query32	88	83	76	76
query33	559	372	310	310
query34	899	895	542	542
query35	745	758	699	699
query36	889	923	802	802
query37	132	96	88	88
query38	2729	2673	2675	2673
query39	779	748	741	741
query39_1	715	741	706	706
query40	222	136	122	122
query41	75	67	67	67
query42	98	93	94	93
query43	475	464	421	421
query44	1308	751	739	739
query45	189	190	181	181
query46	831	950	574	574
query47	1438	1513	1376	1376
query48	330	323	248	248
query49	623	451	364	364
query50	677	265	218	218
query51	3763	3741	3728	3728
query52	93	97	82	82
query53	212	225	168	168
query54	298	270	264	264
query55	84	80	77	77
query56	318	311	310	310
query57	1023	1012	894	894
query58	290	277	273	273
query59	1989	2227	2023	2023
query60	352	341	325	325
query61	193	148	147	147
query62	402	371	312	312
query63	189	157	154	154
query64	4982	1159	860	860
query65	3866	3722	3725	3722
query66	1464	429	320	320
query67	15391	15504	15469	15469
query68	2389	1066	734	734
query69	408	309	286	286
query70	1001	959	871	871
query71	299	290	260	260
query72	5251	3119	3188	3119
query73	595	723	325	325
query74	8694	8808	8500	8500
query75	2275	2315	1894	1894
query76	2268	1051	638	638
query77	356	371	308	308
query78	9747	9914	9124	9124
query79	1385	881	586	586
query80	1288	533	447	447
query81	551	258	232	232
query82	1046	152	122	122
query83	360	270	247	247
query84	295	116	97	97
query85	867	481	452	452
query86	410	290	292	290
query87	2833	2820	2736	2736
query88	3512	2570	2522	2522
query89	309	268	236	236
query90	1951	170	163	163
query91	161	158	133	133
query92	77	70	69	69
query93	1148	1039	638	638
query94	642	322	302	302
query95	604	334	373	334
query96	630	506	225	225
query97	2357	2380	2340	2340
query98	213	205	200	200
query99	612	596	504	504
Total cold run time: 245645 ms
Total hot run time: 172675 ms

@doris-robot

ClickBench: Total hot run time: 26.61 s
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/clickbench-tools
ClickBench test result on commit 6bfc454ee6d045f4fc0697234b62203eeb073a80, data reload: false

query1	0.06	0.05	0.05
query2	0.10	0.05	0.04
query3	0.25	0.08	0.09
query4	1.61	0.12	0.11
query5	0.28	0.26	0.27
query6	1.15	0.65	0.65
query7	0.03	0.03	0.03
query8	0.06	0.04	0.04
query9	0.57	0.50	0.50
query10	0.55	0.54	0.56
query11	0.15	0.10	0.10
query12	0.14	0.11	0.11
query13	0.59	0.58	0.58
query14	0.95	0.97	0.92
query15	0.78	0.78	0.79
query16	0.39	0.39	0.39
query17	1.00	1.02	1.07
query18	0.23	0.22	0.21
query19	1.97	1.84	1.89
query20	0.02	0.01	0.01
query21	15.42	0.25	0.14
query22	5.34	0.05	0.04
query23	16.00	0.29	0.10
query24	1.66	0.52	0.25
query25	0.11	0.06	0.08
query26	0.14	0.13	0.14
query27	0.05	0.08	0.04
query28	3.96	1.06	0.88
query29	12.57	3.91	3.12
query30	0.29	0.14	0.11
query31	2.86	0.63	0.39
query32	3.26	0.56	0.45
query33	3.05	3.02	2.97
query34	15.91	5.11	4.40
query35	4.44	4.42	4.50
query36	0.64	0.50	0.49
query37	0.11	0.06	0.06
query38	0.07	0.05	0.04
query39	0.04	0.03	0.03
query40	0.18	0.15	0.13
query41	0.09	0.03	0.03
query42	0.04	0.03	0.03
query43	0.05	0.04	0.04
Total cold run time: 97.16 s
Total hot run time: 26.61 s

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 0.00% (0/1) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 0.00% (0/1) 🎉
Increment coverage report
Complete coverage report

@JNSimba JNSimba requested a review from Copilot January 23, 2026 12:45
@JNSimba
Member Author

JNSimba commented Jan 23, 2026

run buildall

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.



Comment on lines +193 to +215
public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
    Preconditions.checkState(this.currentReader != null, "currentReader is null");
    Preconditions.checkNotNull(splitState, "splitState is null");
    Preconditions.checkState(
            splitState instanceof SourceSplitState,
            "splitState type is invalid " + splitState.getClass());

    // Poll data from Debezium queue
    Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
    if (dataIt == null || !dataIt.hasNext()) {
        return Collections.emptyIterator(); // No data available
    }

    SourceRecords sourceRecords = dataIt.next();
    SplitRecords splitRecords =
            new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
    if (!sourceRecords.getSourceRecordList().isEmpty()) {
        LOG.info("{} Records received.", sourceRecords.getSourceRecordList().size());
    }

    // Return filtered iterator
    return new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
}
Copilot AI Jan 23, 2026

Similar to MySqlSourceReader, if pollRecords is called multiple times on the same split, a new FilteredRecordIterator is created each time while reusing the underlying currentSplit and currentReader. This could lead to state inconsistencies if the iterator is not fully consumed before the next call to pollRecords.

Comment on lines +162 to 169
if (this.currentSplit.isSnapshotSplit()) {
    this.currentReader = getSnapshotSplitReader(baseReq);
} else if (this.currentSplit.isStreamSplit()) {
    this.currentReader = getBinlogSplitReader(baseReq);
} else {
    LOG.info(
            "Continue read records with current split records, splitId: {}",
            currentSplitRecords.getSplitId());
    throw new IllegalStateException(
            "Unknown split type: " + this.currentSplit.getClass().getName());
}
Copilot AI Jan 23, 2026

Similar to MySqlSourceReader, a new reader is created every time prepareAndSubmitSplit is called (lines 163 or 165), but the old currentReader is not closed before being replaced. This could lead to resource leaks. Consider closing the existing currentReader before creating a new one.

Comment on lines +382 to +389
// 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait
// for timeout)
// snapshot split will be written to the debezium queue all at once.
if (isSnapshotSplit && hasData) {
    LOG.info(
            "Snapshot split finished, no more data available. Total elapsed: {} ms",
            elapsedTime);
    return true;
Copilot AI Jan 23, 2026

The shouldStop logic for snapshot splits at lines 385-389 returns true when "isSnapshotSplit && hasData" without checking if there's more data in the iterator. This is based on the assumption that "snapshot split will be written to the debezium queue all at once" (line 384). However, the check happens when "!recordIterator.hasNext()" (line 121), which means we've already exhausted the current batch from the queue. For snapshot splits, this is correct behavior, but the comment at line 384 could be misleading if Debezium actually streams snapshot data in multiple batches. Consider verifying this assumption or adding more defensive checks.

Comment on lines +418 to +423
if (elapsedTime > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
    LOG.warn(
            "Binlog split heartbeat wait timeout after {} ms, force stopping. Total elapsed: {} ms",
            elapsedTime - maxIntervalMillis,
            elapsedTime);
    return true;
Copilot AI Jan 23, 2026

The heartbeat wait protection at line 418 uses "maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3" as the maximum wait time. With the new heartbeat interval of 3000ms (3 seconds), this gives 9 extra seconds beyond the timeout. However, this magic number "3" is not documented. Consider adding a constant or comment explaining why 3 heartbeat intervals are chosen as the maximum wait time for the heartbeat protection.
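A sketch of the suggested named constant. Only the 3000 ms interval value comes from the PR; `MAX_MISSED_HEARTBEATS`, `shouldForceStop`, and the rationale comment are hypothetical illustrations of how the magic number could be documented.

```java
public class HeartbeatTimeout {
    // Value from the PR: heartbeat interval reduced from 10000 ms to 3000 ms.
    static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000;

    // Hypothetical named constant replacing the bare "* 3": tolerate up to
    // three missed heartbeats beyond the poll timeout before force-stopping.
    static final int MAX_MISSED_HEARTBEATS = 3;

    static boolean shouldForceStop(long elapsedMs, long maxIntervalMs) {
        return elapsedMs > maxIntervalMs
                + DEBEZIUM_HEARTBEAT_INTERVAL_MS * MAX_MISSED_HEARTBEATS;
    }

    public static void main(String[] args) {
        // With a 10 s poll timeout, the hard cap is 10 s + 3 * 3 s = 19 s.
        System.out.println(shouldForceStop(19001, 10000)); // true
        System.out.println(shouldForceStop(19000, 10000)); // false
    }
}
```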

Comment on lines +430 to +446
private synchronized DorisBatchStreamLoad getOrCreateBatchStreamLoad(
        WriteRecordRequest writeRecordRequest) {
    DorisBatchStreamLoad batchStreamLoad =
            batchStreamLoadMap.computeIfAbsent(
                    writeRecordRequest.getJobId(),
                    k -> {
                        LOG.info(
                                "Create DorisBatchStreamLoad for jobId={}",
                                writeRecordRequest.getJobId());
                        return new DorisBatchStreamLoad(
                                writeRecordRequest.getJobId(),
                                writeRecordRequest.getTargetDb());
                    });
    batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
    batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
    batchStreamLoad.setToken(writeRecordRequest.getToken());
    return batchStreamLoad;
Copilot AI Jan 23, 2026

The method getOrCreateBatchStreamLoad has been synchronized, which is good for thread safety. However, after creating or retrieving the DorisBatchStreamLoad, the code sets mutable properties (currentTaskId, frontendAddress, token) on lines 443-445. If multiple threads call writeRecords concurrently for the same jobId, these properties could be overwritten by another thread between setting them and using them in the writeRecords method. Consider either: 1) keeping these as parameters passed through rather than set as instance state, or 2) documenting that writeRecords should not be called concurrently for the same jobId.
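A sketch of the reviewer's first option (passing per-task fields through as parameters rather than mutating shared state). The names `PerRequestContext` and `WriteContext` are hypothetical; the field names mirror the ones flagged in the comment.

```java
public class PerRequestContext {
    // Hypothetical immutable carrier for the per-task fields the reviewer
    // flagged (taskId, frontendAddress, token); names are illustrative.
    record WriteContext(long jobId, String taskId, String frontendAddress, String token) {}

    // A write path that takes the context as a parameter cannot have its
    // task id overwritten by a concurrent caller for the same jobId.
    static String describe(WriteContext ctx) {
        return ctx.jobId() + "/" + ctx.taskId();
    }

    public static void main(String[] args) {
        WriteContext ctx = new WriteContext(42L, "t-1", "fe:8030", "token");
        System.out.println(describe(ctx)); // prints 42/t-1
    }
}
```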

Comment on lines +150 to +215
public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);

// If there is an active split being consumed, reuse it directly;
// Otherwise, create a new snapshot/stream split based on offset and start the reader.
SourceSplitBase split = null;
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
if (currentSplitRecords == null) {
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (baseReq.isReload() || currentReader == null) {
LOG.info(
"No current reader or reload {}, create new split reader for job {}",
baseReq.isReload(),
baseReq.getJobId());
// build split
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
// closeBinlogReader();
currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
this.setCurrentSplitRecords(currentSplitRecords);
this.setCurrentSplit(split);
} else if (currentReader instanceof IncrementalSourceStreamFetcher) {
LOG.info("Continue poll records with current binlog reader");
// only for binlog reader
currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
split = this.getCurrentSplit();
} else {
throw new RuntimeException("Should not happen");
}
LOG.info("Job {} prepare and submit split with offset: {}", baseReq.getJobId(), offsetMeta);
// Build split
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
this.currentSplit = splitFlag.f0;
LOG.info("Get a split: {}", this.currentSplit.toString());

// Create reader based on split type
if (this.currentSplit.isSnapshotSplit()) {
this.currentReader = getSnapshotSplitReader(baseReq);
} else if (this.currentSplit.isStreamSplit()) {
this.currentReader = getBinlogSplitReader(baseReq);
} else {
LOG.info(
"Continue read records with current split records, splitId: {}",
currentSplitRecords.getSplitId());
throw new IllegalStateException(
"Unknown split type: " + this.currentSplit.getClass().getName());
}

// build response with iterator
SplitReadResult result = new SplitReadResult();
SourceSplitState currentSplitState = null;
SourceSplitBase currentSplit = this.getCurrentSplit();
if (currentSplit.isSnapshotSplit()) {
currentSplitState = new SnapshotSplitState(currentSplit.asSnapshotSplit());
// Submit split
FetchTask<SourceSplitBase> splitFetchTask =
createFetchTaskFromSplit(baseReq, this.currentSplit);
this.currentReader.submitTask(splitFetchTask);
this.setCurrentFetchTask(splitFetchTask);

// Create split state
SourceSplitState currentSplitState;
if (this.currentSplit.isSnapshotSplit()) {
currentSplitState = new SnapshotSplitState(this.currentSplit.asSnapshotSplit());
} else {
currentSplitState = new StreamSplitState(currentSplit.asStreamSplit());
currentSplitState = new StreamSplitState(this.currentSplit.asStreamSplit());
}

Iterator<SourceRecord> filteredIterator =
new FilteredRecordIterator(currentSplitRecords, currentSplitState);

result.setRecordIterator(filteredIterator);
// Return result without iterator
SplitReadResult result = new SplitReadResult();
result.setSplit(this.currentSplit);
result.setSplitState(currentSplitState);
result.setSplit(split);
return result;
}

@Override
public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
Preconditions.checkState(this.currentReader != null, "currentReader is null");
Preconditions.checkNotNull(splitState, "splitState is null");
Preconditions.checkState(
splitState instanceof SourceSplitState,
"splitState type is invalid " + splitState.getClass());

// Poll data from Debezium queue
Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
return Collections.emptyIterator(); // No data available
}

SourceRecords sourceRecords = dataIt.next();
SplitRecords splitRecords =
new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
if (!sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info("{} Records received.", sourceRecords.getSourceRecordList().size());
}

// Return filtered iterator
return new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
}
Copilot AI Jan 23, 2026

Similar to MySqlSourceReader, the SourceReader instance is shared per jobId, and currentReader/currentSplit are instance variables. Concurrent calls to prepareAndSubmitSplit or pollRecords for the same job could cause race conditions. Consider adding synchronization.

Comment on lines +157 to +163
if (hasReceivedData || timeoutReached) {
    LOG.info(
            "Heartbeat received after {} data records, stopping",
            recordResponse.getRecords().size());
    shouldStop = true;
    break;
}
Copilot AI Jan 23, 2026

In the fetchRecords flow, if hasReceivedData is true (data was received) or timeoutReached is true, the loop stops immediately when a heartbeat is received. However, the condition at line 157 uses OR logic: "if (hasReceivedData || timeoutReached)". This means that even if we have received data but timeout hasn't been reached yet, we'll stop on the first heartbeat. This could result in returning prematurely when more data is still available in the queue. Consider using AND logic: "if (hasReceivedData && timeoutReached)" to only stop when both conditions are met.

sourceReader.pollRecords(readResult.getSplitState());

if (!recordIterator.hasNext()) {
    Thread.sleep(100);
Copilot AI Jan 23, 2026

When an InterruptedException occurs during Thread.sleep(100), the exception is thrown but the thread's interrupted status is not preserved. This violates the standard practice for handling InterruptedException. Consider either catching and re-interrupting the thread, or propagating the InterruptedException properly. For example: catch (InterruptedException e) { Thread.currentThread().interrupt(); throw e; }

Suggested change
Thread.sleep(100);
try {
    Thread.sleep(100);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw e;
}

Comment on lines +720 to +721
nextRecord = element;
return true;
Copilot AI Jan 23, 2026

FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true), but PipelineCoordinator also checks for heartbeat events separately. This creates double processing of heartbeat events. The FilteredRecordIterator should either filter out heartbeat events completely (not return them) or PipelineCoordinator should not do separate heartbeat checking. Consider either:

  1. Removing lines 720-721 so heartbeat events are filtered out in the iterator (like watermark events), OR
  2. Update PipelineCoordinator to not check for heartbeat events since the iterator already handles offset updates
Suggested change
nextRecord = element;
return true;

Comment on lines +597 to +598
nextRecord = element;
return true;
Copilot AI Jan 23, 2026

Similar to MySqlSourceReader, FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true at lines 597-598), but PipelineCoordinator also checks for heartbeat events separately. This creates double processing. The iterator should either filter out heartbeat events completely or PipelineCoordinator should not do separate heartbeat checking.

Suggested change
nextRecord = element;
return true;
// Do not emit heartbeat events as records; only update state.
continue;

@doris-robot

TPC-H: Total hot run time: 31211 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 56dabdea3282b772d63225c7ffc6092bb2e29b2f, data reload: false

------ Round 1 ----------------------------------
q1	17612	4739	4559	4559
q2	2013	338	201	201
q3	10220	1326	760	760
q4	10220	868	315	315
q5	7921	2135	1842	1842
q6	198	179	148	148
q7	875	736	597	597
q8	9328	1427	1136	1136
q9	5012	4796	4645	4645
q10	7000	1709	1330	1330
q11	536	319	290	290
q12	373	398	235	235
q13	17976	3829	3136	3136
q14	241	256	221	221
q15	600	522	521	521
q16	639	630	585	585
q17	657	731	543	543
q18	6627	6350	6344	6344
q19	1328	963	633	633
q20	383	340	233	233
q21	2745	2236	1955	1955
q22	1056	1025	982	982
Total cold run time: 103560 ms
Total hot run time: 31211 ms

----- Round 2, with runtime_filter_mode=off -----
q1	4764	4674	4698	4674
q2	333	402	325	325
q3	2149	2681	2246	2246
q4	1349	1763	1315	1315
q5	4130	3969	3986	3969
q6	220	176	136	136
q7	1910	1905	2154	1905
q8	2643	2403	2553	2403
q9	7153	7193	7122	7122
q10	2581	2689	2382	2382
q11	547	506	493	493
q12	740	847	687	687
q13	3696	4131	3416	3416
q14	301	305	285	285
q15	561	514	507	507
q16	655	700	652	652
q17	1230	1283	1375	1283
q18	8226	7999	7771	7771
q19	866	829	866	829
q20	1949	2091	2004	2004
q21	4905	4645	4331	4331
q22	1099	1051	943	943
Total cold run time: 52007 ms
Total hot run time: 49678 ms

@doris-robot

TPC-DS: Total hot run time: 172749 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit 56dabdea3282b772d63225c7ffc6092bb2e29b2f, data reload: false

query5	4463	641	488	488
query6	341	217	192	192
query7	4215	454	276	276
query8	339	242	225	225
query9	8684	2822	2887	2822
query10	430	310	274	274
query11	15444	15278	14937	14937
query12	186	116	116	116
query13	1250	469	372	372
query14	5834	3023	2859	2859
query14_1	2805	2814	2752	2752
query15	202	192	169	169
query16	977	482	480	480
query17	1059	643	561	561
query18	2410	414	319	319
query19	190	175	152	152
query20	121	112	113	112
query21	214	135	113	113
query22	3886	4035	3846	3846
query23	15967	15452	15211	15211
query23_1	15259	15421	15523	15421
query24	7151	1536	1160	1160
query24_1	1170	1165	1189	1165
query25	508	425	382	382
query26	1237	271	157	157
query27	2761	450	278	278
query28	4595	2164	2132	2132
query29	736	539	437	437
query30	309	252	202	202
query31	790	631	538	538
query32	84	75	71	71
query33	516	356	310	310
query34	919	864	536	536
query35	723	753	675	675
query36	878	898	734	734
query37	136	107	93	93
query38	2752	2739	2649	2649
query39	788	745	718	718
query39_1	708	722	722	722
query40	220	139	121	121
query41	74	101	62	62
query42	94	92	90	90
query43	428	474	400	400
query44	1329	746	747	746
query45	187	189	176	176
query46	827	942	596	596
query47	1376	1458	1389	1389
query48	323	322	253	253
query49	638	435	344	344
query50	684	269	201	201
query51	3793	3749	3759	3749
query52	93	97	84	84
query53	207	220	170	170
query54	282	252	245	245
query55	77	80	77	77
query56	297	298	295	295
query57	1002	971	856	856
query58	269	260	256	256
query59	2106	2128	1935	1935
query60	334	323	310	310
query61	147	143	141	141
query62	367	375	331	331
query63	198	165	164	164
query64	4880	1144	826	826
query65	3836	3769	3791	3769
query66	1468	441	332	332
query67	15553	15577	15494	15494
query68	2479	1061	721	721
query69	427	331	294	294
query70	924	986	933	933
query71	307	294	272	272
query72	5456	3184	3356	3184
query73	592	726	317	317
query74	8728	8780	8588	8588
query75	2352	2334	1979	1979
query76	2244	1049	661	661
query77	350	382	311	311
query78	9793	9847	9150	9150
query79	1966	918	601	601
query80	1499	503	445	445
query81	539	269	240	240
query82	1005	147	118	118
query83	327	258	239	239
query84	252	123	95	95
query85	876	469	400	400
query86	409	297	280	280
query87	2900	2873	2835	2835
query88	3533	2596	2565	2565
query89	307	270	237	237
query90	1984	183	165	165
query91	160	154	130	130
query92	71	72	74	72
query93	1219	1021	645	645
query94	632	324	279	279
query95	573	336	315	315
query96	643	509	240	240
query97	2344	2365	2305	2305
query98	234	197	205	197
query99	608	561	515	515
Total cold run time: 247066 ms
Total hot run time: 172749 ms

@doris-robot

ClickBench: Total hot run time: 27.19 s
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/clickbench-tools
ClickBench test result on commit 56dabdea3282b772d63225c7ffc6092bb2e29b2f, data reload: false

query1	0.05	0.05	0.04
query2	0.10	0.05	0.05
query3	0.26	0.09	0.08
query4	1.61	0.12	0.12
query5	0.28	0.25	0.25
query6	1.14	0.65	0.65
query7	0.02	0.02	0.02
query8	0.05	0.04	0.04
query9	0.57	0.50	0.49
query10	0.55	0.56	0.54
query11	0.14	0.10	0.10
query12	0.14	0.11	0.11
query13	0.61	0.58	0.58
query14	0.96	0.95	0.94
query15	0.80	0.78	0.80
query16	0.40	0.40	0.41
query17	0.99	0.98	1.05
query18	0.23	0.22	0.21
query19	1.86	1.90	1.83
query20	0.02	0.01	0.01
query21	15.43	0.27	0.14
query22	5.04	0.05	0.05
query23	15.93	0.29	0.10
query24	0.92	0.70	0.59
query25	0.11	0.15	0.13
query26	0.14	0.13	0.13
query27	0.08	0.05	0.06
query28	5.03	1.07	0.88
query29	12.53	4.06	3.22
query30	0.28	0.14	0.14
query31	2.82	0.62	0.39
query32	3.23	0.56	0.46
query33	2.96	2.96	3.06
query34	16.12	5.09	4.41
query35	4.47	4.50	4.49
query36	0.66	0.51	0.49
query37	0.11	0.07	0.06
query38	0.07	0.05	0.04
query39	0.04	0.02	0.03
query40	0.16	0.14	0.13
query41	0.08	0.04	0.03
query42	0.04	0.03	0.02
query43	0.05	0.04	0.04
Total cold run time: 97.08 s
Total hot run time: 27.19 s

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 0.00% (0/1) 🎉
Increment coverage report
Complete coverage report
