@@ -82,7 +82,10 @@ def _get_config(self, last_modified: Optional[float] = None):
8282 json_config = json .dumps (self ._config )
8383 self ._local_bucketing .store_config (json_config )
8484 if not self ._options .disable_realtime_updates :
85- if self ._sse_manager is None :
85+ if (
86+ self ._sse_manager is None
87+ or not self ._sse_manager .client .is_connected ()
88+ ):
8689 self ._sse_manager = SSEManager (
8790 self .sse_state ,
8891 self .sse_error ,
@@ -128,9 +131,9 @@ def run(self):
128131 time .sleep (self ._options .config_polling_interval_ms / 1000.0 )
129132
130133 def sse_message (self , message : ld_eventsource .actions .Event ):
131- if self . _sse_connected is False :
132- self ._sse_connected = True
133- logger . info ( "DevCycle: Connected to SSE stream" )
134+ # Received a message from the SSE stream but our sse_connected is False, so we need to set it to True
135+ if not self ._sse_connected :
136+ self . sse_state ( None )
134137 logger .info (f"DevCycle: Received message: { message .data } " )
135138 sse_message = json .loads (message .data )
136139 dvc_data = json .loads (sse_message .get ("data" ))
@@ -143,11 +146,13 @@ def sse_message(self, message: ld_eventsource.actions.Event):
143146 self ._get_config (dvc_data ["lastModified" ] / 1000.0 )
144147
145148 def sse_error (self , error : ld_eventsource .actions .Fault ):
149+ self ._sse_connected = False
146150 logger .debug (f"DevCycle: Received SSE error: { error } " )
147151
148- def sse_state (self , state : ld_eventsource .actions .Start ):
149- self ._sse_connected = True
150- logger .info ("DevCycle: Connected to SSE stream" )
152+ def sse_state (self , state : Optional [ld_eventsource .actions .Start ]):
153+ if not self ._sse_connected :
154+ self ._sse_connected = True
155+ logger .info ("DevCycle: Connected to SSE stream" )
151156
152157 def close (self ):
153158 self ._polling_enabled = False
0 commit comments