Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1075,10 +1075,16 @@ public ServerConfiguration setMaxAddsInProgressLimit(int value) {
/**
 * Get max number of reads in progress. 0 == unlimited.
 *
 * <p>This limit bounds the memory used by read responses that have been read from storage
 * but not yet flushed to the network. Since read response writes are non-blocking,
 * without this limit a slow consumer could cause unbounded memory growth.
 * The default value of 10000 provides a reasonable balance between throughput and memory usage.
 * Tune based on your average entry size: memoryBudget / avgEntrySize.
 *
 * @return Max number of reads in progress; 10000 when the setting is not configured.
 */
public int getMaxReadsInProgressLimit() {
    // Single return with the documented default. A second, earlier return with the old
    // default of 0 (unlimited) would make this statement unreachable.
    return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 10000);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
Expand Down Expand Up @@ -74,10 +74,12 @@ protected void sendWriteReqResponse(int rc, Object response, OpStatsLogger stats
/**
 * Send the response for a read request and signal the request processor that the read
 * has finished.
 *
 * <p>{@code requestProcessor.onReadRequestFinish()} must be signalled once per request.
 * On the non-throttled path it is called here, right after the response is handed to
 * {@code sendResponse}. On the throttled path it is deliberately NOT called here:
 * {@code sendResponseAndWait} signals it when the channel write completes.
 *
 * @param rc response code forwarded to the send method
 * @param response the response object to write
 * @param statsLogger stats logger forwarded to the send method
 * @param throttle whether to defer the finish signal until the write has completed
 */
protected void sendReadReqResponse(int rc, Object response, OpStatsLogger statsLogger, boolean throttle) {
    if (throttle) {
        sendResponseAndWait(rc, response, statsLogger);
        // onReadRequestFinish is called asynchronously in the ChannelFutureListener
        // inside sendResponseAndWait to maintain throttling without blocking the thread.
    } else {
        sendResponse(rc, response, statsLogger);
        requestProcessor.onReadRequestFinish();
    }
    // No unconditional onReadRequestFinish() here: it would signal completion a second
    // time on both paths and end the throttled hold before the write completes.
}

protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
Expand Down Expand Up @@ -150,27 +152,44 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger)
}

