33This module provides a unified channel interface for V1 protocol devices,
44handling both MQTT and local connections with automatic fallback.
55"""
6-
6+ import asyncio
7+ import datetime
78import logging
89from collections .abc import Callable
910from typing import TypeVar
2223from .channel import Channel
2324from .local_channel import LocalChannel , LocalSession , create_local_session
2425from .mqtt_channel import MqttChannel
25- from .v1_rpc_channel import PickFirstAvailable , V1RpcChannel , create_local_rpc_channel , create_mqtt_rpc_channel
26+ from .v1_rpc_channel import (
27+ PickFirstAvailable ,
28+ V1RpcChannel ,
29+ create_local_rpc_channel ,
30+ create_mqtt_rpc_channel ,
31+ )
2632
2733_LOGGER = logging .getLogger (__name__ )
2834
3238
3339_T = TypeVar ("_T" , bound = RoborockBase )
3440
41+ # Exponential backoff parameters for reconnecting to local
42+ MIN_RECONNECT_INTERVAL = datetime .timedelta (minutes = 1 )
43+ MAX_RECONNECT_INTERVAL = datetime .timedelta (minutes = 10 )
44+ RECONNECT_MULTIPLIER = 1.5
45+ # After this many hours, the network info is refreshed
46+ NETWORK_INFO_REFRESH_INTERVAL = datetime .timedelta (hours = 12 )
47+ # Interval to check that the local connection is healthy
48+ LOCAL_CONNECTION_CHECK_INTERVAL = datetime .timedelta (seconds = 15 )
49+
3550
3651class V1Channel (Channel ):
3752 """Unified V1 protocol channel with automatic MQTT/local connection handling.
@@ -69,6 +84,8 @@ def __init__(
6984 self ._local_unsub : Callable [[], None ] | None = None
7085 self ._callback : Callable [[RoborockMessage ], None ] | None = None
7186 self ._cache = cache
87+ self ._reconnect_task : asyncio .Task [None ] | None = None
88+ self ._last_network_info_refresh : datetime .datetime | None = None
7289
7390 @property
7491 def is_connected (self ) -> bool :
@@ -78,7 +95,7 @@ def is_connected(self) -> bool:
7895 @property
7996 def is_local_connected (self ) -> bool :
8097 """Return whether local connection is available."""
81- return self ._local_unsub is not None
98+ return self ._local_channel is not None and self . _local_channel . is_connected
8299
83100 @property
84101 def is_mqtt_connected (self ) -> bool :
@@ -103,25 +120,37 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
103120 a RoborockException. A local connection failure will not raise an exception,
104121 since the local connection is optional.
105122 """
123+ if self ._callback is not None :
124+ raise ValueError ("Only one subscription allowed at a time" )
106125
107- if self ._mqtt_unsub :
108- raise ValueError ("Already connected to the device" )
109- self ._callback = callback
110-
111- # First establish MQTT connection
112- self ._mqtt_unsub = await self ._mqtt_channel .subscribe (self ._on_mqtt_message )
113- _LOGGER .debug ("V1Channel connected to device %s via MQTT" , self ._device_uid )
114-
115- # Try to establish an optional local connection as well.
126+ # Make an initial, optimistic attempt to connect to local with the cache
116127 try :
117- self . _local_unsub = await self ._local_connect ()
128+ await self ._local_connect (use_cache = True )
118129 except RoborockException as err :
130+ _LOGGER .info ("Hello1" )
119131 _LOGGER .warning ("Could not establish local connection for device %s: %s" , self ._device_uid , err )
120- else :
121- _LOGGER .debug ("Local connection established for device %s" , self ._device_uid )
132+ _LOGGER .info ("Hello2" )
133+
134+ # Start a background task to manage the local connection health
135+ _LOGGER .info ("self._reconnect_task=%s" , self ._reconnect_task )
136+ if self ._reconnect_task is None :
137+ loop = asyncio .get_running_loop ()
138+ self ._reconnect_task = loop .create_task (self ._background_reconnect ())
139+
140+ # We were not able to connect locally, so fallback to MQTT. If this fails
141+ # then we'll fail to subscribe.
142+ if not self .is_local_connected :
143+ if self ._mqtt_unsub is not None :
144+ self ._mqtt_unsub ()
145+ self ._mqtt_unsub = None
146+ self ._mqtt_unsub = await self ._mqtt_channel .subscribe (self ._on_mqtt_message )
147+ _LOGGER .debug ("V1Channel connected to device %s via MQTT" , self ._device_uid )
122148
123149 def unsub () -> None :
124150 """Unsubscribe from all messages."""
151+ if self ._reconnect_task :
152+ self ._reconnect_task .cancel ()
153+ self ._reconnect_task = None
125154 if self ._mqtt_unsub :
126155 self ._mqtt_unsub ()
127156 self ._mqtt_unsub = None
@@ -130,15 +159,16 @@ def unsub() -> None:
130159 self ._local_unsub = None
131160 _LOGGER .debug ("Unsubscribed from device %s" , self ._device_uid )
132161
162+ self ._callback = callback
133163 return unsub
134164
135- async def _get_networking_info (self ) -> NetworkInfo :
165+ async def _get_networking_info (self , * , use_cache : bool = True ) -> NetworkInfo :
136166 """Retrieve networking information for the device.
137167
138168 This is a cloud only command used to get the local device's IP address.
139169 """
140170 cache_data = await self ._cache .get ()
141- if cache_data .network_info and (network_info := cache_data .network_info .get (self ._device_uid )):
171+ if use_cache and cache_data .network_info and (network_info := cache_data .network_info .get (self ._device_uid )):
142172 _LOGGER .debug ("Using cached network info for device %s" , self ._device_uid )
143173 return network_info
144174 try :
@@ -148,24 +178,77 @@ async def _get_networking_info(self) -> NetworkInfo:
148178 except RoborockException as e :
149179 raise RoborockException (f"Network info failed for device { self ._device_uid } " ) from e
150180 _LOGGER .debug ("Network info for device %s: %s" , self ._device_uid , network_info )
181+ self ._last_network_info_refresh = datetime .datetime .now (datetime .timezone .utc )
151182 cache_data .network_info [self ._device_uid ] = network_info
152183 await self ._cache .set (cache_data )
153184 return network_info
154185
155- async def _local_connect (self ) -> Callable [[], None ] :
186+ async def _local_connect (self , * , use_cache : bool = True ) -> None :
156187 """Set up local connection if possible."""
157- _LOGGER .debug ("Attempting to connect to local channel for device %s" , self ._device_uid )
158- networking_info = await self ._get_networking_info ()
188+ if self .is_local_connected :
189+ return
190+ _LOGGER .debug (
191+ "Attempting to connect to local channel for device %s (use_cache=%s)" , self ._device_uid , use_cache
192+ )
193+ networking_info = await self ._get_networking_info (use_cache = use_cache )
159194 host = networking_info .ip
160195 _LOGGER .debug ("Connecting to local channel at %s" , host )
161- self ._local_channel = self ._local_session (host )
196+ # Create a new local channel and connect
197+ local_channel = self ._local_session (host )
162198 try :
163- await self . _local_channel .connect ()
199+ await local_channel .connect ()
164200 except RoborockException as e :
165- self ._local_channel = None
166201 raise RoborockException (f"Error connecting to local device { self ._device_uid } : { e } " ) from e
202+ # Wire up the new channel
203+ self ._local_channel = local_channel
167204 self ._local_rpc_channel = create_local_rpc_channel (self ._local_channel )
168- return await self ._local_channel .subscribe (self ._on_local_message )
205+ self ._local_unsub = await self ._local_channel .subscribe (self ._on_local_message )
206+ _LOGGER .info ("Successfully connected to local device %s" , self ._device_uid )
207+
208+ async def _background_reconnect (self ) -> None :
209+ """Task to run in the background to manage the local connection."""
210+ _LOGGER .debug ("Starting background task to manage local connection for %s" , self ._device_uid )
211+ reconnect_backoff = MIN_RECONNECT_INTERVAL
212+ local_connect_failures = 0
213+
214+ while True :
215+ try :
216+ if self .is_local_connected :
217+ await asyncio .sleep (LOCAL_CONNECTION_CHECK_INTERVAL .total_seconds ())
218+ continue
219+
220+ # Not connected, so wait with backoff before trying to connect.
221+ # The first time through, we don't sleep, we just try to connect.
222+ local_connect_failures += 1
223+ if local_connect_failures > 1 :
224+ await asyncio .sleep (reconnect_backoff .total_seconds ())
225+ reconnect_backoff = min (reconnect_backoff * RECONNECT_MULTIPLIER , MAX_RECONNECT_INTERVAL )
226+
227+ # First failure refreshes cache. Subsequent failures use the cache
228+ # until the refresh interval expires.
229+ use_cache = True
230+ if local_connect_failures == 1 :
231+ use_cache = False
232+ elif self ._last_network_info_refresh and (
233+ datetime .datetime .now (datetime .timezone .utc ) - self ._last_network_info_refresh
234+ > NETWORK_INFO_REFRESH_INTERVAL
235+ ):
236+ use_cache = False
237+
238+ await self ._local_connect (use_cache = use_cache )
239+ # Reset backoff and failures on success
240+ reconnect_backoff = MIN_RECONNECT_INTERVAL
241+ local_connect_failures = 0
242+
243+ except asyncio .CancelledError :
244+ _LOGGER .debug ("Background reconnect task cancelled" )
245+ if self ._local_channel :
246+ self ._local_channel .close ()
247+ return
248+ except RoborockException as err :
249+ _LOGGER .debug ("Background reconnect failed: %s" , err )
250+ except Exception :
251+ _LOGGER .exception ("Unhandled exception in background reconnect task" )
169252
170253 def _on_mqtt_message (self , message : RoborockMessage ) -> None :
171254 """Handle incoming MQTT messages."""
0 commit comments