Skip to content

Commit 906bcfc

Browse files
committed
fix: merge stream and task-group contexts to satisfy 3.11 branch coverage
Python 3.11's bytecode for nested async with blocks produces extra branch arcs that coverage.py tracks but the test suite doesn't exercise. Merging the stream context-managers with the task group into a single async with restores the same nesting depth as main, so branch count stays at 4 instead of 6. Teardown order is also slightly better this way: tg.__aexit__ runs first (waits for cancelled tasks), then stream ends close in reverse order — tasks are fully done before streams close.
1 parent 1e83583 commit 906bcfc

File tree

1 file changed

+36
-32
lines changed

1 file changed

+36
-32
lines changed

src/mcp/client/websocket.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -43,39 +43,43 @@ async def websocket_client(
4343
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
4444
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
4545

46-
async with read_stream_writer, read_stream, write_stream, write_stream_reader:
46+
async def ws_reader():
47+
"""Reads text messages from the WebSocket, parses them as JSON-RPC messages,
48+
and sends them into read_stream_writer.
49+
"""
50+
async with read_stream_writer:
51+
async for raw_text in ws:
52+
try:
53+
message = types.jsonrpc_message_adapter.validate_json(raw_text, by_name=False)
54+
session_message = SessionMessage(message)
55+
await read_stream_writer.send(session_message)
56+
except ValidationError as exc: # pragma: no cover
57+
# If JSON parse or model validation fails, send the exception
58+
await read_stream_writer.send(exc)
4759

48-
async def ws_reader():
49-
"""Reads text messages from the WebSocket, parses them as JSON-RPC messages,
50-
and sends them into read_stream_writer.
51-
"""
52-
async with read_stream_writer:
53-
async for raw_text in ws:
54-
try:
55-
message = types.jsonrpc_message_adapter.validate_json(raw_text, by_name=False)
56-
session_message = SessionMessage(message)
57-
await read_stream_writer.send(session_message)
58-
except ValidationError as exc: # pragma: no cover
59-
# If JSON parse or model validation fails, send the exception
60-
await read_stream_writer.send(exc)
60+
async def ws_writer():
61+
"""Reads JSON-RPC messages from write_stream_reader and
62+
sends them to the server.
63+
"""
64+
async with write_stream_reader:
65+
async for session_message in write_stream_reader:
66+
# Convert to a dict, then to JSON
67+
msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
68+
await ws.send(json.dumps(msg_dict))
6169

62-
async def ws_writer():
63-
"""Reads JSON-RPC messages from write_stream_reader and
64-
sends them to the server.
65-
"""
66-
async with write_stream_reader:
67-
async for session_message in write_stream_reader:
68-
# Convert to a dict, then to JSON
69-
msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
70-
await ws.send(json.dumps(msg_dict))
70+
async with (
71+
read_stream_writer,
72+
read_stream,
73+
write_stream,
74+
write_stream_reader,
75+
anyio.create_task_group() as tg,
76+
):
77+
# Start reader and writer tasks
78+
tg.start_soon(ws_reader)
79+
tg.start_soon(ws_writer)
7180

72-
async with anyio.create_task_group() as tg:
73-
# Start reader and writer tasks
74-
tg.start_soon(ws_reader)
75-
tg.start_soon(ws_writer)
81+
# Yield the receive/send streams
82+
yield (read_stream, write_stream)
7683

77-
# Yield the receive/send streams
78-
yield (read_stream, write_stream)
79-
80-
# Once the caller's 'async with' block exits, we shut down
81-
tg.cancel_scope.cancel()
84+
# Once the caller's 'async with' block exits, we shut down
85+
tg.cancel_scope.cancel()

0 commit comments

Comments
 (0)