Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions src/mcp/server/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async def run_server():
```
"""

import os
import sys
from contextlib import asynccontextmanager
from io import TextIOWrapper
Expand All @@ -28,6 +29,26 @@ async def run_server():
from mcp import types
from mcp.shared.message import SessionMessage

# Chunk size for non-blocking stdout writes. Small enough to avoid
# filling the OS pipe buffer (64 KB on macOS) in a single syscall.
_WRITE_CHUNK = 4096


async def _write_nonblocking(fd: int, data: bytes) -> None:
"""Write *data* to a non-blocking fd, yielding on EAGAIN.

Writes in small chunks so the event loop stays responsive even when
the MCP client reads slowly and the pipe buffer fills up.
"""
mv = memoryview(data)
while mv:
try:
n = os.write(fd, mv[:_WRITE_CHUNK])
mv = mv[n:]
except BlockingIOError:
# Pipe full — yield to event loop and retry.
await anyio.sleep(0.005)


@asynccontextmanager
async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None):
Expand All @@ -40,7 +61,14 @@ async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.
# re-wrap the underlying binary stream to ensure UTF-8.
if not stdin:
stdin = anyio.wrap_file(TextIOWrapper(sys.stdin.buffer, encoding="utf-8"))
# For the default stdout (no custom override), use non-blocking I/O
# directly on the file descriptor to prevent the event loop from
# blocking when the OS pipe buffer is full (macOS: 64 KB).
stdout_fd: int | None = None
if not stdout:
stdout_fd = sys.stdout.buffer.fileno()
os.set_blocking(stdout_fd, False)
# Still create the wrapped stdout for the type signature / fallback
stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8"))

read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
Expand Down Expand Up @@ -71,9 +99,19 @@ async def stdout_writer():
try:
async with write_stream_reader:
async for session_message in write_stream_reader:
json = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
await stdout.write(json + "\n")
await stdout.flush()
json_str = session_message.message.model_dump_json(
by_alias=True, exclude_none=True
)
if stdout_fd is not None:
# Non-blocking write directly to fd — never blocks
# the event loop, yields on pipe-full (EAGAIN).
await _write_nonblocking(
stdout_fd, (json_str + "\n").encode("utf-8")
)
else:
# Custom stdout provided — use original path.
await stdout.write(json_str + "\n")
await stdout.flush()
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()

Expand Down
Loading