Skip to content

Commit 10d6531

Browse files
committed
chore: init commit
1 parent a53e73c commit 10d6531

File tree

2 files changed

+230
-0
lines changed

2 files changed

+230
-0
lines changed

roborock/mqtt_manager.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import dataclasses
5+
import logging
6+
from collections.abc import Coroutine
7+
from typing import Callable, Self
8+
from urllib.parse import urlparse
9+
10+
import aiomqtt
11+
from aiomqtt import TLSParameters
12+
13+
from roborock import RoborockException, UserData
14+
from roborock.protocol import MessageParser, md5hex
15+
16+
from .containers import DeviceData
17+
18+
LOGGER = logging.getLogger(__name__)
19+
20+
21+
@dataclasses.dataclass
22+
class ClientWrapper:
23+
publish_function: Coroutine[None]
24+
unsubscribe_function: Coroutine[None]
25+
subscribe_function: Coroutine[None]
26+
27+
28+
class RoborockMqttManager:
29+
client_wrappers: dict[str, ClientWrapper] = {}
30+
_instance: Self = None
31+
32+
def __new__(cls) -> RoborockMqttManager:
33+
if cls._instance is None:
34+
cls._instance = super().__new__(cls)
35+
return cls._instance
36+
37+
async def connect(self, user_data: UserData):
38+
# Add some kind of lock so we don't try to connect if we are already trying to connect the same account.
39+
if user_data.rriot.u not in self.client_wrappers:
40+
loop = asyncio.get_event_loop()
41+
loop.create_task(self._new_connect(user_data))
42+
43+
async def _new_connect(self, user_data: UserData):
44+
rriot = user_data.rriot
45+
mqtt_user = rriot.u
46+
hashed_user = md5hex(mqtt_user + ":" + rriot.k)[2:10]
47+
url = urlparse(rriot.r.m)
48+
if not isinstance(url.hostname, str):
49+
raise RoborockException("Url parsing returned an invalid hostname")
50+
mqtt_host = str(url.hostname)
51+
mqtt_port = url.port
52+
53+
mqtt_password = rriot.s
54+
hashed_password = md5hex(mqtt_password + ":" + rriot.k)[16:]
55+
LOGGER.debug("Connecting to %s for %s", mqtt_host, mqtt_user)
56+
57+
async with aiomqtt.Client(
58+
hostname=mqtt_host,
59+
port=mqtt_port,
60+
username=hashed_user,
61+
password=hashed_password,
62+
keepalive=60,
63+
tls_params=TLSParameters(),
64+
) as client:
65+
# TODO: Handle logic for when client loses connection
66+
LOGGER.info("Connected to %s for %s", mqtt_host, mqtt_user)
67+
callbacks: dict[str, Callable] = {}
68+
device_map = {}
69+
70+
async def publish(device: DeviceData, payload: bytes):
71+
await client.publish(f"rr/m/i/{mqtt_user}/{hashed_user}/{device.device.duid}", payload=payload)
72+
73+
async def subscribe(device: DeviceData, callback):
74+
LOGGER.debug(f"Subscribing to rr/m/o/{mqtt_user}/{hashed_user}/{device.device.duid}")
75+
await client.subscribe(f"rr/m/o/{mqtt_user}/{hashed_user}/{device.device.duid}")
76+
LOGGER.debug(f"Subscribed to rr/m/o/{mqtt_user}/{hashed_user}/{device.device.duid}")
77+
callbacks[device.device.duid] = callback
78+
device_map[device.device.duid] = device
79+
return
80+
81+
async def unsubscribe(device: DeviceData):
82+
await client.unsubscribe(f"rr/m/o/{mqtt_user}/{hashed_user}/{device.device.duid}")
83+
84+
self.client_wrappers[user_data.rriot.u] = ClientWrapper(
85+
publish_function=publish, unsubscribe_function=unsubscribe, subscribe_function=subscribe
86+
)
87+
async for message in client.messages:
88+
try:
89+
device_id = message.topic.value.split("/")[-1]
90+
device = device_map[device_id]
91+
message = MessageParser.parse(message.payload, device.device.local_key)
92+
callbacks[device_id](message)
93+
except Exception:
94+
...
95+
96+
async def disconnect(self, user_data: UserData):
97+
await self.client_wrappers[user_data.rriot.u].disconnect()
98+
99+
async def subscribe(self, user_data: UserData, device: DeviceData, callback):
100+
if user_data.rriot.u not in self.client_wrappers:
101+
await self.connect(user_data)
102+
# add some kind of lock to make sure we don't subscribe until the connection is successful
103+
await asyncio.sleep(2)
104+
await self.client_wrappers[user_data.rriot.u].subscribe_function(device, callback)
105+
106+
async def unsubscribe(self):
107+
pass
108+
109+
async def publish(self, user_data: UserData, device, payload: bytes):
110+
LOGGER.debug("Publishing topic for %s, Message: %s", device.device.duid, payload)
111+
if user_data.rriot.u not in self.client_wrappers:
112+
await self.connect(user_data)
113+
await self.client_wrappers[user_data.rriot.u].publish_function(device, payload)

