Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1092,12 +1092,11 @@ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest requ
}
return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
}

ReadRequests getReadRequests() {
return getState().getReadRequests();
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
final Throwable snapshotInstallation = snapshotInstallationHandler.getInProgressInstallSnapshotReadException();
if (snapshotInstallation != null) {
return JavaUtils.completeExceptionally(snapshotInstallation);
}
final RaftPeerId leaderId = getInfo().getLeaderId();
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
Expand Down Expand Up @@ -1146,7 +1145,8 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
}

return replyFuture
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
snapshotInstallationHandler::getInProgressInstallSnapshotReadException))
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,30 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** For supporting linearizable read. */
class ReadRequests {
private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);

static ReadException newException(Object server, long installSnapshot) {
return new ReadException(server + ": Failed read as snapshot (" + installSnapshot
+ ") installation is in progress");
}

static class ReadIndexQueue {
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
/** The log index known to be applied. */
Expand All @@ -52,10 +61,14 @@ static class ReadIndexQueue {
this.readTimeout = readTimeout;
}

CompletableFuture<Long> add(long readIndex) {
CompletableFuture<Long> add(long readIndex, Supplier<Throwable> failureSupplier) {
final CompletableFuture<Long> returned;
final boolean create;
synchronized (this) {
final Throwable failure = failureSupplier.get();
if (failure != null) {
return JavaUtils.completeExceptionally(failure);
}
if (readIndex <= lastAppliedIndex) {
return CompletableFuture.completedFuture(lastAppliedIndex);
}
Expand Down Expand Up @@ -88,6 +101,14 @@ private void handleTimeout(long readIndex) {
removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex));
}

void fail(Throwable cause) {
final Collection<CompletableFuture<Long>> futures;
synchronized (this) {
futures = new ArrayList<>(sorted.values());
sorted.clear();
}
futures.forEach(f -> f.completeExceptionally(cause));
}

/** Complete all the entries less than or equal to the given applied index. */
synchronized void complete(long appliedIndex) {
Expand Down Expand Up @@ -119,7 +140,11 @@ LongConsumer getAppliedIndexConsumer() {
return readIndexQueue::complete;
}

CompletableFuture<Long> waitToAdvance(long readIndex) {
return readIndexQueue.add(readIndex);
CompletableFuture<Long> waitToAdvance(long readIndex, Supplier<Throwable> failureSupplier) {
return readIndexQueue.add(readIndex, failureSupplier);
}

void fail(Throwable cause) {
readIndexQueue.fail(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ long getInProgressInstallSnapshotIndex() {
return inProgressInstallSnapshotIndex.get();
}

Throwable getInProgressInstallSnapshotReadException() {
final long installSnapshot = getInProgressInstallSnapshotIndex();
return installSnapshot != INVALID_LOG_INDEX ? ReadRequests.newException(getMemberId(), installSnapshot) : null;
}

InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(),
suffix -> LOG.info("{}: receive installSnapshot: {} {}",
Expand Down Expand Up @@ -276,6 +281,7 @@ private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstall
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return future.thenApply(dummy -> reply);
}
server.getState().getReadRequests().fail(ReadRequests.newException(getMemberId(), firstAvailableLogIndex));

final RaftPeerProto leaderProto;
if (!request.hasLastRaftConfigurationLogEntryProto()) {
Expand Down Expand Up @@ -401,4 +407,4 @@ private RoleInfoProto getRoleInfoProto(RaftPeerProto leader) {
.setFollowerInfo(followerInfo)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste
}
}

@Test
public void testFollowerLinearizableReadFailsWhenInstallingSnapshot() throws Exception {
runWithNewCluster(ReadOnlyRequestTests::runTestFollowerLinearizableReadFailsWhenInstallingSnapshot);
}

@Test
public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
Expand Down Expand Up @@ -285,4 +290,4 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
Expand Down Expand Up @@ -139,6 +144,76 @@ static <C extends MiniRaftCluster> void runTestReadOnlyRetryWhenLeaderDown(Retry
}
}

private static void setInProgressInstallSnapshotIndex(RaftServer.Division server, long index) throws Exception {
final Field snapshotInstallationHandler = server.getClass().getDeclaredField("snapshotInstallationHandler");
snapshotInstallationHandler.setAccessible(true);
final Object handler = snapshotInstallationHandler.get(server);
final Field inProgressInstallSnapshotIndex = handler.getClass()
.getDeclaredField("inProgressInstallSnapshotIndex");
inProgressInstallSnapshotIndex.setAccessible(true);
((AtomicLong) inProgressInstallSnapshotIndex.get(handler)).set(index);
}

private static void startSnapshotInstallation(RaftServer.Division server, long index) throws Exception {
setInProgressInstallSnapshotIndex(server, index);
final Method getState = server.getClass().getDeclaredMethod("getState");
getState.setAccessible(true);
final Object state = getState.invoke(server);
final Method getReadRequests = state.getClass().getDeclaredMethod("getReadRequests");
getReadRequests.setAccessible(true);
final Object readRequests = getReadRequests.invoke(state);
final Method fail = readRequests.getClass().getDeclaredMethod("fail", Throwable.class);
fail.setAccessible(true);
fail.invoke(readRequests, new ReadException(server.getMemberId()
+ ": Failed read as snapshot (" + index + ") installation is in progress"));
}

static void assertSnapshotInstallationReadException(Throwable exception) {
final Throwable cause = exception instanceof CompletionException && exception.getCause() != null
? exception.getCause() : exception;
Assertions.assertInstanceOf(ReadException.class, cause);
Assertions.assertTrue(cause.getMessage().contains("snapshot (1) installation is in progress"),
() -> "Unexpected exception: " + exception);
}

static <C extends MiniRaftCluster> void runTestFollowerLinearizableReadFailsWhenInstallingSnapshot(C cluster)
throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();

final List<RaftServer.Division> followers = cluster.getFollowers();
Assertions.assertEquals(2, followers.size());

final RaftServer.Division follower = followers.get(0);
final RaftPeerId followerId = follower.getId();

try (RaftClient leaderClient = cluster.createClient(leaderId);
RaftClient followerClient = cluster.createClient(followerId, RetryPolicies.noRetry())) {
assertReplyExact(1, leaderClient.io().send(INCREMENT));
assertReplyExact(1, followerClient.io().sendReadOnly(QUERY, followerId));

final CompletableFuture<RaftClientReply> writeReply = leaderClient.async().send(WAIT_AND_INCREMENT);
Thread.sleep(100);
final CompletableFuture<RaftClientReply> pendingRead = followerClient.async().sendReadOnly(QUERY, followerId);
Assertions.assertFalse(pendingRead.isDone(), () -> "pendingRead=" + pendingRead);

startSnapshotInstallation(follower, 1);
try {
final CompletionException pendingException = Assertions.assertThrows(CompletionException.class,
pendingRead::join);
assertSnapshotInstallationReadException(pendingException);

final ReadException readException = Assertions.assertThrows(ReadException.class,
() -> followerClient.io().sendReadOnly(QUERY, followerId));
assertSnapshotInstallationReadException(readException);
} finally {
setInProgressInstallSnapshotIndex(follower, -1);
}

assertReplyExact(2, writeReply.join());
assertReplyExact(2, followerClient.io().sendReadOnly(QUERY, followerId));
}
}

static int retrieve(RaftClientReply reply) {
Assertions.assertTrue(reply.isSuccess());
return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
Expand Down