Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand All @@ -62,16 +65,23 @@
public final class RpcResolver implements Resolver {
private static final int QUEUE_SIZE = 5;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
private final AtomicBoolean successfulConnection = new AtomicBoolean(false);
private final ChannelConnector connector;
private final Cache cache;
private final ResolveStrategy strategy;
private final FlagdOptions options;
private final int maxBackoffMs;
private final LinkedBlockingQueue<StreamResponseModel<EventStreamResponse>> incomingQueue;
private final TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onProviderEvent;
private final ServiceStub stub;
private final ServiceBlockingStub blockingStub;
private final List<String> fatalStatusCodes;
private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "flagd-rpc-retry-scheduler");
t.setDaemon(true);
return t;
});

/**
* Resolves flag values using
Expand All @@ -96,6 +106,7 @@ public RpcResolver(
this.blockingStub =
ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady();
this.fatalStatusCodes = options.getFatalStatusCodes();
this.maxBackoffMs = options.getRetryBackoffMaxMs();
}

// testing only
Expand All @@ -115,23 +126,14 @@ protected RpcResolver(
this.stub = mockStub;
this.blockingStub = mockBlockingStub;
this.fatalStatusCodes = options.getFatalStatusCodes();
this.maxBackoffMs = options.getRetryBackoffMaxMs();
}

/**
* Initialize RpcResolver resolver.
*/
public void init() throws Exception {
Thread listener = new Thread(() -> {
try {
observeEventStream();
} catch (InterruptedException e) {
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
Thread.currentThread().interrupt();
}
});

listener.setDaemon(true);
listener.start();
retryScheduler.execute(this::observeEventStream);
}