roborock/roborock_device.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import base64
2+
import json
3+
import logging
4+
import math
5+
import secrets
6+
import time
7+
8+
from . import RoborockCommand
9+
from .containers import DeviceData, UserData
10+
from .mqtt_manager import RoborockMqttManager
11+
from .protocol import MessageParser, Utils
12+
from .roborock_message import RoborockMessage, RoborockMessageProtocol
13+
from .util import RoborockLoggerAdapter, get_next_int
14+
15+
_LOGGER = logging.getLogger(__name__)
16+
17+
18+
class RoborockDevice:
19+
def __init__(self, user_data: UserData, device_info: DeviceData):
20+
self.user_data = user_data
21+
self.device_info = device_info
22+
self.data = None
23+
self._logger = RoborockLoggerAdapter(device_info.device.name, _LOGGER)
24+
self._mqtt_endpoint = base64.b64encode(Utils.md5(user_data.rriot.k.encode())[8:14]).decode()
25+
self._local_endpoint = "abc"
26+
self._nonce = secrets.token_bytes(16)
27+
self.manager = RoborockMqttManager()
28+
self.update_commands = self.determine_supported_commands()
29+
30+
def determine_supported_commands(self):
31+
# All devices support these
32+
supported_commands = {
33+
RoborockCommand.GET_CONSUMABLE,
34+
RoborockCommand.GET_STATUS,
35+
RoborockCommand.GET_CLEAN_SUMMARY,
36+
}
37+
# Get what features we can from the feature_set info.
38+
39+
# If a command is not described in feature_set, we should just add it anyways and then let it fail on the first call and remove it.
40+
robot_new_features = int(self.device_info.device.feature_set)
41+
new_feature_info_str = self.device_info.device.new_feature_set
42+
if 33554432 & int(robot_new_features):
43+
supported_commands.add(RoborockCommand.GET_DUST_COLLECTION_MODE)
44+
if 2 & int(new_feature_info_str[-8:], 16):
45+
# TODO: May not be needed as i think this can just be found in Status, but just POC
46+
supported_commands.add(RoborockCommand.APP_GET_CLEAN_ESTIMATE_INFO)
47+
return supported_commands
48+
49+
async def connect(self):
50+
"""Connect via MQTT and Local if possible."""
51+
await self.manager.subscribe(self.user_data, self.device_info, self.on_message)
52+
await self.update()
53+
54+
async def update(self):
55+
for cmd in self.update_commands:
56+
await self.send_message(method=cmd)
57+
58+
def _get_payload(
59+
self,
60+
method: RoborockCommand | str,
61+
params: list | dict | int | None = None,
62+
secured=False,
63+
use_cloud: bool = False,
64+
):
65+
timestamp = math.floor(time.time())
66+
request_id = get_next_int(10000, 32767)
67+
inner = {
68+
"id": request_id,
69+
"method": method,
70+
"params": params or [],
71+
}
72+
if secured:
73+
inner["security"] = {
74+
"endpoint": self._mqtt_endpoint if use_cloud else self._local_endpoint,
75+
"nonce": self._nonce.hex().lower(),
76+
}
77+
payload = bytes(
78+
json.dumps(
79+
{
80+
"dps": {"101": json.dumps(inner, separators=(",", ":"))},
81+
"t": timestamp,
82+
},
83+
separators=(",", ":"),
84+
).encode()
85+
)
86+
return request_id, timestamp, payload
87+
88+
async def send_message(
89+
self, method: RoborockCommand | str, params: list | dict | int | None = None, use_cloud: bool = True
90+
):
91+
request_id, timestamp, payload = self._get_payload(method, params, True, use_cloud)
92+
request_protocol = RoborockMessageProtocol.RPC_REQUEST
93+
roborock_message = RoborockMessage(timestamp=timestamp, protocol=request_protocol, payload=payload)
94+
95+
local_key = self.device_info.device.local_key
96+
msg = MessageParser.build(roborock_message, local_key, False)
97+
if use_cloud:
98+
await self.manager.publish(self.user_data, self.device_info, msg)
99+
else:
100+
# Handle doing local commands
101+
pass
102+
103+
def on_message(self, message: RoborockMessage):
104+
# If message is command not supported - remove from self.update_commands
105+
106+
# If message is an error - log it?
107+
108+
# If message is 'ok' - ignore it
109+
110+
# If message is anything else - store ids, and map back to id to determine message type.
111+
# Then update self.data
112+
113+
# If we haven't received a message in X seconds, the device is likely offline. I think we can continue the connection,
114+
# but we should have some way to mark ourselves as unavailable.
115+
116+
# This should also probably be split with on_cloud_message and on_local_message.
117+
print(message)

0 commit comments

Comments
 (0)