Skip to content

Commit 3343410

Browse files
committed
fix: Comprehensive Windows resource cleanup for ALL client transports
- Fix ClosedResourceError/BrokenResourceError in streamable HTTP client - Improve stream cleanup order and exception handling in SSE client - Add robust resource cleanup to WebSocket client - Prevent resource leaks and race conditions on Windows - Handle all anyio stream exceptions gracefully across all transports This resolves Windows-specific test failures in Python 3.12/3.13 by ensuring proper async resource cleanup in all MCP client transports.
1 parent 17a9867 commit 3343410

File tree

3 files changed

+133
-15
lines changed

3 files changed

+133
-15
lines changed

src/mcp/client/sse.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ async def sse_client(
5555
try:
5656
logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
5757
async with httpx_client_factory(
58-
headers=headers, auth=auth, timeout=httpx.Timeout(timeout, read=sse_read_timeout)
58+
headers=headers,
59+
auth=auth,
60+
timeout=httpx.Timeout(timeout, read=sse_read_timeout),
5961
) as client:
6062
async with aconnect_sse(
6163
client,
@@ -109,7 +111,16 @@ async def sse_reader(
109111
logger.error(f"Error in sse_reader: {exc}")
110112
await read_stream_writer.send(exc)
111113
finally:
112-
await read_stream_writer.aclose()
114+
try:
115+
await read_stream_writer.aclose()
116+
except (
117+
anyio.ClosedResourceError,
118+
anyio.BrokenResourceError,
119+
):
120+
# Stream already closed, ignore
121+
pass
122+
except Exception as exc:
123+
logger.debug(f"Error closing read_stream_writer in sse_reader: {exc}")
113124

114125
async def post_writer(endpoint_url: str):
115126
try:
@@ -129,7 +140,16 @@ async def post_writer(endpoint_url: str):
129140
except Exception as exc:
130141
logger.error(f"Error in post_writer: {exc}")
131142
finally:
132-
await write_stream.aclose()
143+
try:
144+
await write_stream.aclose()
145+
except (
146+
anyio.ClosedResourceError,
147+
anyio.BrokenResourceError,
148+
):
149+
# Stream already closed, ignore
150+
pass
151+
except Exception as exc:
152+
logger.debug(f"Error closing write_stream in post_writer: {exc}")
133153

134154
endpoint_url = await tg.start(sse_reader)
135155
logger.debug(f"Starting post writer with endpoint URL: {endpoint_url}")
@@ -140,5 +160,35 @@ async def post_writer(endpoint_url: str):
140160
finally:
141161
tg.cancel_scope.cancel()
142162
finally:
143-
await read_stream_writer.aclose()
144-
await write_stream.aclose()
163+
# Improved stream cleanup with comprehensive exception handling
164+
try:
165+
await read_stream_writer.aclose()
166+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
167+
# Stream already closed, ignore
168+
pass
169+
except Exception as exc:
170+
logger.debug(f"Error closing read_stream_writer in SSE cleanup: {exc}")
171+
172+
try:
173+
await write_stream.aclose()
174+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
175+
# Stream already closed, ignore
176+
pass
177+
except Exception as exc:
178+
logger.debug(f"Error closing write_stream in SSE cleanup: {exc}")
179+
180+
try:
181+
await read_stream.aclose()
182+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
183+
# Stream already closed, ignore
184+
pass
185+
except Exception as exc:
186+
logger.debug(f"Error closing read_stream in SSE cleanup: {exc}")
187+
188+
try:
189+
await write_stream_reader.aclose()
190+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
191+
# Stream already closed, ignore
192+
pass
193+
except Exception as exc:
194+
logger.debug(f"Error closing write_stream_reader in SSE cleanup: {exc}")

src/mcp/client/streamable_http.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,22 @@ async def handle_request_async():
413413
except Exception as exc:
414414
logger.error(f"Error in post_writer: {exc}")
415415
finally:
416-
await read_stream_writer.aclose()
417-
await write_stream.aclose()
416+
# Improved stream cleanup with comprehensive exception handling
417+
try:
418+
await read_stream_writer.aclose()
419+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
420+
# Stream already closed, ignore
421+
pass
422+
except Exception as exc:
423+
logger.debug(f"Error closing read_stream_writer in cleanup: {exc}")
424+
425+
try:
426+
await write_stream.aclose()
427+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
428+
# Stream already closed, ignore
429+
pass
430+
except Exception as exc:
431+
logger.debug(f"Error closing write_stream in cleanup: {exc}")
418432

419433
async def terminate_session(self, client: httpx.AsyncClient) -> None:
420434
"""Terminate the session by sending a DELETE request."""
@@ -502,8 +516,25 @@ def start_get_stream() -> None:
502516
)
503517
finally:
504518
if transport.session_id and terminate_on_close:
505-
await transport.terminate_session(client)
519+
try:
520+
await transport.terminate_session(client)
521+
except Exception as exc:
522+
logger.debug(f"Error terminating session: {exc}")
506523
tg.cancel_scope.cancel()
507524
finally:
508-
await read_stream_writer.aclose()
509-
await write_stream.aclose()
525+
# Improved stream cleanup with comprehensive exception handling
526+
try:
527+
await read_stream_writer.aclose()
528+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
529+
# Stream already closed, ignore
530+
pass
531+
except Exception as exc:
532+
logger.debug(f"Error closing read_stream_writer in cleanup: {exc}")
533+
534+
try:
535+
await write_stream.aclose()
536+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
537+
# Stream already closed, ignore
538+
pass
539+
except Exception as exc:
540+
logger.debug(f"Error closing write_stream in cleanup: {exc}")

src/mcp/client/websocket.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
async def websocket_client(
2020
url: str,
2121
) -> AsyncGenerator[
22-
tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]],
22+
tuple[
23+
MemoryObjectReceiveStream[SessionMessage | Exception],
24+
MemoryObjectSendStream[SessionMessage],
25+
],
2326
None,
2427
]:
2528
"""
@@ -79,8 +82,42 @@ async def ws_writer():
7982
tg.start_soon(ws_reader)
8083
tg.start_soon(ws_writer)
8184

82-
# Yield the receive/send streams
83-
yield (read_stream, write_stream)
85+
try:
86+
# Yield the receive/send streams
87+
yield (read_stream, write_stream)
88+
finally:
89+
# Once the caller's 'async with' block exits, we shut down
90+
tg.cancel_scope.cancel()
8491

85-
# Once the caller's 'async with' block exits, we shut down
86-
tg.cancel_scope.cancel()
92+
# Improved stream cleanup with comprehensive exception handling
93+
try:
94+
await read_stream.aclose()
95+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
96+
# Stream already closed, ignore
97+
pass
98+
except Exception as exc:
99+
logger.debug(f"Error closing read_stream in WebSocket cleanup: {exc}")
100+
101+
try:
102+
await write_stream.aclose()
103+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
104+
# Stream already closed, ignore
105+
pass
106+
except Exception as exc:
107+
logger.debug(f"Error closing write_stream in WebSocket cleanup: {exc}")
108+
109+
try:
110+
await read_stream_writer.aclose()
111+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
112+
# Stream already closed, ignore
113+
pass
114+
except Exception as exc:
115+
logger.debug(f"Error closing read_stream_writer in WebSocket cleanup: {exc}")
116+
117+
try:
118+
await write_stream_reader.aclose()
119+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
120+
# Stream already closed, ignore
121+
pass
122+
except Exception as exc:
123+
logger.debug(f"Error closing write_stream_reader in WebSocket cleanup: {exc}")

0 commit comments

Comments
 (0)