1818from aiomqtt import MqttCodeError, MqttError, TLSParameters
1919
2020from roborock.callbacks import CallbackMap
21+ from roborock.diagnostics import Diagnostics
2122
2223from .session import MqttParams, MqttSession, MqttSessionException, MqttSessionUnauthorized
2324
@@ -74,6 +75,7 @@ def __init__(
7475 self._connection_task: asyncio.Task[None] | None = None
7576 self._topic_idle_timeout = topic_idle_timeout
7677 self._idle_timers: dict[str, asyncio.Task[None]] = {}
78+ self._diagnostics = params.diagnostics
7779
7880 @property
7981 def connected(self) -> bool:
@@ -88,24 +90,30 @@ async def start(self) -> None:
8890 handle the failure and retry if desired itself. Once connected,
8991 the session will retry connecting in the background.
9092 """
93+ self._diagnostics.increment("start_attempt")
9194 start_future: asyncio.Future[None] = asyncio.Future()
9295 loop = asyncio.get_event_loop()
9396 self._reconnect_task = loop.create_task(self._run_reconnect_loop(start_future))
9497 try:
9598 await start_future
9699 except MqttCodeError as err:
100+ self._diagnostics.increment(f"start_failure:{err.rc}")
97101 if err.rc == MqttReasonCode.RC_ERROR_UNAUTHORIZED:
98102 raise MqttSessionUnauthorized(f"Authorization error starting MQTT session: {err}") from err
99103 raise MqttSessionException(f"Error starting MQTT session: {err}") from err
100104 except MqttError as err:
105+ self._diagnostics.increment("start_failure:unknown")
101106 raise MqttSessionException(f"Error starting MQTT session: {err}") from err
102107 except Exception as err:
108+ self._diagnostics.increment("start_failure:uncaught")
103109 raise MqttSessionException(f"Unexpected error starting session: {err}") from err
104110 else:
111+ self._diagnostics.increment("start_success")
105112 _LOGGER.debug("MQTT session started successfully")
106113
107114 async def close(self) -> None:
108115 """Cancels the MQTT loop and shutdown the client library."""
116+ self._diagnostics.increment("close")
109117 self._stop = True
110118 tasks = [task for task in [self._connection_task, self._reconnect_task, *self._idle_timers.values()] if task]
111119 self._connection_task = None
@@ -128,6 +136,7 @@ async def restart(self) -> None:
128136 the reconnect loop. This is a no-op if there is no active connection.
129137 """
130138 _LOGGER.info("Forcing MQTT session restart")
139+ self._diagnostics.increment("restart")
131140 if self._connection_task:
132141 self._connection_task.cancel()
133142 else:
@@ -136,6 +145,7 @@ async def restart(self) -> None:
136145 async def _run_reconnect_loop(self, start_future: asyncio.Future[None] | None) -> None:
137146 """Run the MQTT loop."""
138147 _LOGGER.info("Starting MQTT session")
148+ self._diagnostics.increment("start_loop")
139149 while True:
140150 try:
141151 self._connection_task = asyncio.create_task(self._run_connection(start_future))
@@ -156,6 +166,7 @@ async def _run_reconnect_loop(self, start_future: asyncio.Future[None] | None) -
156166 _LOGGER.debug("MQTT session closed, stopping retry loop")
157167 return
158168 _LOGGER.info("MQTT session disconnected, retrying in %s seconds", self._backoff.total_seconds())
169+ self._diagnostics.increment("reconnect_wait")
159170 await asyncio.sleep(self._backoff.total_seconds())
160171 self._backoff = min(self._backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_INTERVAL)
161172
@@ -167,17 +178,19 @@ async def _run_connection(self, start_future: asyncio.Future[None] | None) -> No
167178 is lost, this method will exit.
168179 """
169180 try:
170- async with self._mqtt_client(self._params) as client:
171- self._backoff = MIN_BACKOFF_INTERVAL
172- self._healthy = True
173- _LOGGER.info("MQTT Session connected.")
174- if start_future and not start_future.done():
175- start_future.set_result(None)
176-
177- _LOGGER.debug("Processing MQTT messages")
178- async for message in client.messages:
179- _LOGGER.debug("Received message: %s", message)
180- self._listeners(message.topic.value, message.payload)
181+ with self._diagnostics.timer("connection"):
182+ async with self._mqtt_client(self._params) as client:
183+ self._backoff = MIN_BACKOFF_INTERVAL
184+ self._healthy = True
185+ _LOGGER.info("MQTT Session connected.")
186+ if start_future and not start_future.done():
187+ start_future.set_result(None)
188+
189+ _LOGGER.debug("Processing MQTT messages")
190+ async for message in client.messages:
191+ _LOGGER.debug("Received message: %s", message)
192+ with self._diagnostics.timer("dispatch_message"):
193+ self._listeners(message.topic.value, message.payload)
181194 except MqttError as err:
182195 if start_future and not start_future.done():
183196 _LOGGER.info("MQTT error starting session: %s", err)
@@ -219,6 +232,7 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
219232 async with self._client_lock:
220233 self._client = client
221234 for topic in self._listeners.keys():
235+ self._diagnostics.increment("resubscribe")
222236 _LOGGER.debug("Re-establishing subscription to topic %s", topic)
223237 # TODO: If this fails it will break the whole connection. Make
224238 # this retry again in the background with backoff.
@@ -243,6 +257,7 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
243257
244258 # If there is an idle timer for this topic, cancel it (reuse subscription)
245259 if idle_timer := self._idle_timers.pop(topic, None):
260+ self._diagnostics.increment("unsubscribe_idle_cancel")
246261 idle_timer.cancel()
247262 _LOGGER.debug("Cancelled idle timer for topic %s (reused subscription)", topic)
248263
@@ -252,12 +267,14 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
252267 if self._client:
253268 _LOGGER.debug("Establishing subscription to topic %s", topic)
254269 try:
255- await self._client.subscribe(topic)
270+ with self._diagnostics.timer("subscribe"):
271+ await self._client.subscribe(topic)
256272 except MqttError as err:
257273 # Clean up the callback if subscription fails
258274 unsub()
259275 raise MqttSessionException(f"Error subscribing to topic: {err}") from err
260276 else:
277+ self._diagnostics.increment("subscribe_pending")
261278 _LOGGER.debug("Client not connected, will establish subscription later")
262279
263280 def schedule_unsubscribe():
@@ -283,9 +300,11 @@ async def idle_unsubscribe():
283300 self._idle_timers[topic] = task
284301
285302 def delayed_unsub():
303+ self._diagnostics.increment("unsubscribe")
286304 unsub() # Remove the callback from CallbackMap
287305 # If no more callbacks for this topic, start idle timer
288306 if not self._listeners.get_callbacks(topic):
307+ self._diagnostics.increment("unsubscribe_idle_start")
289308 schedule_unsubscribe()
290309
291310 return delayed_unsub
@@ -299,7 +318,8 @@ async def publish(self, topic: str, message: bytes) -> None:
299318 raise MqttSessionException("Could not publish message, MQTT client not connected")
300319 client = self._client
301320 try:
302- await client.publish(topic, message)
321+ with self._diagnostics.timer("publish"):
322+ await client.publish(topic, message)
303323 except MqttError as err:
304324 raise MqttSessionException(f"Error publishing message: {err}") from err
305325
@@ -312,11 +332,12 @@ class LazyMqttSession(MqttSession):
312332 is made.
313333 """
314334
315- def __init__(self, session: RoborockMqttSession) -> None:
335+ def __init__(self, session: RoborockMqttSession, diagnostics: Diagnostics ) -> None:
316336 """Initialize the lazy session with an existing session."""
317337 self._lock = asyncio.Lock()
318338 self._started = False
319339 self._session = session
340+ self._diagnostics = diagnostics
320341
321342 @property
322343 def connected(self) -> bool:
@@ -327,6 +348,7 @@ async def _maybe_start(self) -> None:
327348 """Start the MQTT session if not already started."""
328349 async with self._lock:
329350 if not self._started:
351+ self._diagnostics.increment("start")
330352 await self._session.start()
331353 self._started = True
332354
@@ -377,4 +399,4 @@ async def create_lazy_mqtt_session(params: MqttParams) -> MqttSession:
377399 This function is a factory for creating an MQTT session that will
378400 only connect when the first attempt to subscribe or publish is made.
379401 """
380- return LazyMqttSession(RoborockMqttSession(params))
402+ return LazyMqttSession(RoborockMqttSession(params), diagnostics=params.diagnostics.subkey("lazy_mqtt") )
0 commit comments