Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions ratis-docs/src/site/markdown/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ but there are tradeoffs (e.g. Write and Read performance) between different type
| **Type** | TimeDuration |
| **Default** | 10ms |

| **Property** | `raft.server.read.read-index.batch.enabled` |
|:----------------|:----------------------------------------------------------------------------------|
| **Description** | whether to batch follower-to-leader ReadIndex RPCs for plain linearizable reads |
| **Type** | boolean |
| **Default** | false |

| **Property** | `raft.server.read.read-index.batch.size` |
|:----------------|:----------------------------------------------------|
| **Description** | maximum number of reads in one opportunistic ReadIndex batch |
| **Type** | int |
| **Default** | 64 |

When ReadIndex batching is enabled, a follower batches plain linearizable read
requests opportunistically and sends a single ReadIndex request for the reads
already queued when the batch is drained. `batch.size` is a maximum cap, not a
target size; the follower does not wait to fill a batch. Read-after-write
requests bypass batching so that the leader can evaluate each request's
client-specific write index.

| **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
|:----------------|:--------------------------------------------------|
| **Description** | whether to check heartbeat for read index. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,28 @@ static TimeDuration repliedIndexBatchInterval(RaftProperties properties) {
static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) {
setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
}

interface Batch {
String PREFIX = ReadIndex.PREFIX + ".batch";

String ENABLED_KEY = PREFIX + ".enabled";
boolean ENABLED_DEFAULT = false;
static boolean enabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, ENABLED_KEY, ENABLED_DEFAULT, getDefaultLog());
}
static void setEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, ENABLED_KEY, enabled);
}

String BATCH_SIZE_KEY = PREFIX + ".size";
int BATCH_SIZE_DEFAULT = 64;
static int batchSize(RaftProperties properties) {
return getInt(properties::getInt, BATCH_SIZE_KEY, BATCH_SIZE_DEFAULT, getDefaultLog(), requireMin(1));
}
static void setBatchSize(RaftProperties properties, int batchSize) {
setInt(properties::setInt, BATCH_SIZE_KEY, batchSize, requireMin(1));
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class RaftServerImpl implements RaftServer.Division,
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction";
static final String READ_INDEX = CLASS_NAME + ".readIndexAsync";
static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
static final String START_COMPLETE = CLASS_NAME + ".startComplete";
Expand Down Expand Up @@ -241,6 +242,7 @@ public long[] getFollowerMatchIndices() {
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final WriteIndexCache writeIndexCache;
private final NavigableIndices appendLogTermIndices;
private final ReadIndexBatching readIndexBatching;

private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this);
private final LeaderElectionMetrics leaderElectionMetrics;
Expand Down Expand Up @@ -301,6 +303,11 @@ public long[] getFollowerMatchIndices() {
RaftServerConfigKeys.ThreadPool.clientCached(properties),
RaftServerConfigKeys.ThreadPool.clientSize(properties),
id + "-client");
this.readIndexBatching = RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(properties) ?
new ReadIndexBatching(
serverExecutor,
RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties),
this::sendReadIndexAsyncImpl) : null;
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
}

