fix(flagd): back off before stream reconnect #391
File 1: the event-stream resolver (`GrpcResolver`). Instead of reconnecting immediately after the stream drops, it now waits on a `threading.Event` that `shutdown()` can interrupt.

```diff
@@ -65,10 +65,12 @@ def __init__(
         logger.debug(self.config.fatal_status_codes)

         self.retry_grace_period = config.retry_grace_period
+        self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
         self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
         self.deadline = config.deadline_ms * 0.001
         self.connected = False
         self._is_fatal = False
+        self._shutdown_event = threading.Event()
         self.channel = self._generate_channel(config)
         self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
```
```diff
@@ -135,6 +137,7 @@ def initialize(self, evaluation_context: EvaluationContext) -> None:

     def shutdown(self) -> None:
         self.active = False
+        self._shutdown_event.set()
         self.channel.unsubscribe(self._state_change_callback)
         self.channel.close()
         if self.timer and self.timer.is_alive():
```
```diff
@@ -145,6 +148,7 @@ def shutdown(self) -> None:

     def connect(self) -> None:
         self.active = True
+        self._shutdown_event.clear()

         # Run monitoring in a separate thread
         self.monitor_thread = threading.Thread(
```
```diff
@@ -215,6 +219,37 @@ def emit_error(self) -> None:
             )
         )

+    def _wait_before_reconnect(self) -> None:
+        self._shutdown_event.wait(self.retry_backoff_max_seconds)
```
> **Contributor:** As mentioned in the initialization, using the maximum backoff for every retry significantly impacts availability. Consider using the initial backoff duration instead. Note that you will also need to update the corresponding regression tests.
The same hunk then extracts the stream-message and error handling into helpers:

```diff
+    def _handle_rpc_error(self, e: grpc.RpcError) -> bool:
+        # although it seems like this error log is not interesting, without it, the retry is not working as expected
+        logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
+        if e.code().name in self.config.fatal_status_codes:
+            self._is_fatal = True
+            self.active = False
+            self.emit_provider_error(
+                ProviderEventDetails(
+                    message=f"Fatal gRPC status code: {e.code()}",
+                    error_code=ErrorCode.PROVIDER_FATAL,
+                )
+            )
+            return True
+        return False
+
+    def _handle_event_stream_message(
+        self, message: evaluation_pb2.EventStreamResponse
+    ) -> None:
+        if message.type == "provider_ready":
+            self.emit_provider_ready(
+                ProviderEventDetails(message="gRPC sync connection established")
+            )
+            self.connected = True
+        elif message.type == "configuration_change":
+            msg_dict = MessageToDict(message)
+            data = msg_dict.get("data", {})
+            self.handle_changed_flags(data)
+
     def listen(self) -> None:
         logger.debug("gRPC starting listener thread")
         call_args: GrpcMultiCallableArgs = {"wait_for_ready": True}
```
```diff
@@ -227,38 +262,20 @@ def listen(self) -> None:
             try:
                 logger.debug("Setting up gRPC sync flags connection")
                 for message in self.stub.EventStream(request, **call_args):
-                    if message.type == "provider_ready":
-                        self.emit_provider_ready(
-                            ProviderEventDetails(
-                                message="gRPC sync connection established"
-                            )
-                        )
-                        self.connected = True
-                    elif message.type == "configuration_change":
-                        msg_dict = MessageToDict(message)
-                        data = msg_dict.get("data", {})
-                        self.handle_changed_flags(data)
+                    self._handle_event_stream_message(message)

                     if not self.active:
                         logger.info("Terminating gRPC sync thread")
                         return
-            except grpc.RpcError as e:  # noqa: PERF203
-                # although it seems like this error log is not interesting, without it, the retry is not working as expected
-                logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
-                if e.code().name in self.config.fatal_status_codes:
-                    self._is_fatal = True
-                    self.active = False
-                    self.emit_provider_error(
-                        ProviderEventDetails(
-                            message=f"Fatal gRPC status code: {e.code()}",
-                            error_code=ErrorCode.PROVIDER_FATAL,
-                        )
-                    )
+            except grpc.RpcError as e:
+                if self._handle_rpc_error(e):
                     return
             except ParseError:
                 logger.exception(
                     f"Could not parse flag data using flagd syntax: {message=}"
                 )
+            if self.active:
+                self._wait_before_reconnect()

     def handle_changed_flags(self, data: typing.Any) -> None:
         changed_flags = list(data.get("flags", {}).keys())
```
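Both resolvers back off via `threading.Event.wait(timeout)` rather than `time.sleep()`. The difference matters for shutdown: `wait()` returns as soon as another thread calls `set()`, so `shutdown()` interrupts a pending backoff immediately. A standalone sketch of that behaviour (hypothetical names, standard library only):

```python
import threading
import time

shutdown_event = threading.Event()

def wait_before_reconnect() -> None:
    # Blocks for up to 5 seconds, but returns immediately once
    # shutdown_event.set() is called; the return value reports which happened.
    interrupted = shutdown_event.wait(5.0)
    print(f"backoff finished, interrupted by shutdown: {interrupted}")

worker = threading.Thread(target=wait_before_reconnect)
start = time.monotonic()
worker.start()
shutdown_event.set()  # simulates shutdown() firing during the backoff
worker.join()
print(f"waited {time.monotonic() - start:.3f}s")  # far less than 5 seconds
```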
File 2: the flag-sync resolver (`FlagSyncServiceStub`) gets the same shutdown event, plus a bug fix: `retry_backoff_max_seconds` was previously derived from `retry_backoff_ms`, so the configured maximum was silently ignored.

```diff
@@ -44,7 +44,7 @@ def __init__(
         self.channel = self._generate_channel(config)
         self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel)
         self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
-        self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
+        self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
         self.retry_grace_period = config.retry_grace_period
         self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
         self.deadline = config.deadline_ms * 0.001
```
```diff
@@ -56,6 +56,7 @@ def __init__(
         self.connected = False
         self._is_fatal = False
+        self._shutdown_event = threading.Event()
         self.thread: threading.Thread | None = None
         self.timer: threading.Timer | None = None
```
```diff
@@ -129,6 +130,7 @@ def initialize(self, context: EvaluationContext) -> None:

     def connect(self) -> None:
         self.active = True
+        self._shutdown_event.clear()

         # Run monitoring in a separate thread
         self.monitor_thread = threading.Thread(
```
```diff
@@ -199,6 +201,7 @@ def emit_error(self) -> None:

     def shutdown(self) -> None:
         self.active = False
+        self._shutdown_event.set()
         self.channel.close()

     def _create_request_args(self) -> dict:
```
```diff
@@ -283,6 +286,9 @@ def _handle_rpc_error(self, e: grpc.RpcError) -> bool:
             return True
         return False

+    def _wait_before_reconnect(self) -> None:
+        self._shutdown_event.wait(self.retry_backoff_max_seconds)
```
> **Contributor:** Using the maximum backoff duration (`retry_backoff_max_ms`) as a constant delay for every reconnect attempt can lead to significant delays in recovery from transient failures. It is generally better to use the initial backoff duration (`retry_backoff_ms`) or implement an exponential backoff strategy. Since gRPC already handles connection-level backoff internally, a small application-level delay is sufficient to prevent tight loops while allowing faster recovery.
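On "gRPC already handles connection-level backoff internally": that transport-level reconnect backoff is tunable through standard channel arguments when the channel is created. A sketch for context (the address and values are illustrative, not this provider's defaults):

```python
import grpc

# Standard gRPC core channel arguments that control how aggressively the
# transport itself reconnects after a connection failure.
options = [
    ("grpc.initial_reconnect_backoff_ms", 1000),
    ("grpc.min_reconnect_backoff_ms", 1000),
    ("grpc.max_reconnect_backoff_ms", 5000),
]
channel = grpc.insecure_channel("localhost:8013", options=options)
```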
```diff
+
     def listen(self) -> None:
         call_args = self.generate_grpc_call_args()
         request_args = self._create_request_args()
```
```diff
@@ -295,7 +301,7 @@ def listen(self) -> None:
                 for flag_rsp in self.stub.SyncFlags(request, **call_args):
                     if self._handle_flag_response(flag_rsp, context_values_response):
                         return
-            except grpc.RpcError as e:  # noqa: PERF203
+            except grpc.RpcError as e:
                 if self._handle_rpc_error(e):
                     return
             except json.JSONDecodeError:
@@ -304,6 +310,8 @@ def listen(self) -> None:
                 )
             except ParseError:
                 logger.exception("Could not parse flag data using flagd syntax")
+            if self.active:
+                self._wait_before_reconnect()

     def generate_grpc_call_args(self) -> GrpcMultiCallableArgs:
         call_args: GrpcMultiCallableArgs = {"wait_for_ready": True}
```
File 3: a new regression test module for `GrpcResolver` (74 lines, all added):

```python
import unittest
from unittest.mock import MagicMock, Mock, patch

import grpc
from grpc import Channel

from openfeature.contrib.provider.flagd.config import CacheType, Config
from openfeature.contrib.provider.flagd.resolvers.grpc import GrpcResolver


class FakeRpcError(grpc.RpcError):
    def code(self):
        return grpc.StatusCode.UNAVAILABLE

    def details(self):
        return "stream unavailable"


class TestGrpcResolver(unittest.TestCase):
    def setUp(self):
        config = Config(
            cache=CacheType.DISABLED,
            deadline_ms=100,
            retry_backoff_ms=1000,
            retry_backoff_max_ms=5000,
            stream_deadline_ms=1000,
        )
        channel = Mock(spec=Channel)

        with patch(
            "openfeature.contrib.provider.flagd.resolvers.grpc.GrpcResolver._generate_channel",
            return_value=channel,
        ):
            self.grpc_resolver = GrpcResolver(
                config=config,
                emit_provider_ready=Mock(),
                emit_provider_error=Mock(),
                emit_provider_stale=Mock(),
                emit_provider_configuration_changed=Mock(),
            )

        self.grpc_resolver.stub = MagicMock()
        self.grpc_resolver.active = True

    def test_uses_max_retry_backoff_for_application_level_reconnect_delay(self):
        self.assertEqual(self.grpc_resolver.retry_backoff_max_seconds, 5)

    def test_listen_backs_off_after_rpc_stream_error(self):
        self.grpc_resolver.stub.EventStream = Mock(side_effect=FakeRpcError())

        with patch.object(
            self.grpc_resolver,
            "_wait_before_reconnect",
            side_effect=lambda: setattr(self.grpc_resolver, "active", False),
        ) as wait_before_reconnect:
            self.grpc_resolver.listen()

        wait_before_reconnect.assert_called_once()

    def test_listen_backs_off_after_stream_completion(self):
        self.grpc_resolver.stub.EventStream = Mock(return_value=iter([]))

        with patch.object(
            self.grpc_resolver,
            "_wait_before_reconnect",
            side_effect=lambda: setattr(self.grpc_resolver, "active", False),
        ) as wait_before_reconnect:
            self.grpc_resolver.listen()

        wait_before_reconnect.assert_called_once()


if __name__ == "__main__":
    unittest.main()
```
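The tests stop the reconnect loop by giving the patched `_wait_before_reconnect` a `side_effect` that flips `active` to `False`, so `listen()` exits after a single cycle. If the reviewers' exponential-backoff suggestion were adopted, a companion test could pin the delay schedule. A hypothetical sketch, assuming the `_reconnect_backoff_seconds` field from the earlier illustration and the `setUp` config above (initial 1 s, max 5 s):

```python
# Hypothetical addition to TestGrpcResolver; only meaningful for an
# exponential-backoff variant of _wait_before_reconnect.
def test_reconnect_backoff_doubles_until_capped(self):
    delays = []
    with patch.object(
        self.grpc_resolver._shutdown_event, "wait", side_effect=delays.append
    ):
        for _ in range(4):
            self.grpc_resolver._wait_before_reconnect()
    # Seeded at retry_backoff_ms (1 s), doubled per attempt, capped at
    # retry_backoff_max_ms (5 s).
    self.assertEqual(delays, [1.0, 2.0, 4.0, 5.0])
```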