Skip to content

Commit 6a58bc5

Browse files
committed
fix(client): propagate transport exceptions in default message handler
_default_message_handler called anyio.checkpoint() unconditionally, silently swallowing any Exception passed to it. When a transport error (e.g. httpx.ReadTimeout) was delivered through the read stream, the async-for in _receive_loop called `continue` and waited for the next message that never came — hanging all pending requests indefinitely. Fix: check isinstance(message, Exception) and re-raise so the exception exits the async-for, hits _receive_loop's except-handler (which logs it), and falls into the finally block that closes all pending response streams with CONNECTION_CLOSED — unblocking any in-flight call_tool or other send_request callers. Custom message_handler implementations that need to suppress or transform transport exceptions can still do so by not re-raising. Fixes #1401. Co-authored-by: Sean Campbell <rudi193@gmail.com>
1 parent 3d7b311 commit 6a58bc5

2 files changed

Lines changed: 140 additions & 0 deletions

File tree

src/mcp/client/session.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ async def __call__(
5757
async def _default_message_handler(
5858
message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
5959
) -> None:
60+
if isinstance(message, Exception):
61+
raise message
6062
await anyio.lowlevel.checkpoint()
6163

6264

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""Tests for issue #1401: ClientSession should propagate transport exceptions.
2+
3+
Root cause: _default_message_handler called anyio.checkpoint() unconditionally,
4+
silently dropping exceptions. The async-for loop in _receive_loop then called
5+
`continue`, waiting for the next message that never came — hanging all pending
6+
requests indefinitely.
7+
8+
Fix: raise when message is an Exception so _receive_loop's except-handler runs,
9+
triggering the finally block that closes pending response streams with
10+
CONNECTION_CLOSED.
11+
"""
12+
13+
import anyio
14+
import pytest
15+
16+
from mcp import types
17+
from mcp.client.session import ClientSession, _default_message_handler
18+
from mcp.server import Server, ServerRequestContext
19+
from mcp.shared.exceptions import MCPError
20+
from mcp.shared.message import SessionMessage
21+
from mcp.types import CallToolRequestParams, CallToolResult, TextContent
22+
23+
24+
@pytest.mark.anyio
25+
async def test_default_message_handler_raises_on_exception():
26+
"""_default_message_handler must re-raise Exception instances."""
27+
err = RuntimeError("transport failure")
28+
with pytest.raises(RuntimeError, match="transport failure"):
29+
await _default_message_handler(err)
30+
31+
32+
@pytest.mark.anyio
33+
async def test_default_message_handler_checkpoints_on_notification():
34+
"""_default_message_handler should checkpoint (not raise) for non-exception messages."""
35+
notification = types.ToolListChangedNotification(
36+
method="notifications/tools/list_changed"
37+
)
38+
# Should complete without raising
39+
await _default_message_handler(notification)
40+
41+
42+
@pytest.mark.anyio
43+
async def test_transport_exception_unblocks_pending_request():
44+
"""A transport exception must unblock pending requests instead of hanging them.
45+
46+
Before the fix: exception was swallowed by checkpoint(); async-for looped
47+
back waiting for the next message; pending call_tool hung indefinitely.
48+
49+
After the fix: exception propagates out of the async-for, _receive_loop's
50+
finally block closes all pending response streams with CONNECTION_CLOSED,
51+
and call_tool raises MCPError rather than hanging.
52+
"""
53+
slow_tool_started = anyio.Event()
54+
55+
async def handle_list_tools(
56+
ctx: ServerRequestContext, params: types.PaginatedRequestParams | None
57+
) -> types.ListToolsResult:
58+
return types.ListToolsResult(
59+
tools=[types.Tool(name="slow", description="hangs", input_schema={"type": "object"})]
60+
)
61+
62+
async def handle_call_tool(
63+
ctx: ServerRequestContext, params: CallToolRequestParams
64+
) -> CallToolResult:
65+
slow_tool_started.set()
66+
await anyio.sleep(60) # hangs until cancelled
67+
return CallToolResult(content=[TextContent(type="text", text="never")]) # pragma: no cover
68+
69+
server = Server(
70+
name="test",
71+
on_list_tools=handle_list_tools,
72+
on_call_tool=handle_call_tool,
73+
)
74+
75+
server_writer, server_reader = anyio.create_memory_object_stream[SessionMessage](4)
76+
client_writer, client_reader = anyio.create_memory_object_stream[SessionMessage | Exception](4)
77+
78+
call_tool_error: Exception | None = None
79+
server_scope: anyio.CancelScope | None = None
80+
81+
async def run_server(*, task_status: anyio.abc.TaskStatus) -> None:
82+
with anyio.CancelScope() as scope:
83+
task_status.started(scope)
84+
await server.run(server_reader, client_writer, server.create_initialization_options())
85+
86+
async def run_client() -> None:
87+
nonlocal call_tool_error
88+
async with ClientSession(client_reader, server_writer) as session: # type: ignore[arg-type]
89+
await session.initialize()
90+
91+
async def inject() -> None:
92+
await slow_tool_started.wait()
93+
# Inject a transport exception — simulates e.g. httpx.ReadTimeout
94+
await client_writer.send(RuntimeError("sse read timeout"))
95+
96+
async with anyio.create_task_group() as tg:
97+
tg.start_soon(inject)
98+
try:
99+
await session.call_tool("slow")
100+
except (MCPError, RuntimeError) as e:
101+
call_tool_error = e
102+
tg.cancel_scope.cancel()
103+
104+
assert server_scope is not None
105+
server_scope.cancel()
106+
107+
async with anyio.create_task_group() as tg:
108+
server_scope = await tg.start(run_server)
109+
tg.start_soon(run_client)
110+
111+
assert call_tool_error is not None, "call_tool should have raised, not hung"
112+
113+
114+
@pytest.mark.anyio
115+
async def test_custom_message_handler_receives_exception():
116+
"""A custom message_handler can intercept transport exceptions without re-raising."""
117+
received: list[Exception] = []
118+
119+
async def capturing_handler(message: object) -> None:
120+
if isinstance(message, Exception):
121+
received.append(message) # capture — do not re-raise
122+
123+
server_writer, server_reader = anyio.create_memory_object_stream[SessionMessage](4)
124+
client_writer, client_reader = anyio.create_memory_object_stream[SessionMessage | Exception](4)
125+
126+
async with server_reader, server_writer:
127+
async with ClientSession(
128+
client_reader, # type: ignore[arg-type]
129+
server_writer.clone(),
130+
message_handler=capturing_handler,
131+
):
132+
await client_writer.send(ValueError("custom handler test"))
133+
await client_writer.aclose()
134+
await anyio.sleep(0.05)
135+
136+
assert len(received) == 1
137+
assert isinstance(received[0], ValueError)
138+
assert str(received[0]) == "custom handler test"

0 commit comments

Comments
 (0)