Expand Down Expand Up @@ -522,6 +529,11 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
public void close() {
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: shutdown", getMemberId());
try {
Optional.ofNullable(readIndexBatching).ifPresent(ReadIndexBatching::close);
} catch (Exception e) {
LOG.warn("{}: Failed to close ReadIndexBatching", getMemberId(), e);
}
try {
jmxAdapter.unregister();
} catch (Exception e) {
Expand Down Expand Up @@ -1091,6 +1103,21 @@ ReadRequests getReadRequests() {
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
if (readIndexBatching != null
&& !role.getLeaderState().isPresent()
&& !clientRequest.getType().getRead().getReadAfterWriteConsistent()) {
return readIndexBatching.submit(clientRequest);
}
return sendReadIndexAsyncImpl(clientRequest);
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsyncImpl(RaftClientRequest clientRequest) {
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader != null) {
return getReadIndex(clientRequest, leader)
.thenApply(index -> toReadIndexReplyProto(getId(), getMemberId(), true, index));
}

final RaftPeerId leaderId = getInfo().getLeaderId();
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
Expand Down Expand Up @@ -1569,6 +1596,7 @@ public CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestPro
assertLifeCycleState(LifeCycle.States.RUNNING);

final RaftPeerId peerId = RaftPeerId.valueOf(request.getServerRequest().getRequestorId());
CodeInjectionForTesting.execute(READ_INDEX, getId(), peerId, request);

final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.util.JavaUtils;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
* Opportunistically batch follower-to-leader ReadIndex requests.
*
* <p>The batch is drained on the server executor without waiting for a timer. {@code batchSize}
* is only a maximum drain cap, not a target size.
*/
class ReadIndexBatching {
private final Executor executor;
private final int batchSize;
private final Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl;

/** Guarded by {@code this}. */
private final Queue<Pending> pending = new ArrayDeque<>();
/** Guarded by {@code this}. */
private final HashSet<Batch> inFlight = new HashSet<>();
/** Guarded by {@code this}; at most one drain task is scheduled or running. */
private boolean drainScheduled;
/** Guarded by {@code this}. */
private boolean closed;

ReadIndexBatching(Executor executor, int batchSize,
Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl) {
this.executor = executor;
this.batchSize = batchSize;
this.readIndexAsyncImpl = readIndexAsyncImpl;
}

CompletableFuture<ReadIndexReplyProto> submit(RaftClientRequest request) {
final CompletableFuture<ReadIndexReplyProto> future = new CompletableFuture<>();
final boolean schedule;
synchronized (this) {
if (closed) {
return JavaUtils.completeExceptionally(newClosedException());
}
pending.add(new Pending(request, future));
schedule = !drainScheduled;
if (schedule) {
drainScheduled = true;
}
}

if (schedule) {
scheduleDrain();
}
return future;
}

void close() {
close(newClosedException());
}

private void close(Throwable throwable) {
final List<Pending> queued;
final List<Batch> running;
synchronized (this) {
if (closed) {
return;
}
closed = true;
drainScheduled = false;
queued = new ArrayList<>(pending);
pending.clear();
running = new ArrayList<>(inFlight);
inFlight.clear();
}
queued.forEach(p -> p.future.completeExceptionally(throwable));
running.forEach(batch -> batch.completeExceptionally(throwable));
}

private void scheduleDrain() {
try {
executor.execute(this::drain);
} catch (RejectedExecutionException e) {
close(new ReadIndexException("Failed to schedule ReadIndex batch drain.", e));
}
}

private static ReadIndexException newClosedException() {
return new ReadIndexException("ReadIndex batching is closed.");
}

private void drain() {
final Batch batch;
synchronized (this) {
if (closed || pending.isEmpty()) {
drainScheduled = false;
return;
}
batch = pollBatch();
inFlight.add(batch);
}

batch.send(readIndexAsyncImpl, () -> onBatchDone(batch));

final boolean scheduleNext;
synchronized (this) {
if (closed) {
scheduleNext = false;
} else if (pending.isEmpty()) {
drainScheduled = false;
scheduleNext = false;
} else {
scheduleNext = true;
}
}
if (scheduleNext) {
scheduleDrain();
}
}

private Batch pollBatch() {
final List<Pending> batch = new ArrayList<>(Math.min(batchSize, pending.size()));
for (int i = 0; i < batchSize; i++) {
final Pending next = pending.poll();
if (next == null) {
break;
}
batch.add(next);
}
return new Batch(batch);
}

private void onBatchDone(Batch batch) {
synchronized (this) {
inFlight.remove(batch);
}
}

private static class Pending {
private final RaftClientRequest request;
private final CompletableFuture<ReadIndexReplyProto> future;

Pending(RaftClientRequest request, CompletableFuture<ReadIndexReplyProto> future) {
this.request = request;
this.future = future;
}
}

private static class Batch {
private final AtomicBoolean completed = new AtomicBoolean();
private final List<Pending> pending;

Batch(List<Pending> pending) {
this.pending = pending;
}

void send(Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl,
Runnable onComplete) {
if (pending.isEmpty()) {
return;
}
if (completed.get()) {
return;
}

final CompletableFuture<ReadIndexReplyProto> replyFuture;
try {
// Plain reads only need one ReadIndex RPC for the batch. Read-after-write requests
// bypass batching before reaching this class, since their client request carries
// per-client write-index state.
replyFuture = readIndexAsyncImpl.apply(pending.get(0).request);
} catch (Throwable t) {
completeExceptionally(t);
onComplete.run();
return;
}

replyFuture.whenComplete((reply, throwable) -> {
try {
if (throwable != null) {
completeExceptionally(JavaUtils.unwrapCompletionException(throwable));
} else {
complete(reply);
}
} finally {
onComplete.run();
}
});
}

private void complete(ReadIndexReplyProto reply) {
if (completed.compareAndSet(false, true)) {
pending.forEach(p -> p.future.complete(reply));
}
}

private void completeExceptionally(Throwable throwable) {
if (completed.compareAndSet(false, true)) {
pending.forEach(p -> p.future.completeExceptionally(throwable));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>

public abstract Type readIndexType();

public boolean readIndexBatchEnabled() {
return false;
}

public final void assertRaftProperties(RaftProperties p) {
assertOption(LINEARIZABLE, p);
assertEquals(isLeaderLeaseEnabled(), RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p));
assertEquals(readIndexBatchEnabled(), RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(p));
}

protected void runWithNewCluster(CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
Expand All @@ -88,6 +93,7 @@ public void setup() {
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(p, readIndexBatchEnabled());

// Enable dummy request so linearizable-read tests exercise the default ordered-async bootstrap path.
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true);
Expand Down Expand Up @@ -285,4 +291,4 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
}
}
Loading