/**
Expand All @@ -141,6 +143,7 @@ public void shutdown() throws Exception {
if (shutdown.getAndSet(true)) {
return;
}
this.retryScheduler.shutdownNow();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want to call awaitTermination() on the retryScheduler?

this.connector.shutdown();
}

Expand Down Expand Up @@ -337,63 +340,81 @@ private void restartStream() {
}

/** Contains blocking calls, to be used concurrently. */
private void observeEventStream() throws InterruptedException {
private void observeEventStream() {

log.info("Initializing event stream observer");

// outer loop for re-issuing the stream request
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
while (!shutdown.get()) {

log.debug("Initializing event stream request");
restartStream();
// inner loop for handling messages
while (!shutdown.get()) {
final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take();
if (taken.isComplete()) {
log.debug("Event stream completed, will reconnect");
this.handleErrorOrComplete(false);
// The stream is complete, we still try to reconnect
break;
try {
// explicit backoff after stream errors to prevent tight loops when errors are returned immediately
// (e.g., by intervening proxies like Envoy)
if (shouldThrottle.getAndSet(false)) {
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
try {
retryScheduler.schedule(this::observeEventStream, this.maxBackoffMs, TimeUnit.MILLISECONDS);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case this schedule fails, we will still return and never attempt to call observeEventStream again. Should we only return in the non-exceptional case?

} catch (RejectedExecutionException e) {
log.debug("Retry scheduling rejected, most likely shutdown was invoked", e);
}
return;
}

Throwable streamException = taken.getError();
if (streamException != null) {
if (streamException instanceof StatusRuntimeException
&& fatalStatusCodes.contains(((StatusRuntimeException) streamException)
.getStatus()
.getCode()
.name())
&& !successfulConnection.get()) {
log.debug(
"Fatal error code received: {}",
((StatusRuntimeException) streamException)
.getStatus()
.getCode());
this.handleErrorOrComplete(true);
} else {
log.debug(
"Exception in event stream connection, streamException {}, will reconnect",
streamException);
log.debug("Initializing event stream request");
restartStream();
// inner loop for handling messages
while (!shutdown.get()) {
final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take();
if (taken.isComplete()) {
log.debug("Event stream completed, will reconnect");
this.handleErrorOrComplete(false);
shouldThrottle.set(true);
// the stream is complete, we still try to reconnect
break;
}
break;
}

successfulConnection.set(true);
final EventStreamResponse response = taken.getResponse();
log.debug("Got stream response: {}", response);

switch (response.getType()) {
case Constants.CONFIGURATION_CHANGE:
this.handleConfigurationChangeEvent(response);
break;
case Constants.PROVIDER_READY:
this.handleProviderReadyEvent();
Throwable streamException = taken.getError();
if (streamException != null) {
if (streamException instanceof StatusRuntimeException
&& fatalStatusCodes.contains(((StatusRuntimeException) streamException)
.getStatus()
.getCode()
.name())
&& !successfulConnection.get()) {
log.debug(
"Fatal error code received: {}",
((StatusRuntimeException) streamException)
.getStatus()
.getCode());
this.handleErrorOrComplete(true);
} else {
log.debug(
"Exception in event stream connection, streamException {}, will reconnect",
streamException);
this.handleErrorOrComplete(false);
}
shouldThrottle.set(true);
break;
default:
log.debug("Unhandled event type {}", response.getType());
}

successfulConnection.set(true);
final EventStreamResponse response = taken.getResponse();
log.debug("Got stream response: {}", response);

switch (response.getType()) {
case Constants.CONFIGURATION_CHANGE:
this.handleConfigurationChangeEvent(response);
break;
case Constants.PROVIDER_READY:
this.handleProviderReadyEvent();
break;
default:
log.debug("Unhandled event type {}", response.getType());
}
}
} catch (InterruptedException ie) {
log.debug("Stream observer interrupted, most likely shutdown was invoked", ie);
Thread.currentThread().interrupt();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static Object getPrivateField(Object instance, String fieldName) {
}

@Test
void syncInitError_DoesNotBusyWait() throws Exception {
void syncInitError_RetriesWithNonBlockingBackoff() throws Exception {
// make sure we do not spin in a busy loop on immediate errors

int maxBackoffMs = 1000;
Expand All @@ -201,7 +201,7 @@ void syncInitError_DoesNotBusyWait() throws Exception {
}

@Test
void asyncInitError_DoesNotBusyWait() throws Exception {
void asyncInitError_RetriesWithNonBlockingBackoff() throws Exception {
// make sure we do not spin in a busy loop on async errors

int maxBackoffMs = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class RpcResolverTest {
private ChannelConnector mockConnector;
private ServiceBlockingStub blockingStub;
private ServiceStub stub;
private ServiceStub errorStub;
private QueueingStreamObserver<EventStreamResponse> observer;
private TriConsumer<ProviderEvent, ProviderEventDetails, Structure> consumer;
private CountDownLatch latch; // used to wait for observer to be initialized
Expand Down Expand Up @@ -61,6 +62,29 @@ public Void answer(InvocationOnMock invocation) {
})
.when(stub)
.eventStream(any(), any()); // Mock the initialize method

// stub that immediately fires onError on every eventStream call
errorStub = mock(ServiceStub.class);
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub);
doAnswer((Answer<Void>) invocation -> {
@SuppressWarnings("unchecked")
QueueingStreamObserver<EventStreamResponse> obs = (QueueingStreamObserver<EventStreamResponse>)
invocation.getArguments()[1];
latch.countDown();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the latch counted down here? Wouldn't it make more sense to count it down after onError was invoked?

// immediately fire error on a separate thread
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting 10 ms is not exactly immediate.

new Thread(() -> {
try {
Thread.sleep(10);
obs.onError(new Exception("immediate error"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move the obs.onError out of the try block, so that we still emit the error even if the thread gets interrupted?

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.start();
return null;
})
.when(errorStub)
.eventStream(any(), any());
}

@Test
Expand Down Expand Up @@ -102,8 +126,13 @@ void onNextWithChangedRunsConsumerWithChanged() throws Exception {

@Test
void onCompletedRerunsStreamWithError() throws Exception {
RpcResolver resolver =
new RpcResolver(FlagdOptions.builder().build(), null, consumer, stub, blockingStub, mockConnector);
RpcResolver resolver = new RpcResolver(
FlagdOptions.builder().retryBackoffMaxMs(100).build(),
null,
consumer,
stub,
blockingStub,
mockConnector);
resolver.init();
latch.await();

Expand All @@ -118,8 +147,13 @@ void onCompletedRerunsStreamWithError() throws Exception {

@Test
void onErrorRunsConsumerWithError() throws Exception {
RpcResolver resolver =
new RpcResolver(FlagdOptions.builder().build(), null, consumer, stub, blockingStub, mockConnector);
RpcResolver resolver = new RpcResolver(
FlagdOptions.builder().retryBackoffMaxMs(100).build(),
null,
consumer,
stub,
blockingStub,
mockConnector);
resolver.init();
latch.await();

Expand All @@ -131,4 +165,51 @@ void onErrorRunsConsumerWithError() throws Exception {
// should have restarted the stream (2 calls)
await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any()));
}

@Test
void onError_RetriesWithNonBlockingBackoff() throws Exception {
// make sure we do not spin in a busy loop on immediate errors
int maxBackoffMs = 1000;
RpcResolver resolver = new RpcResolver(
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
null,
consumer,
errorStub,
blockingStub,
mockConnector);
resolver.init();
latch.await();

// wait 1.5x our delay for retries
Thread.sleep(maxBackoffMs + (maxBackoffMs / 2));

// should have retried the stream (2 calls); initial + 1 retry
// it's very important that the retry count is low, to confirm no busy-loop
verify(errorStub, times(2)).eventStream(any(), any());
}

@Test
void onCompleted_RetriesWithNonBlockingBackoff() throws Exception {
// make sure we do not spin in a busy loop on stream completion
int maxBackoffMs = 1000;
RpcResolver resolver = new RpcResolver(
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
null,
consumer,
stub,
blockingStub,
mockConnector);
resolver.init();
latch.await();

// fire completion
observer.onCompleted();

// wait 1.5x our delay for retries
Thread.sleep(maxBackoffMs + (maxBackoffMs / 2));

// should have retried the stream (2 calls); initial + 1 retry
// it's very important that the retry count is low, to confirm no busy-loop
verify(stub, times(2)).eventStream(any(), any());
}
}
Loading