-
Notifications
You must be signed in to change notification settings - Fork 75
fix: tight loop in RPC mode event stream reconnect #1789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,7 +46,10 @@ | |
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.function.Function; | ||
|
|
@@ -62,16 +65,23 @@ | |
| public final class RpcResolver implements Resolver { | ||
| private static final int QUEUE_SIZE = 5; | ||
| private final AtomicBoolean shutdown = new AtomicBoolean(false); | ||
| private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); | ||
| private final AtomicBoolean successfulConnection = new AtomicBoolean(false); | ||
| private final ChannelConnector connector; | ||
| private final Cache cache; | ||
| private final ResolveStrategy strategy; | ||
| private final FlagdOptions options; | ||
| private final int maxBackoffMs; | ||
| private final LinkedBlockingQueue<StreamResponseModel<EventStreamResponse>> incomingQueue; | ||
| private final TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onProviderEvent; | ||
| private final ServiceStub stub; | ||
| private final ServiceBlockingStub blockingStub; | ||
| private final List<String> fatalStatusCodes; | ||
| private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> { | ||
| Thread t = new Thread(r, "flagd-rpc-retry-scheduler"); | ||
| t.setDaemon(true); | ||
| return t; | ||
| }); | ||
|
|
||
| /** | ||
| * Resolves flag values using | ||
|
|
@@ -96,6 +106,7 @@ public RpcResolver( | |
| this.blockingStub = | ||
| ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady(); | ||
| this.fatalStatusCodes = options.getFatalStatusCodes(); | ||
| this.maxBackoffMs = options.getRetryBackoffMaxMs(); | ||
| } | ||
|
|
||
| // testing only | ||
|
|
@@ -115,23 +126,14 @@ protected RpcResolver( | |
| this.stub = mockStub; | ||
| this.blockingStub = mockBlockingStub; | ||
| this.fatalStatusCodes = options.getFatalStatusCodes(); | ||
| this.maxBackoffMs = options.getRetryBackoffMaxMs(); | ||
| } | ||
|
|
||
| /** | ||
| * Initialize RpcResolver resolver. | ||
| */ | ||
| public void init() throws Exception { | ||
| Thread listener = new Thread(() -> { | ||
| try { | ||
| observeEventStream(); | ||
| } catch (InterruptedException e) { | ||
| log.warn("gRPC event stream interrupted, flag configurations are stale", e); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| }); | ||
|
|
||
| listener.setDaemon(true); | ||
| listener.start(); | ||
| retryScheduler.execute(this::observeEventStream); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -141,6 +143,7 @@ public void shutdown() throws Exception { | |
| if (shutdown.getAndSet(true)) { | ||
| return; | ||
| } | ||
| this.retryScheduler.shutdownNow(); | ||
| this.connector.shutdown(); | ||
| } | ||
|
|
||
|
|
@@ -337,63 +340,81 @@ private void restartStream() { | |
| } | ||
|
|
||
| /** Contains blocking calls, to be used concurrently. */ | ||
| private void observeEventStream() throws InterruptedException { | ||
| private void observeEventStream() { | ||
|
|
||
| log.info("Initializing event stream observer"); | ||
|
|
||
| // outer loop for re-issuing the stream request | ||
| // "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions | ||
| while (!shutdown.get()) { | ||
|
|
||
| log.debug("Initializing event stream request"); | ||
| restartStream(); | ||
| // inner loop for handling messages | ||
| while (!shutdown.get()) { | ||
| final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take(); | ||
| if (taken.isComplete()) { | ||
| log.debug("Event stream completed, will reconnect"); | ||
| this.handleErrorOrComplete(false); | ||
| // The stream is complete, we still try to reconnect | ||
| break; | ||
| try { | ||
| // explicit backoff after stream errors to prevent tight loops when errors are returned immediately | ||
| // (e.g., by intervening proxies like Envoy) | ||
| if (shouldThrottle.getAndSet(false)) { | ||
| log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); | ||
| try { | ||
| retryScheduler.schedule(this::observeEventStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case this |
||
| } catch (RejectedExecutionException e) { | ||
| log.debug("Retry scheduling rejected, most likely shutdown was invoked", e); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| Throwable streamException = taken.getError(); | ||
| if (streamException != null) { | ||
| if (streamException instanceof StatusRuntimeException | ||
| && fatalStatusCodes.contains(((StatusRuntimeException) streamException) | ||
| .getStatus() | ||
| .getCode() | ||
| .name()) | ||
| && !successfulConnection.get()) { | ||
| log.debug( | ||
| "Fatal error code received: {}", | ||
| ((StatusRuntimeException) streamException) | ||
| .getStatus() | ||
| .getCode()); | ||
| this.handleErrorOrComplete(true); | ||
| } else { | ||
| log.debug( | ||
| "Exception in event stream connection, streamException {}, will reconnect", | ||
| streamException); | ||
| log.debug("Initializing event stream request"); | ||
| restartStream(); | ||
| // inner loop for handling messages | ||
| while (!shutdown.get()) { | ||
| final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take(); | ||
| if (taken.isComplete()) { | ||
| log.debug("Event stream completed, will reconnect"); | ||
| this.handleErrorOrComplete(false); | ||
| shouldThrottle.set(true); | ||
| // the stream is complete, we still try to reconnect | ||
| break; | ||
| } | ||
| break; | ||
| } | ||
|
|
||
| successfulConnection.set(true); | ||
| final EventStreamResponse response = taken.getResponse(); | ||
| log.debug("Got stream response: {}", response); | ||
|
|
||
| switch (response.getType()) { | ||
| case Constants.CONFIGURATION_CHANGE: | ||
| this.handleConfigurationChangeEvent(response); | ||
| break; | ||
| case Constants.PROVIDER_READY: | ||
| this.handleProviderReadyEvent(); | ||
| Throwable streamException = taken.getError(); | ||
| if (streamException != null) { | ||
| if (streamException instanceof StatusRuntimeException | ||
| && fatalStatusCodes.contains(((StatusRuntimeException) streamException) | ||
| .getStatus() | ||
| .getCode() | ||
| .name()) | ||
| && !successfulConnection.get()) { | ||
| log.debug( | ||
| "Fatal error code received: {}", | ||
| ((StatusRuntimeException) streamException) | ||
| .getStatus() | ||
| .getCode()); | ||
| this.handleErrorOrComplete(true); | ||
| } else { | ||
| log.debug( | ||
| "Exception in event stream connection, streamException {}, will reconnect", | ||
| streamException); | ||
| this.handleErrorOrComplete(false); | ||
| } | ||
| shouldThrottle.set(true); | ||
| break; | ||
| default: | ||
| log.debug("Unhandled event type {}", response.getType()); | ||
| } | ||
|
|
||
| successfulConnection.set(true); | ||
| final EventStreamResponse response = taken.getResponse(); | ||
| log.debug("Got stream response: {}", response); | ||
|
|
||
| switch (response.getType()) { | ||
| case Constants.CONFIGURATION_CHANGE: | ||
| this.handleConfigurationChangeEvent(response); | ||
| break; | ||
| case Constants.PROVIDER_READY: | ||
| this.handleProviderReadyEvent(); | ||
| break; | ||
| default: | ||
| log.debug("Unhandled event type {}", response.getType()); | ||
| } | ||
| } | ||
| } catch (InterruptedException ie) { | ||
| log.debug("Stream observer interrupted, most likely shutdown was invoked", ie); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ class RpcResolverTest { | |
| private ChannelConnector mockConnector; | ||
| private ServiceBlockingStub blockingStub; | ||
| private ServiceStub stub; | ||
| private ServiceStub errorStub; | ||
| private QueueingStreamObserver<EventStreamResponse> observer; | ||
| private TriConsumer<ProviderEvent, ProviderEventDetails, Structure> consumer; | ||
| private CountDownLatch latch; // used to wait for observer to be initialized | ||
|
|
@@ -61,6 +62,29 @@ public Void answer(InvocationOnMock invocation) { | |
| }) | ||
| .when(stub) | ||
| .eventStream(any(), any()); // Mock the initialize method | ||
|
|
||
| // stub that immediately fires onError on every eventStream call | ||
| errorStub = mock(ServiceStub.class); | ||
| when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub); | ||
| doAnswer((Answer<Void>) invocation -> { | ||
| @SuppressWarnings("unchecked") | ||
| QueueingStreamObserver<EventStreamResponse> obs = (QueueingStreamObserver<EventStreamResponse>) | ||
| invocation.getArguments()[1]; | ||
| latch.countDown(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the latch counted down here? Wouldn't it make more sense to count it down after the |
||
| // immediately fire error on a separate thread | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Waiting 10ms is not exactly |
||
| new Thread(() -> { | ||
| try { | ||
| Thread.sleep(10); | ||
| obs.onError(new Exception("immediate error")); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we move the |
||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| }) | ||
| .start(); | ||
| return null; | ||
| }) | ||
| .when(errorStub) | ||
| .eventStream(any(), any()); | ||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -102,8 +126,13 @@ void onNextWithChangedRunsConsumerWithChanged() throws Exception { | |
|
|
||
| @Test | ||
| void onCompletedRerunsStreamWithError() throws Exception { | ||
| RpcResolver resolver = | ||
| new RpcResolver(FlagdOptions.builder().build(), null, consumer, stub, blockingStub, mockConnector); | ||
| RpcResolver resolver = new RpcResolver( | ||
| FlagdOptions.builder().retryBackoffMaxMs(100).build(), | ||
| null, | ||
| consumer, | ||
| stub, | ||
| blockingStub, | ||
| mockConnector); | ||
| resolver.init(); | ||
| latch.await(); | ||
|
|
||
|
|
@@ -118,8 +147,13 @@ void onCompletedRerunsStreamWithError() throws Exception { | |
|
|
||
| @Test | ||
| void onErrorRunsConsumerWithError() throws Exception { | ||
| RpcResolver resolver = | ||
| new RpcResolver(FlagdOptions.builder().build(), null, consumer, stub, blockingStub, mockConnector); | ||
| RpcResolver resolver = new RpcResolver( | ||
| FlagdOptions.builder().retryBackoffMaxMs(100).build(), | ||
| null, | ||
| consumer, | ||
| stub, | ||
| blockingStub, | ||
| mockConnector); | ||
| resolver.init(); | ||
| latch.await(); | ||
|
|
||
|
|
@@ -131,4 +165,51 @@ void onErrorRunsConsumerWithError() throws Exception { | |
| // should have restarted the stream (2 calls) | ||
| await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); | ||
| } | ||
|
|
||
| @Test | ||
| void onError_RetriesWithNonBlockingBackoff() throws Exception { | ||
| // make sure we do not spin in a busy loop on immediate errors | ||
| int maxBackoffMs = 1000; | ||
| RpcResolver resolver = new RpcResolver( | ||
| FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), | ||
| null, | ||
| consumer, | ||
| errorStub, | ||
| blockingStub, | ||
| mockConnector); | ||
| resolver.init(); | ||
| latch.await(); | ||
|
|
||
| // wait 1.5x our delay for retries | ||
| Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); | ||
|
|
||
| // should have retried the stream (2 calls); initial + 1 retry | ||
| // it's very important that the retry count is low, to confirm no busy-loop | ||
| verify(errorStub, times(2)).eventStream(any(), any()); | ||
| } | ||
|
|
||
| @Test | ||
| void onCompleted_RetriesWithNonBlockingBackoff() throws Exception { | ||
| // make sure we do not spin in a busy loop on stream completion | ||
| int maxBackoffMs = 1000; | ||
| RpcResolver resolver = new RpcResolver( | ||
| FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), | ||
| null, | ||
| consumer, | ||
| stub, | ||
| blockingStub, | ||
| mockConnector); | ||
| resolver.init(); | ||
| latch.await(); | ||
|
|
||
| // fire completion | ||
| observer.onCompleted(); | ||
|
|
||
| // wait 1.5x our delay for retries | ||
| Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); | ||
|
|
||
| // should have retried the stream (2 calls); initial + 1 retry | ||
| // it's very important that the retry count is low, to confirm no busy-loop | ||
| verify(stub, times(2)).eventStream(any(), any()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we also want to
awaitTerminationon theretryScheduler