Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/mcp/client/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ async def stdout_reader():

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: lax no cover
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover
await anyio.lowlevel.checkpoint()

async def stdin_writer():
Expand All @@ -174,7 +174,7 @@ async def stdin_writer():
errors=server.encoding_error_handler,
)
)
except anyio.ClosedResourceError: # pragma: no cover
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
await anyio.lowlevel.checkpoint()

async with anyio.create_task_group() as tg, process:
Expand Down Expand Up @@ -205,6 +205,7 @@ async def stdin_writer():
except ProcessLookupError: # pragma: no cover
# Process already exited, which is fine
pass

await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
Expand Down
4 changes: 2 additions & 2 deletions src/mcp/server/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def stdin_reader():

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: no cover
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
await anyio.lowlevel.checkpoint()

async def stdout_writer():
Expand All @@ -74,7 +74,7 @@ async def stdout_writer():
json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True)
await stdout.write(json + "\n")
await stdout.flush()
except anyio.ClosedResourceError: # pragma: no cover
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
await anyio.lowlevel.checkpoint()

async with anyio.create_task_group() as tg:
Expand Down
47 changes: 47 additions & 0 deletions tests/client/test_stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,50 @@ def sigterm_handler(signum, frame):
f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. "
f"Expected between 2-4 seconds (2s stdin timeout + termination time)."
)


@pytest.mark.anyio
async def test_stdio_client_quick_exit_race_condition():
"""Test that stdio_client handles quick context exits without crashing.

This reproduces the race condition where:
1. Subprocess is spawned and starts outputting data
2. User code exits the context quickly (e.g., timeout, error, disconnect)
3. Cleanup code closes streams while background tasks are still using them
4. Background tasks should handle closed streams gracefully (no BrokenResourceError)

The fix ensures:
- Tasks are cancelled before streams are closed
- Tasks handle BrokenResourceError gracefully as a fallback
"""

# Create a Python script that continuously outputs data
# This simulates a subprocess that's slow to shut down
continuous_output_script = textwrap.dedent(
"""
import sys
import time

# Continuously output to keep stdout_reader busy
for i in range(100):
print(f'{{"jsonrpc":"2.0","id":{i},"result":{{}}}}')
sys.stdout.flush()
time.sleep(0.01)
"""
)

server_params = StdioServerParameters(
command=sys.executable,
args=["-c", continuous_output_script],
)

# This should not raise an ExceptionGroup or BrokenResourceError
# The background tasks should handle stream closure gracefully
async with stdio_client(server_params) as (_, _):
# Immediately exit - triggers cleanup while subprocess is still outputting
pass

# If we get here without exception, the race condition is handled correctly
# The tasks either:
# 1. Were cancelled before stream closure (proper fix)
# 2. Handled BrokenResourceError gracefully (defense in depth)