Skip to content

Commit a2638cd

Browse files
committed
fix: Handle BrokenResourceError on Windows Python 3.13
- Improve stream cleanup order in Windows stdio client - Add comprehensive exception handling for resource errors - Handle process termination edge cases better - Prevent race conditions in async stream cleanup
1 parent 7553bba commit a2638cd

File tree

2 files changed

+100
-30
lines changed

2 files changed

+100
-30
lines changed

src/mcp/client/stdio/__init__.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,11 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder
115115
process = await _create_platform_compatible_process(
116116
command=command,
117117
args=server.args,
118-
env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()),
118+
env=(
119+
{**get_default_environment(), **server.env}
120+
if server.env is not None
121+
else get_default_environment()
122+
),
119123
errlog=errlog,
120124
cwd=server.cwd,
121125
)
@@ -150,7 +154,7 @@ async def stdout_reader():
150154

151155
session_message = SessionMessage(message)
152156
await read_stream_writer.send(session_message)
153-
except anyio.ClosedResourceError:
157+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
154158
await anyio.lowlevel.checkpoint()
155159

156160
async def stdin_writer():
@@ -159,14 +163,16 @@ async def stdin_writer():
159163
try:
160164
async with write_stream_reader:
161165
async for session_message in write_stream_reader:
162-
json = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
166+
json = session_message.message.model_dump_json(
167+
by_alias=True, exclude_none=True
168+
)
163169
await process.stdin.send(
164170
(json + "\n").encode(
165171
encoding=server.encoding,
166172
errors=server.encoding_error_handler,
167173
)
168174
)
169-
except anyio.ClosedResourceError:
175+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
170176
await anyio.lowlevel.checkpoint()
171177

172178
async with (
@@ -184,13 +190,30 @@ async def stdin_writer():
184190
await terminate_windows_process(process)
185191
else:
186192
process.terminate()
187-
except ProcessLookupError:
188-
# Process already exited, which is fine
193+
except (ProcessLookupError, OSError, anyio.BrokenResourceError):
194+
# Process already exited or couldn't be terminated, which is fine
195+
pass
196+
197+
# Close streams in proper order to avoid BrokenResourceError
198+
try:
199+
await read_stream.aclose()
200+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
201+
pass
202+
203+
try:
204+
await write_stream.aclose()
205+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
206+
pass
207+
208+
try:
209+
await read_stream_writer.aclose()
210+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
211+
pass
212+
213+
try:
214+
await write_stream_reader.aclose()
215+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
189216
pass
190-
await read_stream.aclose()
191-
await write_stream.aclose()
192-
await read_stream_writer.aclose()
193-
await write_stream_reader.aclose()
194217

195218

196219
def _get_executable_command(command: str) -> str:
@@ -223,6 +246,8 @@ async def _create_platform_compatible_process(
223246
if sys.platform == "win32":
224247
process = await create_windows_process(command, args, env, errlog, cwd)
225248
else:
226-
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
249+
process = await anyio.open_process(
250+
[command, *args], env=env, stderr=errlog, cwd=cwd
251+
)
227252

228253
return process

src/mcp/client/stdio/win32.py

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,12 @@ def __init__(self, popen_obj: subprocess.Popen[bytes]):
6262
self.stdout_raw = popen_obj.stdout # type: ignore[assignment]
6363
self.stderr = popen_obj.stderr # type: ignore[assignment]
6464

65-
self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None
66-
self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None
65+
self.stdin = (
66+
FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None
67+
)
68+
self.stdout = (
69+
FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None
70+
)
6771

6872
async def __aenter__(self):
6973
"""Support async context manager entry."""
@@ -76,20 +80,50 @@ async def __aexit__(
7680
exc_tb: object | None,
7781
) -> None:
7882
"""Terminate and wait on process exit inside a thread."""
79-
self.popen.terminate()
80-
await to_thread.run_sync(self.popen.wait)
83+
try:
84+
self.popen.terminate()
85+
await to_thread.run_sync(self.popen.wait)
86+
except (ProcessLookupError, OSError):
87+
# Process already exited or couldn't be terminated, which is fine
88+
pass
8189

8290
# Close the file handles to prevent ResourceWarning
83-
if self.stdin:
84-
await self.stdin.aclose()
85-
if self.stdout:
86-
await self.stdout.aclose()
87-
if self.stdin_raw:
88-
self.stdin_raw.close()
89-
if self.stdout_raw:
90-
self.stdout_raw.close()
91-
if self.stderr:
92-
self.stderr.close()
91+
# Close in reverse order of creation to avoid BrokenResourceError
92+
try:
93+
if self.stderr:
94+
self.stderr.close()
95+
except (OSError, ValueError):
96+
# Stream already closed or invalid, ignore
97+
pass
98+
99+
try:
100+
if self.stdout_raw:
101+
self.stdout_raw.close()
102+
except (OSError, ValueError):
103+
# Stream already closed or invalid, ignore
104+
pass
105+
106+
try:
107+
if self.stdin_raw:
108+
self.stdin_raw.close()
109+
except (OSError, ValueError):
110+
# Stream already closed or invalid, ignore
111+
pass
112+
113+
# Close async stream wrappers
114+
try:
115+
if self.stdout:
116+
await self.stdout.aclose()
117+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
118+
# Stream already closed, ignore
119+
pass
120+
121+
try:
122+
if self.stdin:
123+
await self.stdin.aclose()
124+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
125+
# Stream already closed, ignore
126+
pass
93127

94128
async def wait(self):
95129
"""Async wait for process completion."""
@@ -175,8 +209,19 @@ async def terminate_windows_process(process: Process | FallbackProcess):
175209
"""
176210
try:
177211
process.terminate()
178-
with anyio.fail_after(2.0):
179-
await process.wait()
180-
except TimeoutError:
181-
# Force kill if it doesn't terminate
182-
process.kill()
212+
try:
213+
with anyio.fail_after(2.0):
214+
await process.wait()
215+
except TimeoutError:
216+
# Force kill if it doesn't terminate
217+
try:
218+
process.kill()
219+
# Give it a moment to actually terminate after kill
220+
with anyio.fail_after(1.0):
221+
await process.wait()
222+
except (TimeoutError, ProcessLookupError, OSError):
223+
# Process is really stubborn or already gone, just continue
224+
pass
225+
except (ProcessLookupError, OSError, anyio.BrokenResourceError):
226+
# Process already exited or couldn't be terminated, which is fine
227+
pass

0 commit comments

Comments
 (0)