Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public LogScanner createLogScanner() {
tableInfo,
conn.getMetadataUpdater(),
conn.getClientMetricGroup(),
conn.getOrCreateRemoteFileDownloader(),
projectedColumns,
schemaGetter,
recordBatchFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,31 @@ void tryComplete(TableBucket tableBucket) {
while (pendings != null && !pendings.isEmpty()) {
PendingFetch pendingFetch = pendings.peek();
if (pendingFetch.isCompleted()) {
CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
completedFetches.add(completedFetch);
pendings.poll();
hasCompleted = true;
try {
CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
completedFetches.add(completedFetch);
hasCompleted = true;
} catch (Throwable t) {
// If toCompletedFetch() fails (e.g. the underlying chunk
// future completed exceptionally), discard this entry so
// the queue is not blocked. The bucket will become fetchable
// again and the server can re-issue a remote fetch.
LOG.warn(
"Discarding failed pending fetch for bucket {}",
tableBucket,
t);
}
} else {
break;
}
}
if (hasCompleted) {
notEmptyCondition.signalAll();
// clear the bucket entry if there is no pending fetches for the bucket.
if (pendings.isEmpty()) {
this.pendingFetches.remove(tableBucket);
}
}
// clear the bucket entry if there are no pending fetches for the bucket.
if (pendings != null && pendings.isEmpty()) {
this.pendingFetches.remove(tableBucket);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
Expand Down Expand Up @@ -126,7 +125,6 @@ public LogFetcher(
Configuration conf,
MetadataUpdater metadataUpdater,
ScannerMetricGroup scannerMetricGroup,
RemoteFileDownloader remoteFileDownloader,
SchemaGetter schemaGetter) {
this.tablePath = tableInfo.getTablePath();
this.isPartitioned = tableInfo.isPartitioned();
Expand Down Expand Up @@ -163,8 +161,7 @@ public LogFetcher(
this.logFetchCollector =
new LogFetchCollector(tablePath, logScannerStatus, conf, metadataUpdater);
this.scannerMetricGroup = scannerMetricGroup;
this.remoteLogDownloader =
new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup);
this.remoteLogDownloader = new RemoteLogDownloader(tablePath, conf, scannerMetricGroup);
remoteLogDownloader.start();
}

Expand Down Expand Up @@ -480,22 +477,37 @@ private void pendRemoteFetches(
fetchOffset = segment.remoteLogStartOffset();
}
RemoteLogDownloadFuture downloadFuture =
remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment);
RemotePendingFetch pendingFetch =
new RemotePendingFetch(
segment,
downloadFuture,
posInLogSegment,
fetchOffset,
highWatermark,
remoteReadContext,
logScannerStatus,
isCheckCrcs);
logFetchBuffer.pend(pendingFetch);
downloadFuture.onComplete(() -> logFetchBuffer.tryComplete(segment.tableBucket()));
remoteLogDownloader.requestRemoteLog(
remoteLogTabletDir, segment, posInLogSegment);
pendRemoteChunkFetch(segment, downloadFuture, fetchOffset, highWatermark);
}
}

/**
 * Registers a pending fetch for a single chunk and sets up the callback chain so that
 * subsequent chunks of the same segment are automatically pended.
 *
 * @param segment the remote log segment this chunk belongs to
 * @param downloadFuture future that completes when this chunk's records are available
 * @param fetchOffset the offset from which records of this segment are consumed
 * @param highWatermark the high watermark to attach to the resulting completed fetch
 */
private void pendRemoteChunkFetch(
        RemoteLogSegment segment,
        RemoteLogDownloadFuture downloadFuture,
        long fetchOffset,
        long highWatermark) {
    // Recursively re-enter this method for the segment's next chunk. The callback is
    // registered before the pending fetch is enqueued/completed, presumably so a chunk
    // that finishes immediately cannot chain before a callback is in place.
    // NOTE(review): fetchOffset and highWatermark are reused unchanged for every
    // subsequent chunk of the segment — TODO confirm this is intended.
    downloadFuture.setNextChunkCallback(
            nextFuture ->
                    pendRemoteChunkFetch(segment, nextFuture, fetchOffset, highWatermark));
    RemotePendingFetch pendingFetch =
            new RemotePendingFetch(
                    segment,
                    downloadFuture,
                    fetchOffset,
                    highWatermark,
                    remoteReadContext,
                    logScannerStatus,
                    isCheckCrcs);
    // Enqueue first, then arrange for the buffer to attempt completion once the chunk
    // future finishes (the future invokes onComplete callbacks on success or failure).
    logFetchBuffer.pend(pendingFetch);
    downloadFuture.onComplete(() -> logFetchBuffer.tryComplete(segment.tableBucket()));
}

