@@ -306,7 +306,7 @@ public String getTimeoutReason() {
log.warn("Failed to get task timeout reason, response: {}", response);
}
} catch (ExecutionException | InterruptedException ex) {
log.error("Send get task fail reason request failed: ", ex);
log.error("Send get fail reason request failed: ", ex);
}
return "";
}
@@ -22,7 +22,7 @@ public class Constants {
public static final long POLL_SPLIT_RECORDS_TIMEOUTS = 15000L;

// Debezium default properties
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
public static final String DEBEZIUM_MAX_QUEUE_SIZE = "162580";
public static final String DEBEZIUM_MAX_BATCH_SIZE = "40960";
public static final String DEBEZIUM_POLL_INTERVAL_MS = "50";

Large diffs are not rendered by default.

@@ -56,6 +56,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final Lock lock = new ReentrantLock();
private final Condition block = lock.newCondition();
private final Map<String, ReadWriteLock> bufferMapLock = new ConcurrentHashMap<>();
@Setter private String currentTaskId;
@Setter @Getter private String currentTaskId;
private String targetDb;
private long jobId;
@Setter private String token;
Expand Down Expand Up @@ -481,13 +482,14 @@ public void resetTaskId() {
}

/** commit offfset to frontends. */
public void commitOffset(Map<String, String> meta, long scannedRows, long scannedBytes) {
public void commitOffset(
String taskId, Map<String, String> meta, long scannedRows, long scannedBytes) {
try {
String url = String.format(COMMIT_URL_PATTERN, frontendAddress, targetDb);
Map<String, Object> commitParams = new HashMap<>();
commitParams.put("offset", OBJECT_MAPPER.writeValueAsString(meta));
commitParams.put("jobId", jobId);
commitParams.put("taskId", currentTaskId);
commitParams.put("taskId", taskId);
commitParams.put("scannedRows", scannedRows);
commitParams.put("scannedBytes", scannedBytes);
String param = OBJECT_MAPPER.writeValueAsString(commitParams);
@@ -501,8 +503,7 @@ public void commitOffset(Map<String, String> meta, long scannedRows, long scanne
.commit()
.setEntity(new StringEntity(param));

LOG.info(
"commit offset for jobId {} taskId {}, params {}", jobId, currentTaskId, param);
LOG.info("commit offset for jobId {} taskId {}, params {}", jobId, taskId, param);
Throwable resEx = null;
int retry = 0;
while (retry <= RETRY) {
@@ -516,7 +517,7 @@ public void commitOffset(Map<String, String> meta, long scannedRows, long scanne
: "";
LOG.info("commit result {}", responseBody);
if (statusCode == 200) {
LOG.info("commit offset for jobId {} taskId {}", jobId, currentTaskId);
LOG.info("commit offset for jobId {} taskId {}", jobId, taskId);
// A 200 response indicates that the request was successful, and
// information such as offset and statistics may have already been
// updated. Retrying may result in repeated updates.
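A hypothetical call site for the new signature (names here are illustrative): the task id is captured once, via the @Getter added above, and passed through explicitly, so a concurrent setCurrentTaskId()/resetTaskId() cannot change which task the offset is committed for.

// Illustrative only: commit the offset for the task that produced this batch.
void commitForCurrentTask(DorisBatchStreamLoad load, Map<String, String> meta,
                          long scannedRows, long scannedBytes) {
    String taskId = load.getCurrentTaskId(); // snapshot the id up front
    load.commitOffset(taskId, meta, scannedRows, scannedBytes);
}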
@@ -17,7 +17,6 @@

package org.apache.doris.cdcclient.source.reader;

import org.apache.doris.cdcclient.common.Constants;
import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
import org.apache.doris.cdcclient.source.factory.DataSource;
@@ -51,8 +50,6 @@
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.Fetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
@@ -61,6 +58,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}

@Override
public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);

// If there is an active split being consumed, reuse it directly;
// Otherwise, create a new snapshot/stream split based on offset and start the reader.
SourceSplitBase split = null;
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
if (currentSplitRecords == null) {
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (baseReq.isReload() || currentReader == null) {
LOG.info(
"No current reader or reload {}, create new split reader for job {}",
baseReq.isReload(),
baseReq.getJobId());
// build split
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
// closeBinlogReader();
currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
this.setCurrentSplitRecords(currentSplitRecords);
this.setCurrentSplit(split);
} else if (currentReader instanceof IncrementalSourceStreamFetcher) {
LOG.info("Continue poll records with current binlog reader");
// only for binlog reader
currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
split = this.getCurrentSplit();
} else {
throw new RuntimeException("Should not happen");
}
LOG.info("Job {} prepare and submit split with offset: {}", baseReq.getJobId(), offsetMeta);
// Build split
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
this.currentSplit = splitFlag.f0;
LOG.info("Get a split: {}", this.currentSplit.toString());

// Create reader based on split type
if (this.currentSplit.isSnapshotSplit()) {
this.currentReader = getSnapshotSplitReader(baseReq);
} else if (this.currentSplit.isStreamSplit()) {
this.currentReader = getBinlogSplitReader(baseReq);
} else {
LOG.info(
"Continue read records with current split records, splitId: {}",
currentSplitRecords.getSplitId());
throw new IllegalStateException(
"Unknown split type: " + this.currentSplit.getClass().getName());
}
Comment on lines +162 to 169 (Copilot AI, Jan 23, 2026):
Similar to MySqlSourceReader, a new reader is created every time prepareAndSubmitSplit is called (lines 163 or 165), but the old currentReader is not closed before being replaced. This could lead to resource leaks. Consider closing the existing currentReader before creating a new one.
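A minimal sketch of that suggestion (it assumes Fetcher#close(), which the existing closeCurrentReader() helper already relies on): release the previous reader before swapping in a new one.

// Sketch: close the old reader before assigning a new one to avoid leaking
// Debezium threads and connections.
if (this.currentReader != null) {
    try {
        this.currentReader.close();
    } catch (Exception e) {
        LOG.warn("Failed to close previous reader; continuing with a new one", e);
    }
    this.currentReader = null;
}
// ... then create the snapshot/stream reader as in the diff above ...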

// build response with iterator
SplitReadResult result = new SplitReadResult();
SourceSplitState currentSplitState = null;
SourceSplitBase currentSplit = this.getCurrentSplit();
if (currentSplit.isSnapshotSplit()) {
currentSplitState = new SnapshotSplitState(currentSplit.asSnapshotSplit());
// Submit split
FetchTask<SourceSplitBase> splitFetchTask =
createFetchTaskFromSplit(baseReq, this.currentSplit);
this.currentReader.submitTask(splitFetchTask);
this.setCurrentFetchTask(splitFetchTask);

// Create split state
SourceSplitState currentSplitState;
if (this.currentSplit.isSnapshotSplit()) {
currentSplitState = new SnapshotSplitState(this.currentSplit.asSnapshotSplit());
} else {
currentSplitState = new StreamSplitState(currentSplit.asStreamSplit());
currentSplitState = new StreamSplitState(this.currentSplit.asStreamSplit());
}

Iterator<SourceRecord> filteredIterator =
new FilteredRecordIterator(currentSplitRecords, currentSplitState);

result.setRecordIterator(filteredIterator);
// Return result without iterator
SplitReadResult result = new SplitReadResult();
result.setSplit(this.currentSplit);
result.setSplitState(currentSplitState);
result.setSplit(split);
return result;
}

@Override
public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
Preconditions.checkState(this.currentReader != null, "currentReader is null");
Preconditions.checkNotNull(splitState, "splitState is null");
Preconditions.checkState(
splitState instanceof SourceSplitState,
"splitState type is invalid " + splitState.getClass());

// Poll data from Debezium queue
Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
return Collections.emptyIterator(); // No data available
}

SourceRecords sourceRecords = dataIt.next();
SplitRecords splitRecords =
new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
if (!sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info("{} Records received.", sourceRecords.getSourceRecordList().size());
}

// Return filtered iterator
return new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
}
Comment on lines +193 to +215 (Copilot AI, Jan 23, 2026):
Similar to MySqlSourceReader, if pollRecords is called multiple times on the same split, a new FilteredRecordIterator is created each time while reusing the underlying currentSplit and currentReader. This could lead to state inconsistencies if the iterator is not fully consumed before the next call to pollRecords.

Comment on lines +150 to +215 (Copilot AI, Jan 23, 2026):
Similar to MySqlSourceReader, the SourceReader instance is shared per jobId, and currentReader/currentSplit are instance variables. Concurrent calls to prepareAndSubmitSplit or pollRecords for the same job could cause race conditions. Consider adding synchronization.
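One way to act on this, as a sketch only (doPrepareAndSubmitSplit and doPollRecords stand for hypothetical extractions of the method bodies shown above):

private final Object readerLock = new Object();

@Override
public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
    synchronized (readerLock) {
        return doPrepareAndSubmitSplit(baseReq);
    }
}

@Override
public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
    synchronized (readerLock) {
        return doPollRecords(splitState);
    }
}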

protected abstract DataType fromDbzColumn(Column splitColumn);

protected abstract Fetcher<SourceRecords, SourceSplitBase> getSnapshotSplitReader(
@@ -453,91 +460,6 @@ private static void closeChunkSplitterOnly(HybridSplitAssigner<?> splitAssigner)
}
}

private SplitRecords pollSplitRecordsWithSplit(SourceSplitBase split, JobBaseConfig jobConfig)
throws Exception {
Preconditions.checkState(split != null, "split is null");
SourceRecords sourceRecords = null;
String currentSplitId = null;
Fetcher<SourceRecords, SourceSplitBase> currentReader = null;
LOG.info("Get a split: {}", split.splitId());
if (split.isSnapshotSplit()) {
currentReader = getSnapshotSplitReader(jobConfig);
} else if (split.isStreamSplit()) {
currentReader = getBinlogSplitReader(jobConfig);
}
this.setCurrentReader(currentReader);
FetchTask<SourceSplitBase> splitFetchTask = createFetchTaskFromSplit(jobConfig, split);
currentReader.submitTask(splitFetchTask);
currentSplitId = split.splitId();
this.setCurrentFetchTask(splitFetchTask);
// make split record available
sourceRecords =
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
if (currentReader instanceof IncrementalSourceScanFetcher) {
closeCurrentReader();
}
return new SplitRecords(currentSplitId, sourceRecords.iterator());
}

private SplitRecords pollSplitRecordsWithCurrentReader(
Fetcher<SourceRecords, SourceSplitBase> currentReader) throws Exception {
Iterator<SourceRecords> dataIt = null;
if (currentReader instanceof IncrementalSourceStreamFetcher) {
dataIt = currentReader.pollSplitRecords();
return dataIt == null
? null
: new SplitRecords(STREAM_SPLIT_ID, dataIt.next().iterator());
} else {
throw new IllegalStateException("Unsupported reader type.");
}
}

/**
* Split tasks are submitted asynchronously, and data is sent to the Debezium queue. Therefore,
* there will be a time interval between retrieving data; it's necessary to fetch data until the
* queue has data.
*/
private SourceRecords pollUntilDataAvailable(
Fetcher<SourceRecords, SourceSplitBase> reader, long maxWaitTimeMs, long pollIntervalMs)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long elapsedTime = 0;
int attemptCount = 0;
LOG.info("Polling until data available");
Iterator<SourceRecords> lastDataIt = null;
while (elapsedTime < maxWaitTimeMs) {
attemptCount++;
lastDataIt = reader.pollSplitRecords();
if (lastDataIt != null && lastDataIt.hasNext()) {
SourceRecords sourceRecords = lastDataIt.next();
if (sourceRecords != null && !sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info(
"Data available after {} ms ({} attempts). {} Records received.",
elapsedTime,
attemptCount,
sourceRecords.getSourceRecordList().size());
// todo: poll until heartbeat ?
return sourceRecords;
}
}

// No records yet, continue polling
if (elapsedTime + pollIntervalMs < maxWaitTimeMs) {
Thread.sleep(pollIntervalMs);
elapsedTime = System.currentTimeMillis() - startTime;
} else {
// Last attempt before timeout
break;
}
}

LOG.warn(
"Timeout: No data (heartbeat or data change) received after {} ms ({} attempts).",
elapsedTime,
attemptCount);
return new SourceRecords(new ArrayList<>());
}

private void closeCurrentReader() {
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (currentReader != null) {
@@ -672,6 +594,8 @@ public boolean hasNext() {
Offset position = createOffset(element.sourceOffset());
splitState.asStreamSplitState().setStartingOffset(position);
}
nextRecord = element;
return true;
Comment on lines +597 to +598 (Copilot AI, Jan 23, 2026):
Similar to MySqlSourceReader, FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true at lines 597-598), but PipelineCoordinator also checks for heartbeat events separately. This creates double processing. The iterator should either filter out heartbeat events completely or PipelineCoordinator should not do separate heartbeat checking.

Suggested change:
-    nextRecord = element;
-    return true;
+    // Do not emit heartbeat events as records; only update state.
+    continue;
} else if (SourceRecordUtils.isDataChangeRecord(element)) {
if (splitState.isStreamSplitState()) {
Offset position = createOffset(element.sourceOffset());
@@ -28,6 +28,7 @@
import org.apache.kafka.connect.source.SourceRecord;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -41,12 +42,11 @@ public interface SourceReader {
/** Divide the data to be read. For example: split mysql to chunks */
List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest config);

/**
* 1. If the SplitRecords iterator has it, read the iterator directly. 2. If there is a stream
* reader, poll it. 3. If there is none, resubmit split. 4. If reload is true, need to reset
* streamSplitReader and submit split.
*/
SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception;
/** Construct a split and submit a split reading task. */
SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception;

/** Retrieve data from the current split. */
Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException;

/** Extract offset information from snapshot split state. */
Map<String, String> extractSnapshotStateOffset(Object splitState);
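A hypothetical driver loop, to show how the two methods are intended to compose (the reader and request variables are illustrative):

void runOneSplit(SourceReader reader, JobBaseRecordRequest request) throws Exception {
    // Step 1: build the split and submit its fetch task; no records are returned here.
    SplitReadResult prepared = reader.prepareAndSubmitSplit(request);
    Object splitState = prepared.getSplitState();

    // Step 2: poll the current split; an empty iterator just means "no data yet".
    Iterator<SourceRecord> records = reader.pollRecords(splitState);
    while (records.hasNext()) {
        SourceRecord record = records.next();
        // serialize and buffer `record` for stream load ...
    }

    // For snapshot splits, extract the offset to commit back to the frontend.
    Map<String, String> offset = reader.extractSnapshotStateOffset(splitState);
}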
@@ -18,16 +12,12 @@
package org.apache.doris.cdcclient.source.reader;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Iterator;

import lombok.Data;

/** The result of reading a split with iterator. */
Copilot AI (Jan 23, 2026):
The class comment still says "The result of reading a split with iterator" but the recordIterator field has been removed as part of this refactoring. The comment should be updated to reflect that this class now only contains the split and splitState, and that iteration is now handled separately via the pollRecords method.

Suggested change:
-/** The result of reading a split with iterator. */
+/**
+ * Container for a source split and its associated state.
+ * Iteration over records for this split is handled separately (for example via pollRecords).
+ */
@Data
public class SplitReadResult {
private Iterator<SourceRecord> recordIterator;
// MySqlSplitState, SourceSplitState
private Object splitState;
// MySqlSplit SourceSplitBase