88from typing import Any
99
1010import paho .mqtt .client as mqtt
11+ from paho .mqtt .enums import MQTTErrorCode
1112
1213# Mypy is not seeing this for some reason. It wants me to use the depreciated ReasonCodes
1314from paho .mqtt .reasoncodes import ReasonCode # type: ignore
@@ -80,7 +81,7 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None:
8081 self ._mutex = Lock ()
8182 self ._decoder : Decoder = create_mqtt_decoder (device_info .device .local_key )
8283 self ._encoder : Encoder = create_mqtt_encoder (device_info .device .local_key )
83- self .previous_attempt_was_subscribe = False
84+ self .received_message_since_last_disconnect = False
8485 self ._topic = f"rr/m/o/{ self ._mqtt_user } /{ self ._hashed_user } /{ self .device_info .device .duid } "
8586
8687 def _mqtt_on_connect (
@@ -113,7 +114,7 @@ def _mqtt_on_connect(
113114 connection_queue .set_result (True )
114115
115116 def _mqtt_on_message (self , * args , ** kwargs ):
116- self .previous_attempt_was_subscribe = False
117+ self .received_message_since_last_disconnect = False
117118 client , __ , msg = args
118119 try :
119120 messages = self ._decoder (msg .payload )
@@ -197,7 +198,7 @@ def _send_msg_raw(self, msg: bytes) -> None:
197198 if info .rc != mqtt .MQTT_ERR_SUCCESS :
198199 raise RoborockException (f"Failed to publish ({ mqtt .error_string (info .rc )} )" )
199200
200- async def unsubscribe (self ):
201+ async def _unsubscribe (self ) -> MQTTErrorCode :
201202 """Unsubscribe from the topic."""
202203 loop = asyncio .get_running_loop ()
203204 (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .unsubscribe , self ._topic )
@@ -209,7 +210,7 @@ async def unsubscribe(self):
209210 self ._logger .info (f"Unsubscribed from topic { self ._topic } " )
210211 return result
211212
212- async def subscribe (self ):
213+ async def _subscribe (self ) -> MQTTErrorCode :
213214 """Subscribe to the topic."""
214215 loop = asyncio .get_running_loop ()
215216 (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .subscribe , self ._topic )
@@ -221,37 +222,38 @@ async def subscribe(self):
221222 self ._logger .info (f"Subscribed to topic { self ._topic } " )
222223 return result
223224
224- async def reconnect (self ) -> None :
225+ async def _reconnect (self ) -> None :
225226 """Reconnect to the MQTT broker."""
226227 await self .async_disconnect ()
227228 await self .async_connect ()
228229
229- async def validate_connection (self ) -> None :
230+ async def _validate_connection (self ) -> None :
230231 """Override the default validate connection to try to re-subscribe rather than disconnect.
231232 When something seems to be wrong with our connection, we should follow the following steps:
232233 1. Try to unsubscribe and resubscribe from the topic.
233234 2. If we don't end up getting a message, we should completely disconnect and reconnect to the MQTT broker.
234235 3. We will continue to try to disconnect and reconnect until we get a message.
235236 4. If we get a message, the next time connection is lost, We will go back to step 1.
236237 """
238+ # If we should no longer keep the current connection alive...
237239 if not self .should_keepalive ():
238240 self ._logger .info ("Resetting Roborock connection due to keepalive timeout" )
239- if self .previous_attempt_was_subscribe :
241+ if self .received_message_since_last_disconnect :
240242 # If we have already tried to unsub and resub, and we are still in this state,
241243 # we should try to reconnect.
242- return await self .reconnect ()
244+ return await self ._reconnect ()
243245 try :
244246 # Mark that we have tried to unsubscribe and resubscribe
245- self .previous_attempt_was_subscribe = True
246- if await self .unsubscribe () = = 0 :
247+ self .received_message_since_last_disconnect = True
248+ if await self ._unsubscribe () ! = 0 :
247249 # If we fail to unsubscribe, reconnect to the broker
248- return await self .reconnect ()
249- if await self .subscribe () = = 0 :
250+ return await self ._reconnect ()
251+ if await self ._subscribe () ! = 0 :
250252 # If we fail to subscribe, reconnected to the broker.
251- return await self .reconnect ()
253+ return await self ._reconnect ()
252254
253255 except Exception : # noqa
254256 # If we get any errors at all, we should just reconnect.
255- return await self .reconnect ()
257+ return await self ._reconnect ()
256258 # Call connect to make sure everything is still in a good state.
257259 await self .async_connect ()
0 commit comments