Skip to content

Commit 364e88e

Browse files
committed
feat: Simplify rpc handling and tests
1 parent 06b9178 commit 364e88e

File tree

2 files changed

+40
-33
lines changed

2 files changed

+40
-33
lines changed

roborock/devices/mqtt_channel.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,9 @@ async def _resolve_future_with_lock(self, message: RoborockMessage) -> None:
7676
return
7777
async with self._queue_lock:
7878
if (future := self._waiting_queue.pop(request_id, None)) is not None:
79-
if not future.done():
80-
future.set_result(message)
81-
else:
82-
_LOGGER.warning("Received message for completed future: request_id=%s", request_id)
79+
future.set_result(message)
8380
else:
84-
_LOGGER.warning("Received message with no waiting handler: request_id=%s", request_id)
81+
_LOGGER.debug("Received message with no waiting handler: request_id=%s", request_id)
8582

8683
async def send_command(self, message: RoborockMessage, timeout: float = 10.0) -> RoborockMessage:
8784
"""Send a command message and wait for the response message.
@@ -97,6 +94,8 @@ async def send_command(self, message: RoborockMessage, timeout: float = 10.0) ->
9794

9895
future: asyncio.Future[RoborockMessage] = asyncio.Future()
9996
async with self._queue_lock:
97+
if request_id in self._waiting_queue:
98+
raise RoborockException(f"Request ID {request_id} already pending, cannot send command")
10099
self._waiting_queue[request_id] = future
101100

102101
try:

tests/devices/test_mqtt_channel.py

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from roborock.devices.mqtt_channel import MqttChannel
1212
from roborock.exceptions import RoborockException
1313
from roborock.mqtt.session import MqttParams
14-
from roborock.protocol import Decoder, Encoder, create_mqtt_decoder, create_mqtt_encoder
14+
from roborock.protocol import create_mqtt_decoder, create_mqtt_encoder
1515
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
1616

1717
from .. import mock_data
@@ -144,7 +144,9 @@ async def test_send_command_success(
144144

145145

146146
async def test_send_command_without_request_id(
147-
mqtt_session: Mock, mqtt_channel: MqttChannel, mqtt_message_handler: Callable[[bytes], None],
147+
mqtt_session: Mock,
148+
mqtt_channel: MqttChannel,
149+
mqtt_message_handler: Callable[[bytes], None],
148150
) -> None:
149151
"""Test sending command without request ID raises exception."""
150152
# Create a message without request ID
@@ -157,22 +159,6 @@ async def test_send_command_without_request_id(
157159
await mqtt_channel.send_command(test_message)
158160

159161

160-
async def test_handle_messages_no_waiting_handler(
161-
mqtt_session: Mock,
162-
mqtt_channel: MqttChannel,
163-
mqtt_message_handler: Callable[[bytes], None],
164-
caplog: pytest.LogCaptureFixture,
165-
) -> None:
166-
"""Test handling messages when no handler is waiting."""
167-
# Simulate receiving the response message via MQTT
168-
mqtt_message_handler(ENCODER(TEST_REQUEST))
169-
await asyncio.sleep(0.01) # yield
170-
171-
assert len(caplog.records) == 1
172-
assert caplog.records[0].levelname == "WARNING"
173-
assert "Received message with no waiting handler: request_id=12345" in caplog.records[0].message
174-
175-
176162
async def test_concurrent_commands(
177163
mqtt_session: Mock,
178164
mqtt_channel: MqttChannel,
@@ -204,6 +190,31 @@ async def test_concurrent_commands(
204190
assert not caplog.records
205191

206192

193+
async def test_concurrent_commands_same_request_id(
194+
mqtt_session: Mock,
195+
mqtt_channel: MqttChannel,
196+
mqtt_message_handler: Callable[[bytes], None],
197+
) -> None:
198+
"""Test that we are not allowed to send two commands with the same request id."""
199+
200+
# Create multiple test messages with different request IDs
201+
# Start both commands concurrently
202+
task1 = asyncio.create_task(mqtt_channel.send_command(TEST_REQUEST, timeout=5.0))
203+
task2 = asyncio.create_task(mqtt_channel.send_command(TEST_REQUEST, timeout=5.0))
204+
await asyncio.sleep(0.01) # yield
205+
206+
# Create response
207+
mqtt_message_handler(ENCODER(TEST_RESPONSE))
208+
await asyncio.sleep(0.01) # yield
209+
210+
# Both should complete successfully
211+
result1 = await task1
212+
assert result1 == TEST_RESPONSE
213+
214+
with pytest.raises(RoborockException, match="Request ID 12345 already pending, cannot send command"):
215+
await task2
216+
217+
207218
async def test_handle_completed_future(
208219
mqtt_session: Mock,
209220
mqtt_channel: MqttChannel,
@@ -221,39 +232,36 @@ async def test_handle_completed_future(
221232
mqtt_message_handler(ENCODER(TEST_RESPONSE))
222233
await asyncio.sleep(0.01) # yield
223234

224-
# Task completes and second message is dropped with a warning
235+
# Task completes and second message is not associated with a waiting handler
225236
result = await task
226237
assert result == TEST_RESPONSE
227238

228-
assert len(caplog.records) == 1
229-
assert caplog.records[0].levelname == "WARNING"
230-
assert "Received message with no waiting handler: request_id=12345" in caplog.records[0].message
231-
232239

233240
async def test_subscribe_callback_with_rpc_response(
234241
mqtt_session: Mock,
235242
mqtt_channel: MqttChannel,
236243
received_messages: list[RoborockMessage],
237244
mqtt_message_handler: Callable[[bytes], None],
238245
) -> None:
239-
"""Test that subscribe callback is called along with RPC handling."""
246+
"""Test that subscribe callback is called independent of RPC handling."""
240247
# Send request
241248
task = asyncio.create_task(mqtt_channel.send_command(TEST_REQUEST, timeout=5.0))
242249
await asyncio.sleep(0.01) # yield
243250

244251
assert not received_messages
245252

246-
# Send the response
253+
# Send the response for this command and an unrelated command
247254
mqtt_message_handler(ENCODER(TEST_RESPONSE))
248255
await asyncio.sleep(0.01) # yield
256+
mqtt_message_handler(ENCODER(TEST_RESPONSE2))
257+
await asyncio.sleep(0.01) # yield
249258

250-
# Task completes and second message is dropped with a warning
259+
# Task completes
251260
result = await task
252261
assert result == TEST_RESPONSE
253262

254263
# The subscribe callback should have been called with the same response
255-
assert len(received_messages) == 1
256-
assert received_messages[0] == TEST_RESPONSE
264+
assert received_messages == [TEST_RESPONSE, TEST_RESPONSE2]
257265

258266

259267
async def test_message_decode_error(

0 commit comments

Comments
 (0)