@@ -197,32 +197,61 @@ def _send_msg_raw(self, msg: bytes) -> None:
197197 if info .rc != mqtt .MQTT_ERR_SUCCESS :
198198 raise RoborockException (f"Failed to publish ({ mqtt .error_string (info .rc )} )" )
199199
200- async def validate_connection (self ) -> None :
201- """Override the default validate connection to try to re-subscribe rather than disconnect."""
202- if self .previous_attempt_was_subscribe :
203- # If we have already tried to unsub and resub, and we are still in this state,
204- # we should just do the normal validate connection.
205- return await super ().validate_connection ()
206- try :
207- if not self .should_keepalive ():
208- self .previous_attempt_was_subscribe = True
209- loop = asyncio .get_running_loop ()
200+ async def unsubscribe (self ):
201+ """Unsubscribe from the topic."""
202+ loop = asyncio .get_running_loop ()
203+ (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .unsubscribe , self ._topic )
210204
211- self ._logger .info ("Resetting Roborock connection due to keepalive timeout" )
212- (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .unsubscribe , self ._topic )
205+ if result != 0 :
206+ message = f"Failed to unsubscribe ({ mqtt .error_string (result )} )"
207+ self ._logger .error (message )
208+ else :
209+ self ._logger .info (f"Unsubscribed from topic { self ._topic } " )
210+ return result
213211
214- if result != 0 :
215- message = f"Failed to unsubscribe ({ mqtt .error_string (result )} )"
216- self ._logger .error (message )
217- return await super ().validate_connection ()
218- (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .subscribe , self ._topic )
212+ async def subscribe (self ):
213+ """Subscribe to the topic."""
214+ loop = asyncio .get_running_loop ()
215+ (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .subscribe , self ._topic )
219216
220- if result != 0 :
221- message = f"Failed to subscribe ({ mqtt .error_string (result )} )"
222- self ._logger .error (message )
223- return await super ().validate_connection ()
217+ if result != 0 :
218+ message = f"Failed to subscribe ({ mqtt .error_string (result )} )"
219+ self ._logger .error (message )
220+ else :
221+ self ._logger .info (f"Subscribed to topic { self ._topic } " )
222+ return result
223+
224+ async def reconnect (self ) -> None :
225+ """Reconnect to the MQTT broker."""
226+ await self .async_disconnect ()
227+ await self .async_connect ()
224228
225- self ._logger .info (f"Subscribed to topic { self ._topic } " )
226- except Exception : # noqa
227- return await super ().validate_connection ()
229+ async def validate_connection (self ) -> None :
230+ """Override the default validate connection to try to re-subscribe rather than disconnect.
231+ When something seems to be wrong with our connection, we should follow the following steps:
232+ 1. Try to unsubscribe and resubscribe from the topic.
233+ 2. If we don't end up getting a message, we should completely disconnect and reconnect to the MQTT broker.
234+ 3. We will continue to try to disconnect and reconnect until we get a message.
235+ 4. If we get a message, the next time connection is lost, We will go back to step 1.
236+ """
237+ if not self .should_keepalive ():
238+ self ._logger .info ("Resetting Roborock connection due to keepalive timeout" )
239+ if self .previous_attempt_was_subscribe :
240+ # If we have already tried to unsub and resub, and we are still in this state,
241+ # we should try to reconnect.
242+ return await self .reconnect ()
243+ try :
244+ # Mark that we have tried to unsubscribe and resubscribe
245+ self .previous_attempt_was_subscribe = True
246+ if await self .unsubscribe () == 0 :
247+ # If we fail to unsubscribe, reconnect to the broker
248+ return await self .reconnect ()
249+ if await self .subscribe () == 0 :
250+ # If we fail to subscribe, reconnected to the broker.
251+ return await self .reconnect ()
252+
253+ except Exception : # noqa
254+ # If we get any errors at all, we should just reconnect.
255+ return await self .reconnect ()
256+ # Call connect to make sure everything is still in a good state.
228257 await self .async_connect ()
0 commit comments