@VisibleForTesting
Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
Map<Integer, List<PbFetchLogReqForBucket>> fetchLogReqForBuckets = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.WakeupException;
import org.apache.fluss.metadata.SchemaGetter;
Expand Down Expand Up @@ -80,7 +79,6 @@ public LogScannerImpl(
TableInfo tableInfo,
MetadataUpdater metadataUpdater,
ClientMetricGroup clientMetricGroup,
RemoteFileDownloader remoteFileDownloader,
@Nullable int[] projectedFields,
SchemaGetter schemaGetter,
@Nullable Predicate recordBatchFilter) {
Expand All @@ -102,7 +100,6 @@ public LogScannerImpl(
conf,
metadataUpdater,
scannerMetricGroup,
remoteFileDownloader,
schemaGetter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,23 @@

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.FileLogRecords;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.record.LogRecords;
import org.apache.fluss.rpc.protocol.ApiError;

import java.io.IOException;

/**
* {@link RemoteCompletedFetch} is a {@link CompletedFetch} that represents a completed fetch that
* the log records are fetched from remote log storage.
* the log records are fetched from remote log storage as file-backed or in-memory chunks.
*/
@Internal
class RemoteCompletedFetch extends CompletedFetch {

private final FileLogRecords fileLogRecords;

// recycle to clean up the fetched remote log files and increment the prefetch semaphore
// recycle to notify the downloader that this chunk has been consumed
private final Runnable recycleCallback;

RemoteCompletedFetch(
TableBucket tableBucket,
FileLogRecords fileLogRecords,
LogRecords logRecords,
long highWatermark,
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
Expand All @@ -49,28 +45,27 @@ class RemoteCompletedFetch extends CompletedFetch {
super(
tableBucket,
ApiError.NONE,
fileLogRecords.sizeInBytes(),
logRecords.sizeInBytes(),
highWatermark,
fileLogRecords.batches().iterator(),
logRecords.batches().iterator(),
readContext,
logScannerStatus,
isCheckCrc,
fetchOffset,
CompletedFetch.NO_FILTERED_END_OFFSET);
this.fileLogRecords = fileLogRecords;
this.recycleCallback = recycleCallback;
}

@Override
void drain() {
super.drain();
// close file channel only, don't need to flush the file which is very heavy
try {
fileLogRecords.closeHandlers();
} catch (IOException e) {
LOG.warn("Failed to close file channel for remote log records", e);
// Guard against double-drain: super.drain() is idempotent (checks isConsumed internally),
// but recycleCallback must only be called once to avoid double-incrementing chunksConsumed
// which would trigger a spurious semaphore release and corrupt flow-control counters.
if (isConsumed()) {
return;
}
// call recycle to remove the fetched files and increment the prefetch semaphore
super.drain();
// call recycle to notify the downloader and trigger next chunk read
recycleCallback.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,69 @@

package org.apache.fluss.client.table.scanner.log;

import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.record.FileLogRecords;
import org.apache.fluss.record.LogRecords;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Represents the future of a remote log download request. */
/**
* Represents the future of a single chunk read from a remote log segment. Each chunk is delivered
* as a {@link LogRecords} via a {@link CompletableFuture}.
*/
public class RemoteLogDownloadFuture {

private final CompletableFuture<File> logFileFuture;
private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloadFuture.class);

private final CompletableFuture<LogRecords> chunkFuture;
private final Runnable recycleCallback;
private Consumer<RemoteLogDownloadFuture> nextChunkCallback;

public RemoteLogDownloadFuture(
CompletableFuture<File> logFileFuture, Runnable recycleCallback) {
this.logFileFuture = logFileFuture;
CompletableFuture<LogRecords> chunkFuture, Runnable recycleCallback) {
this.chunkFuture = chunkFuture;
this.recycleCallback = recycleCallback;
}

public boolean isDone() {
return logFileFuture.isDone();
return chunkFuture.isDone();
}

public FileLogRecords getFileLogRecords(int startPosition) {
try {
FileLogRecords fileLogRecords = FileLogRecords.open(logFileFuture.join(), false);
if (startPosition > 0) {
return fileLogRecords.slice(startPosition, Integer.MAX_VALUE);
} else {
return fileLogRecords;
}
} catch (IOException e) {
throw new FlussRuntimeException(e);
}
/** Returns the chunk data. Blocks until the chunk is ready. */
public LogRecords getLogRecords() {
return chunkFuture.join();
}

public Runnable getRecycleCallback() {
return recycleCallback;
}

public void onComplete(Runnable callback) {
logFileFuture.thenRun(callback);
// Use whenComplete (instead of thenRun) so the callback fires regardless of whether
// the chunkFuture completed successfully or exceptionally. This is critical: if a chunk
// read fails, the LogFetchBuffer still needs tryComplete() to be called so the failed
// PendingFetch can be drained (otherwise the bucket would be stuck permanently).
chunkFuture.whenComplete(
(result, throwable) -> {
try {
callback.run();
} catch (Throwable t) {
LOG.error("Exception in chunk completion callback", t);
}
});
}

/**
* Sets a callback that will be invoked when the next chunk's future is created. This allows the
* {@link LogFetcher} to register a new {@link RemotePendingFetch} for each subsequent chunk.
*/
public void setNextChunkCallback(Consumer<RemoteLogDownloadFuture> callback) {
this.nextChunkCallback = callback;
}

public Consumer<RemoteLogDownloadFuture> getNextChunkCallback() {
return nextChunkCallback;
}
}
Loading