/**
* Write on the channel and wait until the write is completed.
* Write on the channel and notify completion via a listener.
*
* <p>That will make the thread to get blocked until we're able to
* write everything on the TCP stack, providing auto-throttling
* and avoiding using too much memory when handling read-requests.
* <p>This provides auto-throttling by holding the read semaphore until the write completes,
* without blocking the read thread pool thread. The read thread is freed immediately to
* process other requests, while the semaphore prevents unbounded read concurrency.
*/
protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) {
// Capture fields before the processor may be recycled after this method returns.
final long capturedEnqueueNanos = this.enqueueNanos;
final BookieRequestProcessor processor = this.requestProcessor;
try {
Channel channel = requestHandler.ctx().channel();
ChannelFuture future = channel.writeAndFlush(response);
if (!channel.eventLoop().inEventLoop()) {
future.get();
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess() && logger.isDebugEnabled()) {
logger.debug("Netty channel write exception. ", f.cause());
}
if (BookieProtocol.EOK == rc) {
statsLogger.registerSuccessfulEvent(
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
} else {
statsLogger.registerFailedEvent(
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
}
processor.onReadRequestFinish();
});
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Netty channel write exception. ", e);
}
} catch (ExecutionException | InterruptedException e) {
logger.debug("Netty channel write exception. ", e);
return;
}
if (BookieProtocol.EOK == rc) {
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
} else {
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
if (BookieProtocol.EOK == rc) {
statsLogger.registerSuccessfulEvent(
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
} else {
statsLogger.registerFailedEvent(
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
}
processor.onReadRequestFinish();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.bookkeeper.proto;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -38,10 +40,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand Down Expand Up @@ -195,4 +199,178 @@ public void testNonFenceRequest() throws Exception {
assertEquals(BookieProtocol.READENTRY, response.getOpCode());
assertEquals(BookieProtocol.EOK, response.getErrorCode());
}

/**
 * Throttled read issued from outside the Netty event loop: {@code run()} must return
 * without waiting for the write, and {@code onReadRequestFinish()} must be invoked only
 * after the write future has been completed successfully.
 */
@Test
public void testThrottledReadNonBlockingOnSuccess() throws Exception {
    // The caller is reported as NOT being on the event loop; tasks submitted to the
    // loop are executed inline so listener notification happens synchronously.
    final EventLoop workerLoop = mock(EventLoop.class);
    when(workerLoop.inEventLoop()).thenReturn(false);
    doAnswer(call -> {
        Runnable task = call.getArgument(0);
        task.run();
        return null;
    }).when(workerLoop).execute(any(Runnable.class));
    when(channel.eventLoop()).thenReturn(workerLoop);

    // The write returns a promise that stays pending until the test completes it.
    final DefaultChannelPromise pendingWrite = new DefaultChannelPromise(channel);
    doAnswer(call -> pendingWrite).when(channel).writeAndFlush(any(Response.class));

    final ReadRequest readRequest = ReadRequest.create(
            BookieProtocol.CURRENT_PROTOCOL_VERSION, System.currentTimeMillis(), 1, (short) 0, new byte[0]);
    final ReadEntryProcessor readProcessor = ReadEntryProcessor.create(
            readRequest, requestHandler, requestProcessor, null, true /* throttle */);

    // Must return right away, even though the write is still pending.
    readProcessor.run();

    // The response was written exactly once ...
    verify(channel).writeAndFlush(any(Response.class));
    // ... but the read is not reported as finished while the write is pending.
    verify(requestProcessor, never()).onReadRequestFinish();

    // Completing the write triggers the finish callback.
    pendingWrite.setSuccess();
    verify(requestProcessor).onReadRequestFinish();
}

/**
 * Throttled read whose write fails: {@code onReadRequestFinish()} must still be invoked
 * once the write future is failed, so the read semaphore is always released.
 */
@Test
public void testThrottledReadNonBlockingOnWriteFailure() throws Exception {
    // Caller is off the event loop; submitted tasks run inline.
    final EventLoop workerLoop = mock(EventLoop.class);
    when(workerLoop.inEventLoop()).thenReturn(false);
    doAnswer(call -> {
        Runnable task = call.getArgument(0);
        task.run();
        return null;
    }).when(workerLoop).execute(any(Runnable.class));
    when(channel.eventLoop()).thenReturn(workerLoop);

    // Pending promise handed back by the mocked write.
    final DefaultChannelPromise pendingWrite = new DefaultChannelPromise(channel);
    doAnswer(call -> pendingWrite).when(channel).writeAndFlush(any(Response.class));

    final ReadRequest readRequest = ReadRequest.create(
            BookieProtocol.CURRENT_PROTOCOL_VERSION, System.currentTimeMillis(), 1, (short) 0, new byte[0]);
    final ReadEntryProcessor readProcessor = ReadEntryProcessor.create(
            readRequest, requestHandler, requestProcessor, null, true /* throttle */);

    readProcessor.run();

    // Written once, not yet finished.
    verify(channel).writeAndFlush(any(Response.class));
    verify(requestProcessor, never()).onReadRequestFinish();

    // A failed write must release the read slot as well.
    pendingWrite.setFailure(new IOException("channel write failed"));
    verify(requestProcessor).onReadRequestFinish();
}

/**
 * With throttleReadResponses=false, {@code onReadRequestFinish()} must already have been
 * called by the time {@code run()} returns.
 */
@Test
public void testNonThrottledReadCallsOnFinishSynchronously() throws Exception {
    // The non-throttled sendResponse path checks channel.isActive() and uses the
    // two-argument writeAndFlush, so both are stubbed.
    final ChannelPromise stubbedPromise = mock(ChannelPromise.class);
    when(channel.isActive()).thenReturn(true);
    when(channel.writeAndFlush(any(), any(ChannelPromise.class))).thenReturn(stubbedPromise);

    final ReadRequest readRequest = ReadRequest.create(
            BookieProtocol.CURRENT_PROTOCOL_VERSION, System.currentTimeMillis(), 1, (short) 0, new byte[0]);
    final ReadEntryProcessor readProcessor = ReadEntryProcessor.create(
            readRequest, requestHandler, requestProcessor, null, false /* no throttle */);

    readProcessor.run();

    // One write was issued and the finish callback has already fired.
    verify(channel).writeAndFlush(any(), any(ChannelPromise.class));
    verify(requestProcessor).onReadRequestFinish();
}

/**
 * A freshly constructed {@link ServerConfiguration} must report 10000 for
 * maxReadsInProgressLimit, i.e. the limit is enabled by default so that non-blocking
 * read response writes are bounded.
 */
@Test
public void testDefaultMaxReadsInProgressLimitIsEnabled() {
    final int defaultLimit = new ServerConfiguration().getMaxReadsInProgressLimit();
    assertEquals("maxReadsInProgressLimit should default to 10000", 10000, defaultLimit);
}

/**
 * The read permit must be held from processor creation until the write future completes,
 * not merely until the read thread returns. A real single-permit semaphore is wired to
 * the mocked {@code onReadRequestStart}/{@code onReadRequestFinish} callbacks to observe
 * when the permit is taken and given back.
 */
@Test
public void testThrottledReadHoldsSemaphoreUntilWriteCompletes() throws Exception {
    // Stand-in for maxReadsInProgressLimit=1.
    final Semaphore readPermits = new Semaphore(1);

    doAnswer(call -> {
        readPermits.acquireUninterruptibly();
        return null;
    }).when(requestProcessor).onReadRequestStart(any(Channel.class));
    doAnswer(call -> {
        readPermits.release();
        return null;
    }).when(requestProcessor).onReadRequestFinish();

    // Caller is off the event loop; submitted tasks run inline.
    final EventLoop workerLoop = mock(EventLoop.class);
    when(workerLoop.inEventLoop()).thenReturn(false);
    doAnswer(call -> {
        Runnable task = call.getArgument(0);
        task.run();
        return null;
    }).when(workerLoop).execute(any(Runnable.class));
    when(channel.eventLoop()).thenReturn(workerLoop);

    // Pending promise handed back by the mocked write.
    final DefaultChannelPromise pendingWrite = new DefaultChannelPromise(channel);
    doAnswer(call -> pendingWrite).when(channel).writeAndFlush(any(Response.class));

    final ReadRequest readRequest = ReadRequest.create(
            BookieProtocol.CURRENT_PROTOCOL_VERSION, System.currentTimeMillis(), 1, (short) 0, new byte[0]);

    // Creating the processor is expected to trigger onReadRequestStart, taking the permit.
    final ReadEntryProcessor readProcessor = ReadEntryProcessor.create(
            readRequest, requestHandler, requestProcessor, null, true /* throttle */);

    assertEquals("semaphore should have 0 permits after read started",
            0, readPermits.availablePermits());

    // Returns immediately; the write is still outstanding.
    readProcessor.run();

    assertEquals("semaphore should still have 0 permits while write is in progress",
            0, readPermits.availablePermits());

    // No permit is available for another read while the write is pending.
    assertFalse("second read should not be able to acquire semaphore",
            readPermits.tryAcquire());

    // Completing the write gives the permit back.
    pendingWrite.setSuccess();

    assertEquals("semaphore should have 1 permit after write completes",
            1, readPermits.availablePermits());
}
}
Loading