Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,36 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByExpectedReadCount = true;

/**
* Enable batch read API when reading entries from bookkeeper.
* Batch read allows reading multiple entries in a single RPC call, reducing network overhead.
* Note: Batch read is only effective when ensembleSize equals writeQuorumSize (non-striped ledgers).
*/
@Setter
private boolean batchReadEnabled = false;

/**
* Max size in bytes for batch read requests. If set to 0 or negative,
* uses the netty max frame size (default 5MB).
* Batch read may return fewer entries if total size exceeds this limit.
*/
@Getter
@Setter
private int batchReadMaxSizeBytes = 0;

/**
* Returns whether batch read is enabled for this managed ledger.
* Batch read is only enabled when both conditions are met:
* 1. batchReadEnabled is set to true
* 2. ensembleSize equals writeQuorumSize (non-striped ledger)
*
* @return true if batch read should be used
*/
public boolean isBatchReadEnabled() {
return ensembleSize == writeQuorumSize && batchReadEnabled;
}

@Getter
private long continueCachingAddedEntriesAfterLastActiveCursorLeavesMillis;
private int minimumBacklogCursorsForCaching = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;

public class CompositeLedgerEntriesImpl implements LedgerEntries {
private List<LedgerEntry> entries;
private List<LedgerEntries> ledgerEntries;
private final Recycler.Handle<CompositeLedgerEntriesImpl> recyclerHandle;

private CompositeLedgerEntriesImpl(Recycler.Handle<CompositeLedgerEntriesImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<CompositeLedgerEntriesImpl> RECYCLER = new Recycler<>() {
@Override
protected CompositeLedgerEntriesImpl newObject(Recycler.Handle<CompositeLedgerEntriesImpl> handle) {
return new CompositeLedgerEntriesImpl(handle);
}
};

public static LedgerEntries create(List<LedgerEntry> entries, List<LedgerEntries> ledgerEntries) {
checkArgument(!entries.isEmpty(), "entries for create should not be empty.");
checkArgument(!ledgerEntries.isEmpty(), "ledgerEntries for create should not be empty.");
CompositeLedgerEntriesImpl instance = RECYCLER.get();
instance.entries = entries;
instance.ledgerEntries = ledgerEntries;
return instance;
}

private void recycle() {
if (ledgerEntries == null) {
return;
}
ledgerEntries.forEach(LedgerEntries::close);
entries = null;
ledgerEntries = null;
recyclerHandle.recycle(this);
}

@Override
public LedgerEntry getEntry(long entryId) {
checkNotNull(entries, "entries has been recycled");
long firstId = entries.get(0).getEntryId();
long lastId = entries.get(entries.size() - 1).getEntryId();
if (entryId < firstId || entryId > lastId) {
throw new IndexOutOfBoundsException("required index: " + entryId
+ " is out of bounds: [ " + firstId + ", " + lastId + " ].");
}
int index = (int) (entryId - firstId);
LedgerEntry entry = entries.get(index);
if (entry.getEntryId() != entryId) {
throw new IllegalStateException("Non-contiguous entries detected: expected entryId "
+ entryId + " at index " + index + " but found entryId " + entry.getEntryId());
}
return entry;
}

@Override
public Iterator<LedgerEntry> iterator() {
checkNotNull(entries, "entries has been recycled");
return entries.iterator();
}

@Override
public void close() {
recycle();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
Expand Down Expand Up @@ -70,8 +71,11 @@ public void clear() {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
ManagedLedgerConfig config = ml.getConfig();
boolean isBatchReadEnabled = config.isBatchReadEnabled();
int batchReadMaxBytes = config.getBatchReadMaxSizeBytes();
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, isBatchReadEnabled, batchReadMaxBytes)
.thenAcceptAsync(ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
try {
Expand Down Expand Up @@ -99,8 +103,12 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSu
@Override
public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
ManagedLedgerConfig config = ml.getConfig();
boolean isBatchReadEnabled = config.isBatchReadEnabled();
int batchReadMaxBytes = config.getBatchReadMaxSizeBytes();
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(),
isBatchReadEnabled, batchReadMaxBytes)
.whenCompleteAsync((ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,9 @@ CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, l
private CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
IntSupplier expectedReadCount, boolean allowRetry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<Entry>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
ManagedLedgerConfig mlConfig = ml.getConfig();
CompletableFuture<List<Entry>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry,
mlConfig.isBatchReadEnabled(), mlConfig.getBatchReadMaxSizeBytes())
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,31 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;


@Slf4j
class ReadEntryUtils {

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry) {
return readAsync(ml, handle, firstEntry, lastEntry, false, 0);
}

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry, boolean batchReadEnabled, int batchReadMaxSize) {
if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
// The read handle comes from another managed ledger, in this case, we can only compare the entry range with
// the LAC of that read handle. Specifically, it happens when this method is called by a
Expand All @@ -49,6 +64,117 @@ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle h
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading entry " + lastEntry));
}

int numberOfEntries = (int) (lastEntry - firstEntry + 1);

// Use batch read for multiple entries when enabled.
if (batchReadEnabled && numberOfEntries > 1 && batchReadMaxSize > 0 && handle instanceof LedgerHandle lh) {
if (log.isDebugEnabled()) {
log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}",
handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize);
}
return batchReadUnconfirmedWithAutoRefill(lh, firstEntry, numberOfEntries, batchReadMaxSize);
}

return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}

private static CompletableFuture<LedgerEntries> batchReadUnconfirmedWithAutoRefill(LedgerHandle lh, long firstEntry,
int maxCount, int maxSize) {
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
List<LedgerEntry> receivedEntries = new ArrayList<>(maxCount);
List<LedgerEntries> ledgerEntries = new ArrayList<>(4);
doBatchRead(lh, firstEntry, maxCount, maxSize, receivedEntries, ledgerEntries, future);
return future;
}

private static void doBatchRead(LedgerHandle lh, long firstEntry, int maxCount, int maxSize,
List<LedgerEntry> receivedEntries, List<LedgerEntries> ledgerEntries,
CompletableFuture<LedgerEntries> future) {
batchReadUnconfirmed(lh, firstEntry, maxCount - receivedEntries.size(), maxSize)
.whenComplete((entries, throwable) -> {
if (throwable != null) {
onBatchReadComplete(lh, firstEntry, maxCount, receivedEntries, ledgerEntries, future,
throwable);
return;
}
long lastReceivedEntry = -1;
int prevReceivedCount = receivedEntries.size();
for (LedgerEntry entry : entries) {
receivedEntries.add(entry);
lastReceivedEntry = entry.getEntryId();
}
ledgerEntries.add(entries);
if (receivedEntries.size() >= maxCount || prevReceivedCount == receivedEntries.size()) {
onBatchReadComplete(lh, firstEntry, maxCount, receivedEntries, ledgerEntries, future, null);
return;
}
doBatchRead(lh, lastReceivedEntry + 1, maxCount, maxSize,
receivedEntries, ledgerEntries, future);
});
}

private static void onBatchReadComplete(LedgerHandle lh, long firstEntry, int maxCount,
List<LedgerEntry> receivedEntries, List<LedgerEntries> ledgerEntries,
CompletableFuture<LedgerEntries> future, Throwable error) {
if (error != null && receivedEntries.isEmpty()) {
ledgerEntries.forEach(LedgerEntries::close);
long lastEntry = firstEntry + maxCount - 1;
log.warn("Batch read failed for ledger {} entries {}-{}, falling back to readUnconfirmed",
lh.getId(), firstEntry, lastEntry, error);
lh.readUnconfirmedAsync(firstEntry, lastEntry)
.whenComplete((result, fallbackError) -> {
if (fallbackError != null) {
future.completeExceptionally(fallbackError);
} else {
future.complete(result);
}
});
return;
}
if (error != null) {
ledgerEntries.forEach(LedgerEntries::close);
future.completeExceptionally(error);
return;
}
if (receivedEntries.isEmpty()) {
ledgerEntries.forEach(LedgerEntries::close);
future.completeExceptionally(new ManagedLedgerException(
"Batch read returned no entries for ledger " + lh.getId()
+ " starting from entry " + firstEntry));
return;
}
future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries));
}


private static CompletableFuture<LedgerEntries> batchReadUnconfirmed(LedgerHandle lh, long firstEntry,
int maxCount, int maxSize) {
CompletableFuture<LedgerEntries> f = new CompletableFuture<>();

lh.asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, (rc, ignore, seq, ctx) -> {
if (rc != BKException.Code.OK) {
f.completeExceptionally(BKException.create(rc));
return;
}
List<LedgerEntry> entries = new ArrayList<>(maxCount);
while (seq.hasMoreElements()) {
var oldEntry = seq.nextElement();
entries.add(LedgerEntryImpl.create(
oldEntry.getLedgerId(),
oldEntry.getEntryId(),
oldEntry.getLength(),
oldEntry.getEntryBuffer()));
}
if (entries.isEmpty()) {
f.completeExceptionally(new ManagedLedgerException(
"Batch read returned no entries for ledger " + lh.getId()
+ " starting from entry " + firstEntry));
return;
}
f.complete(LedgerEntriesImpl.create(entries));
}, null);

return f;
}
}
Loading
Loading