[Fix](StreamingJob) Optimize CDC consumption strategy #60181

base: master
Changes to the incremental source reader in `org.apache.doris.cdcclient.source.reader`. The import changes track the refactor below: the fetcher-type checks disappear, so the `IncrementalSource*Fetcher` imports go with them; `Constants` is dropped along with the removed polling helper; and `Collections` arrives for the empty-iterator fast path:

```diff
@@ -17,7 +17,6 @@
 package org.apache.doris.cdcclient.source.reader;

-import org.apache.doris.cdcclient.common.Constants;
 import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
 import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
 import org.apache.doris.cdcclient.source.factory.DataSource;
@@ -51,8 +50,6 @@
 import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
 import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
 import org.apache.flink.cdc.connectors.base.source.reader.external.Fetcher;
-import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
-import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
 import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
@@ -61,6 +58,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
```
|
The single blocking `readSplitRecords` call is replaced by `prepareAndSubmitSplit`: it builds the split from the request offset, creates a snapshot or binlog reader to match the split type, submits the fetch task, and returns the split plus its state without polling any records:

```diff
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
     }

     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);
-
-        // If there is an active split being consumed, reuse it directly;
-        // Otherwise, create a new snapshot/stream split based on offset and start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
-        } else {
-            LOG.info(
-                    "Continue read records with current split records, splitId: {}",
-                    currentSplitRecords.getSplitId());
-        }
-
-        // build response with iterator
-        SplitReadResult result = new SplitReadResult();
-        SourceSplitState currentSplitState = null;
-        SourceSplitBase currentSplit = this.getCurrentSplit();
-        if (currentSplit.isSnapshotSplit()) {
-            currentSplitState = new SnapshotSplitState(currentSplit.asSnapshotSplit());
-        } else {
-            currentSplitState = new StreamSplitState(currentSplit.asStreamSplit());
-        }
-
-        Iterator<SourceRecord> filteredIterator =
-                new FilteredRecordIterator(currentSplitRecords, currentSplitState);
-
-        result.setRecordIterator(filteredIterator);
-        result.setSplitState(currentSplitState);
-        result.setSplit(split);
+        LOG.info("Job {} prepare and submit split with offset: {}", baseReq.getJobId(), offsetMeta);
+        // Build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+
+        // Create reader based on split type
+        if (this.currentSplit.isSnapshotSplit()) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit.isStreamSplit()) {
+            this.currentReader = getBinlogSplitReader(baseReq);
+        } else {
+            throw new IllegalStateException(
+                    "Unknown split type: " + this.currentSplit.getClass().getName());
+        }
+
+        // Submit split
+        FetchTask<SourceSplitBase> splitFetchTask =
+                createFetchTaskFromSplit(baseReq, this.currentSplit);
+        this.currentReader.submitTask(splitFetchTask);
+        this.setCurrentFetchTask(splitFetchTask);
+
+        // Create split state
+        SourceSplitState currentSplitState;
+        if (this.currentSplit.isSnapshotSplit()) {
+            currentSplitState = new SnapshotSplitState(this.currentSplit.asSnapshotSplit());
+        } else {
+            currentSplitState = new StreamSplitState(this.currentSplit.asStreamSplit());
+        }
+
+        // Return result without iterator
+        SplitReadResult result = new SplitReadResult();
+        result.setSplit(this.currentSplit);
+        result.setSplitState(currentSplitState);
         return result;
     }
```
Record consumption moves into a new `pollRecords` method, which drains whatever the Debezium queue currently holds and returns a filtered iterator (possibly empty) instead of blocking:

```diff
+    @Override
+    public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
+        Preconditions.checkState(this.currentReader != null, "currentReader is null");
+        Preconditions.checkNotNull(splitState, "splitState is null");
+        Preconditions.checkState(
+                splitState instanceof SourceSplitState,
+                "splitState type is invalid " + splitState.getClass());
+
+        // Poll data from Debezium queue
+        Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+        if (dataIt == null || !dataIt.hasNext()) {
+            return Collections.emptyIterator(); // No data available
+        }
+
+        SourceRecords sourceRecords = dataIt.next();
+        SplitRecords splitRecords =
+                new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
+        if (!sourceRecords.getSourceRecordList().isEmpty()) {
+            LOG.info("{} Records received.", sourceRecords.getSourceRecordList().size());
+        }
+
+        // Return filtered iterator
+        return new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
+    }
```
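Taken together, the two methods split the old blocking call into a prepare phase and a repeatable poll phase. A minimal caller-side sketch of the intended flow, assuming a `reader` implementing the methods above; the loop control and the `process` handler are hypothetical, not part of this PR:

```java
// Hypothetical driver loop for the new two-phase API (a sketch, not PR code).
// 1) prepareAndSubmitSplit builds the split, starts the matching fetcher,
//    and submits the fetch task asynchronously.
SplitReadResult result = reader.prepareAndSubmitSplit(baseReq);
Object splitState = result.getSplitState(); // lombok-generated getter assumed

// 2) pollRecords is then invoked repeatedly; each call drains whatever the
//    Debezium queue currently holds and may return an empty iterator.
while (running) { // 'running': assumed external stop condition
    Iterator<SourceRecord> records = reader.pollRecords(splitState);
    while (records.hasNext()) {
        process(records.next()); // downstream handling, assumed
    }
}
```

This moves the wait-for-data loop out of the reader: an empty poll no longer ties up the request thread, and the caller decides how often to retry.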
Unchanged context after the new method (the second declaration is truncated in the diff view):

```java
    protected abstract DataType fromDbzColumn(Column splitColumn);

    protected abstract Fetcher<SourceRecords, SourceSplitBase> getSnapshotSplitReader(
```

With submission and polling now separated, the blocking helpers become dead code and are removed:

```diff
@@ -453,91 +460,6 @@ private static void closeChunkSplitterOnly(HybridSplitAssigner<?> splitAssigner)
         }
     }

-    private SplitRecords pollSplitRecordsWithSplit(SourceSplitBase split, JobBaseConfig jobConfig)
-            throws Exception {
-        Preconditions.checkState(split != null, "split is null");
-        SourceRecords sourceRecords = null;
-        String currentSplitId = null;
-        Fetcher<SourceRecords, SourceSplitBase> currentReader = null;
-        LOG.info("Get a split: {}", split.splitId());
-        if (split.isSnapshotSplit()) {
-            currentReader = getSnapshotSplitReader(jobConfig);
-        } else if (split.isStreamSplit()) {
-            currentReader = getBinlogSplitReader(jobConfig);
-        }
-        this.setCurrentReader(currentReader);
-        FetchTask<SourceSplitBase> splitFetchTask = createFetchTaskFromSplit(jobConfig, split);
-        currentReader.submitTask(splitFetchTask);
-        currentSplitId = split.splitId();
-        this.setCurrentFetchTask(splitFetchTask);
-        // make split record available
-        sourceRecords =
-                pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
-        if (currentReader instanceof IncrementalSourceScanFetcher) {
-            closeCurrentReader();
-        }
-        return new SplitRecords(currentSplitId, sourceRecords.iterator());
-    }
-
-    private SplitRecords pollSplitRecordsWithCurrentReader(
-            Fetcher<SourceRecords, SourceSplitBase> currentReader) throws Exception {
-        Iterator<SourceRecords> dataIt = null;
-        if (currentReader instanceof IncrementalSourceStreamFetcher) {
-            dataIt = currentReader.pollSplitRecords();
-            return dataIt == null
-                    ? null
-                    : new SplitRecords(STREAM_SPLIT_ID, dataIt.next().iterator());
-        } else {
-            throw new IllegalStateException("Unsupported reader type.");
-        }
-    }
-
-    /**
-     * Split tasks are submitted asynchronously, and data is sent to the Debezium queue. Therefore,
-     * there will be a time interval between retrieving data; it's necessary to fetch data until the
-     * queue has data.
-     */
-    private SourceRecords pollUntilDataAvailable(
-            Fetcher<SourceRecords, SourceSplitBase> reader, long maxWaitTimeMs, long pollIntervalMs)
-            throws InterruptedException {
-        long startTime = System.currentTimeMillis();
-        long elapsedTime = 0;
-        int attemptCount = 0;
-        LOG.info("Polling until data available");
-        Iterator<SourceRecords> lastDataIt = null;
-        while (elapsedTime < maxWaitTimeMs) {
-            attemptCount++;
-            lastDataIt = reader.pollSplitRecords();
-            if (lastDataIt != null && lastDataIt.hasNext()) {
-                SourceRecords sourceRecords = lastDataIt.next();
-                if (sourceRecords != null && !sourceRecords.getSourceRecordList().isEmpty()) {
-                    LOG.info(
-                            "Data available after {} ms ({} attempts). {} Records received.",
-                            elapsedTime,
-                            attemptCount,
-                            sourceRecords.getSourceRecordList().size());
-                    // todo: poll until heartbeat ?
-                    return sourceRecords;
-                }
-            }
-
-            // No records yet, continue polling
-            if (elapsedTime + pollIntervalMs < maxWaitTimeMs) {
-                Thread.sleep(pollIntervalMs);
-                elapsedTime = System.currentTimeMillis() - startTime;
-            } else {
-                // Last attempt before timeout
-                break;
-            }
-        }
-
-        LOG.warn(
-                "Timeout: No data (heartbeat or data change) received after {} ms ({} attempts).",
-                elapsedTime,
-                attemptCount);
-        return new SourceRecords(new ArrayList<>());
-    }
-
     private void closeCurrentReader() {
         Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
         if (currentReader != null) {
```
In `FilteredRecordIterator.hasNext()`, the added lines return the element to the caller after updating the stream offset:

```diff
@@ -672,6 +594,8 @@ public boolean hasNext() {
                     Offset position = createOffset(element.sourceOffset());
                     splitState.asStreamSplitState().setStartingOffset(position);
                 }
+                nextRecord = element;
+                return true;
```

A review comment on lines +597 to +598 suggests swallowing heartbeat events instead of emitting them:

```diff
-                nextRecord = element;
-                return true;
+                // Do not emit heartbeat events as records; only update state.
+                continue;
```
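Applied in context, the suggestion would make the loop skip heartbeats after updating the stream offset. A sketch of the resulting `hasNext()` shape, assuming the iterator fields visible in this diff (`splitState`, `nextRecord`); the `recordIterator` and `isHeartbeatEvent` names are illustrative stand-ins for whatever the class actually uses:

```java
// Sketch only: control flow with the reviewer's suggestion applied.
public boolean hasNext() {
    while (recordIterator.hasNext()) { // underlying per-split record iterator (assumed name)
        SourceRecord element = recordIterator.next();
        if (isHeartbeatEvent(element)) { // heartbeat check, illustrative name
            if (splitState.isStreamSplitState()) {
                // Heartbeats still advance the stream offset so a restart
                // resumes from the latest position...
                Offset position = createOffset(element.sourceOffset());
                splitState.asStreamSplitState().setStartingOffset(position);
            }
            // ...but are not handed downstream as records.
            continue;
        }
        nextRecord = element; // a real change event: expose it via next()
        return true;
    }
    return false;
}
```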
Changes to `SplitReadResult`, which no longer carries a record iterator:

```diff
@@ -18,16 +18,12 @@
 package org.apache.doris.cdcclient.source.reader;

-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.kafka.connect.source.SourceRecord;
-
-import java.util.Iterator;

 import lombok.Data;

 /** The result of reading a split with iterator. */
```

A review suggestion proposes updating the now-stale class Javadoc:

```diff
-/** The result of reading a split with iterator. */
+/**
+ * Container for a source split and its associated state.
+ * Iteration over records for this split is handled separately (for example via pollRecords).
+ */
```
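Under the new flow the class reduces to a plain holder. A sketch of what the trimmed class plausibly looks like; the field names are inferred from the `setSplit`/`setSplitState` calls in the reader, and the flink-cdc import paths are assumptions:

```java
package org.apache.doris.cdcclient.source.reader;

// Assumed flink-cdc package paths for the split/state types used by the reader.
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;

import lombok.Data;

/**
 * Container for a source split and its associated state.
 * Iteration over records for this split is handled separately (for example via pollRecords).
 */
@Data
public class SplitReadResult {
    private SourceSplitBase split;       // the snapshot or stream split that was submitted
    private SourceSplitState splitState; // mutable progress state for that split
}
```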