Skip to content

Commit 5f38a99

Browse files
committed
fix: handle ClosedResourceError when transport closes mid-request
Backports fixes from #2306 to v1.x to address issue #2328. When the transport closes while handlers are processing requests (e.g., stdin EOF during a long-running tool call), the server could crash with ClosedResourceError when trying to send a response through the already-closed write stream. This fix: 1. Wraps the message loop in a try/finally that cancels in-flight handlers when the transport closes, preventing them from attempting to respond 2. Catches BrokenResourceError and ClosedResourceError when calling message.respond() and logs instead of crashing 3. Properly re-raises transport-close cancellation to let the task group handle it (vs client-initiated cancellation which already sent a response) 4. Uses list() snapshot when iterating _response_streams in the finally block to avoid 'dictionary changed size during iteration' errors Fixes #2328
1 parent 2e9897e commit 5f38a99

File tree

3 files changed

+221
-25
lines changed

3 files changed

+221
-25
lines changed

src/mcp/server/lowlevel/server.py

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -671,16 +671,23 @@ async def run(
671671
await stack.enter_async_context(task_support.run())
672672

673673
async with anyio.create_task_group() as tg:
674-
async for message in session.incoming_messages:
675-
logger.debug("Received message: %s", message)
676-
677-
tg.start_soon(
678-
self._handle_message,
679-
message,
680-
session,
681-
lifespan_context,
682-
raise_exceptions,
683-
)
674+
try:
675+
async for message in session.incoming_messages:
676+
logger.debug("Received message: %s", message)
677+
678+
tg.start_soon(
679+
self._handle_message,
680+
message,
681+
session,
682+
lifespan_context,
683+
raise_exceptions,
684+
)
685+
finally:
686+
# Transport closed: cancel in-flight handlers. Without this the
687+
# TG join waits for them, and when they eventually try to
688+
# respond they hit a closed write stream (the session's
689+
# _receive_loop closed it when the read stream ended).
690+
tg.cancel_scope.cancel()
684691

685692
async def _handle_message(
686693
self,
@@ -763,12 +770,18 @@ async def _handle_request(
763770
response = await handler(req)
764771
except McpError as err: # pragma: no cover
765772
response = err.error
766-
except anyio.get_cancelled_exc_class(): # pragma: no cover
767-
logger.info(
768-
"Request %s cancelled - duplicate response suppressed",
769-
message.request_id,
770-
)
771-
return
773+
except anyio.get_cancelled_exc_class():
774+
if message.cancelled:
775+
# Client sent CancelledNotification; responder.cancel() already
776+
# sent an error response, so skip the duplicate.
777+
logger.info(
778+
"Request %s cancelled - duplicate response suppressed",
779+
message.request_id,
780+
)
781+
return
782+
# Transport-close cancellation from the TG in run(); re-raise so the
783+
# TG swallows its own cancellation.
784+
raise
772785
except Exception as err: # pragma: no cover
773786
if raise_exceptions:
774787
raise err
@@ -777,16 +790,24 @@ async def _handle_request(
777790
# Reset the global state after we are done
778791
if token is not None: # pragma: no branch
779792
request_ctx.reset(token)
780-
781-
await message.respond(response)
782793
else: # pragma: no cover
783-
await message.respond(
784-
types.ErrorData(
785-
code=types.METHOD_NOT_FOUND,
786-
message="Method not found",
787-
)
794+
response = types.ErrorData(
795+
code=types.METHOD_NOT_FOUND,
796+
message="Method not found",
788797
)
789798

799+
try:
800+
await message.respond(response)
801+
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
802+
# Transport closed between handler unblocking and respond. Happens
803+
# when _receive_loop's finally wakes a handler blocked on
804+
# send_request: the handler runs to respond() before run()'s TG
805+
# cancel fires, but after the write stream closed. Closed if our
806+
# end closed (_receive_loop's async-with exit); Broken if the peer
807+
# end closed first (streamable_http terminate()).
808+
logger.debug("Response for %s dropped - transport closed", message.request_id)
809+
return
810+
790811
logger.debug("Response sent")
791812

792813
async def _handle_notification(self, notify: Any):

src/mcp/shared/session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def __exit__(
108108
) -> None:
109109
"""Exit the context manager, performing cleanup and notifying completion."""
110110
try:
111-
if self._completed: # pragma: no branch
111+
if self._completed:
112112
self._on_complete(self)
113113
finally:
114114
self._entered = False
@@ -445,7 +445,9 @@ async def _receive_loop(self) -> None:
445445
finally:
446446
# after the read stream is closed, we need to send errors
447447
# to any pending requests
448-
for id, stream in self._response_streams.items():
448+
# Snapshot: stream.send() wakes the waiter, whose finally pops
449+
# from _response_streams before the next __next__() call.
450+
for id, stream in list(self._response_streams.items()):
449451
error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
450452
try:
451453
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))

tests/server/test_cancel_handling.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,176 @@ async def first_request():
108108
assert isinstance(content, types.TextContent)
109109
assert content.text == "Call number: 2"
110110
assert call_count == 2
111+
112+
113+
@pytest.mark.anyio
114+
async def test_server_cancels_in_flight_handlers_on_transport_close():
115+
"""When the transport closes mid-request, server.run() must cancel in-flight
116+
handlers rather than join on them.
117+
118+
Without the cancel, the task group waits for the handler, which then tries
119+
to respond through a write stream that _receive_loop already closed,
120+
raising ClosedResourceError and crashing server.run() with exit code 1.
121+
122+
This drives server.run() with raw memory streams because InMemoryTransport
123+
wraps it in its own finally-cancel (_memory.py) which masks the bug.
124+
"""
125+
from mcp.shared.message import SessionMessage
126+
from mcp.types import (
127+
LATEST_PROTOCOL_VERSION,
128+
ClientCapabilities,
129+
Implementation,
130+
InitializeRequestParams,
131+
JSONRPCNotification,
132+
JSONRPCRequest,
133+
)
134+
135+
handler_started = anyio.Event()
136+
handler_cancelled = anyio.Event()
137+
server_run_returned = anyio.Event()
138+
139+
server = Server("test")
140+
141+
@server.call_tool()
142+
async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent]:
143+
handler_started.set()
144+
try:
145+
await anyio.sleep_forever()
146+
finally:
147+
handler_cancelled.set()
148+
# unreachable: sleep_forever only exits via cancellation
149+
raise AssertionError # pragma: no cover
150+
151+
to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
152+
server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)
153+
154+
async def run_server():
155+
await server.run(server_read, server_write, server.create_initialization_options())
156+
server_run_returned.set()
157+
158+
init_req = JSONRPCRequest(
159+
jsonrpc="2.0",
160+
id=1,
161+
method="initialize",
162+
params=InitializeRequestParams(
163+
protocolVersion=LATEST_PROTOCOL_VERSION,
164+
capabilities=ClientCapabilities(),
165+
clientInfo=Implementation(name="test", version="1.0"),
166+
).model_dump(by_alias=True, mode="json", exclude_none=True),
167+
)
168+
initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
169+
call_req = JSONRPCRequest(
170+
jsonrpc="2.0",
171+
id=2,
172+
method="tools/call",
173+
params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"),
174+
)
175+
176+
with anyio.fail_after(5):
177+
async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
178+
tg.start_soon(run_server)
179+
180+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(init_req)))
181+
await from_server.receive() # init response
182+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(initialized)))
183+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(call_req)))
184+
185+
await handler_started.wait()
186+
187+
# Close the server's input stream — this is what stdin EOF does.
188+
# server.run()'s incoming_messages loop ends, finally-cancel fires,
189+
# handler gets CancelledError, server.run() returns.
190+
await to_server.aclose()
191+
192+
await server_run_returned.wait()
193+
194+
assert handler_cancelled.is_set()
195+
196+
197+
@pytest.mark.anyio
198+
async def test_server_handles_transport_close_with_pending_server_to_client_requests():
199+
"""When the transport closes while handlers are blocked on server→client
200+
requests (sampling, roots, elicitation), server.run() must still exit cleanly.
201+
202+
Two bugs covered:
203+
1. _receive_loop's finally iterates _response_streams with await checkpoints
204+
inside; the woken handler's send_request finally pops from that dict
205+
before the next __next__() — RuntimeError: dictionary changed size.
206+
2. The woken handler's McpError is caught in _handle_request, which falls
207+
through to respond() against a write stream _receive_loop already closed.
208+
"""
209+
from mcp.shared.message import SessionMessage
210+
from mcp.types import (
211+
LATEST_PROTOCOL_VERSION,
212+
ClientCapabilities,
213+
Implementation,
214+
InitializeRequestParams,
215+
JSONRPCNotification,
216+
JSONRPCRequest,
217+
)
218+
219+
handlers_started = 0
220+
both_started = anyio.Event()
221+
server_run_returned = anyio.Event()
222+
223+
server = Server("test")
224+
225+
@server.call_tool()
226+
async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent]:
227+
nonlocal handlers_started
228+
handlers_started += 1
229+
if handlers_started == 2:
230+
both_started.set()
231+
# Blocks on send_request waiting for a client response that never comes.
232+
# _receive_loop's finally will wake this with CONNECTION_CLOSED.
233+
await server.request_context.session.list_roots()
234+
raise AssertionError # pragma: no cover
235+
236+
to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
237+
server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)
238+
239+
async def run_server():
240+
await server.run(server_read, server_write, server.create_initialization_options())
241+
server_run_returned.set()
242+
243+
init_req = JSONRPCRequest(
244+
jsonrpc="2.0",
245+
id=1,
246+
method="initialize",
247+
params=InitializeRequestParams(
248+
protocolVersion=LATEST_PROTOCOL_VERSION,
249+
capabilities=ClientCapabilities(),
250+
clientInfo=Implementation(name="test", version="1.0"),
251+
).model_dump(by_alias=True, mode="json", exclude_none=True),
252+
)
253+
initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
254+
255+
with anyio.fail_after(5):
256+
async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
257+
tg.start_soon(run_server)
258+
259+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(init_req)))
260+
await from_server.receive() # init response
261+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(initialized)))
262+
263+
# Two tool calls → two handlers → two _response_streams entries.
264+
for rid in (2, 3):
265+
call_req = JSONRPCRequest(
266+
jsonrpc="2.0",
267+
id=rid,
268+
method="tools/call",
269+
params=CallToolRequestParams(name="t", arguments={}).model_dump(by_alias=True, mode="json"),
270+
)
271+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(call_req)))
272+
273+
await both_started.wait()
274+
# Drain the two roots/list requests so send_request's _write_stream.send()
275+
# completes and both handlers are parked at response_stream_reader.receive().
276+
await from_server.receive()
277+
await from_server.receive()
278+
279+
await to_server.aclose()
280+
281+
# Without the fixes: RuntimeError (dict mutation) or ClosedResourceError
282+
# (respond after write-stream close) escapes run_server and this hangs.
283+
await server_run_returned.wait()

0 commit comments

Comments
 (0)