Skip to content

Commit 69691b8

Browse files
committed
fix(client): propagate HTTP transport errors to caller (#2110)
Previously, `sse_client` and `streamable_http_client` post paths caught exceptions with a bare `except Exception` and only logged them. HTTP errors (401/403/404/5xx) and network errors silently disappeared, leaving the caller blocked indefinitely on `read_stream.receive()`. Changes: - Move the error handling inside `_send_message` (sse) and `handle_request_async` (streamable_http) so exceptions are caught before anyio's task group wraps them in a BaseExceptionGroup. Narrow the catch to `httpx.HTTPError` and forward the exception to `read_stream_writer` — the pattern already used by `stdio.py` and `websocket.py`. - In `_handle_post_request`, preserve the upstream HTTP status code in `ErrorData.data = {"http_status": N}` for both the 404 session- terminated branch and the generic >=400 branch, and include the status in `error.message` for the generic branch. The JSON-RPC code mapping is unchanged; callers that inspected `error.message` before still work. Tests: - Unit: sse and streamable_http post-error propagation via httpx.MockTransport, parametrized over 401/404/500 plus a network- error case. - Integration: real uvicorn subprocess serving a configurable failing POST endpoint; asserts caller receives the error within a bounded timeout (proving the no-hang behavior end-to-end) and that the HTTP status is preserved in `error.data`. - Negative control confirmed: all 10 test cases fail against the pre-fix code (they hit the fail_after deadlines) and pass with the fix applied. Coverage stays at 100.00%; `strict-no-cover` clean. Refs: #2110
1 parent 161834d commit 69691b8

4 files changed

Lines changed: 327 additions & 15 deletions

File tree

src/mcp/client/sse.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,20 @@ async def post_writer(endpoint_url: str):
120120

121121
async def _send_message(session_message: SessionMessage) -> None:
122122
logger.debug(f"Sending client message: {session_message}")
123-
response = await client.post(
124-
endpoint_url,
125-
json=session_message.message.model_dump(
126-
by_alias=True,
127-
mode="json",
128-
exclude_unset=True,
129-
),
130-
)
131-
response.raise_for_status()
123+
try:
124+
response = await client.post(
125+
endpoint_url,
126+
json=session_message.message.model_dump(
127+
by_alias=True,
128+
mode="json",
129+
exclude_unset=True,
130+
),
131+
)
132+
response.raise_for_status()
133+
except httpx.HTTPError as exc:
134+
logger.exception("Error sending client message")
135+
await read_stream_writer.send(exc)
136+
return
132137
logger.debug(f"Client message sent successfully: {response.status_code}")
133138

134139
async for session_message in write_stream_reader:

src/mcp/client/streamable_http.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,22 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
269269

270270
if response.status_code == 404: # pragma: no branch
271271
if isinstance(message, JSONRPCRequest): # pragma: no branch
272-
error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated")
272+
error_data = ErrorData(
273+
code=INVALID_REQUEST,
274+
message="Session terminated",
275+
data={"http_status": response.status_code},
276+
)
273277
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
274278
await ctx.read_stream_writer.send(session_message)
275279
return
276280

277281
if response.status_code >= 400:
278282
if isinstance(message, JSONRPCRequest):
279-
error_data = ErrorData(code=INTERNAL_ERROR, message="Server returned an error response")
283+
error_data = ErrorData(
284+
code=INTERNAL_ERROR,
285+
message=f"Server returned an error response (HTTP {response.status_code})",
286+
data={"http_status": response.status_code},
287+
)
280288
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
281289
await ctx.read_stream_writer.send(session_message)
282290
return
@@ -468,10 +476,14 @@ async def _handle_message(session_message: SessionMessage) -> None:
468476
)
469477

470478
async def handle_request_async():
471-
if is_resumption:
472-
await self._handle_resumption_request(ctx)
473-
else:
474-
await self._handle_post_request(ctx)
479+
try:
480+
if is_resumption:
481+
await self._handle_resumption_request(ctx)
482+
else:
483+
await self._handle_post_request(ctx)
484+
except httpx.HTTPError as exc:
485+
logger.exception("Error sending client message")
486+
await read_stream_writer.send(exc)
475487

476488
# If this is a request, start a new task to handle it
477489
if isinstance(message, JSONRPCRequest):

tests/shared/test_sse.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from mcp.server.sse import SseServerTransport
2626
from mcp.server.transport_security import TransportSecuritySettings
2727
from mcp.shared.exceptions import MCPError
28+
from mcp.shared.message import SessionMessage
2829
from mcp.types import (
2930
CallToolRequestParams,
3031
CallToolResult,
@@ -629,3 +630,172 @@ async def test_sse_session_cleanup_on_disconnect(server: None, server_url: str)
629630
headers={"Content-Type": "application/json"},
630631
)
631632
assert response.status_code == 404
633+
634+
635+
class _LongLivedSSEStream(httpx.AsyncByteStream):
636+
"""A streaming SSE body that emits one endpoint event then stays open.
637+
638+
This mirrors a real SSE server: the stream does not end after the endpoint
639+
event, so the client's `sse_reader` keeps running while we exercise the
640+
POST path. The stream unblocks when the outer task group is cancelled.
641+
"""
642+
643+
def __init__(self, endpoint_path: str) -> None:
644+
self._endpoint_path = endpoint_path
645+
646+
async def __aiter__(self) -> AsyncGenerator[bytes, None]:
647+
yield f"event: endpoint\ndata: {self._endpoint_path}\n\n".encode()
648+
await anyio.sleep_forever()
649+
650+
651+
@pytest.mark.anyio
652+
@pytest.mark.parametrize("post_status", [401, 404, 500])
653+
async def test_sse_client_propagates_post_http_error_to_caller(post_status: int) -> None:
654+
"""Regression for https://github.com/modelcontextprotocol/python-sdk/issues/2110.
655+
656+
When the SSE POST endpoint returns a non-2xx HTTP status, the caller must
657+
receive the HTTPStatusError via the read stream instead of hanging on an
658+
indefinite wait for a response that will never arrive.
659+
"""
660+
661+
def handler(request: httpx.Request) -> httpx.Response:
662+
if request.method == "GET":
663+
return httpx.Response(
664+
200,
665+
headers={"content-type": "text/event-stream"},
666+
stream=_LongLivedSSEStream("/messages/?session_id=test"),
667+
)
668+
return httpx.Response(post_status)
669+
670+
def mock_factory(
671+
headers: dict[str, Any] | None = None,
672+
timeout: httpx.Timeout | None = None,
673+
auth: httpx.Auth | None = None,
674+
) -> httpx.AsyncClient:
675+
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
676+
677+
async with sse_client("http://test/sse", httpx_client_factory=mock_factory) as (read_stream, write_stream):
678+
request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
679+
await write_stream.send(SessionMessage(request))
680+
681+
with anyio.fail_after(3):
682+
received = await read_stream.receive()
683+
assert isinstance(received, httpx.HTTPStatusError)
684+
assert received.response.status_code == post_status
685+
686+
687+
# ---- Integration tests against a real uvicorn MCP server (issue #2110) ----
688+
689+
690+
def make_failing_post_server_app(post_status: int) -> Starlette: # pragma: no cover
691+
"""Starlette app with a real SSE GET endpoint but a POST that always fails.
692+
693+
Used by the integration test for issue #2110 — the SSE GET handshake is
694+
the real `SseServerTransport`, so the client receives a genuine endpoint
695+
event; the POST route is replaced so every client message fails with the
696+
given status.
697+
"""
698+
security_settings = TransportSecuritySettings(
699+
allowed_hosts=["127.0.0.1:*", "localhost:*"], allowed_origins=["http://127.0.0.1:*", "http://localhost:*"]
700+
)
701+
sse = SseServerTransport("/messages/", security_settings=security_settings)
702+
server = _create_server()
703+
704+
async def handle_sse(request: Request) -> Response:
705+
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
706+
await server.run(streams[0], streams[1], server.create_initialization_options())
707+
return Response()
708+
709+
async def failing_post(request: Request) -> Response:
710+
return Response(status_code=post_status, content=f"deliberate {post_status} for #2110".encode())
711+
712+
return Starlette(
713+
routes=[
714+
Route("/sse", endpoint=handle_sse),
715+
Route("/messages/", endpoint=failing_post, methods=["POST"]),
716+
]
717+
)
718+
719+
720+
def run_failing_post_server(server_port: int, post_status: int) -> None: # pragma: no cover
721+
app = make_failing_post_server_app(post_status)
722+
uvicorn.Server(config=uvicorn.Config(app=app, host="127.0.0.1", port=server_port, log_level="error")).run()
723+
724+
725+
@pytest.mark.anyio
726+
@pytest.mark.parametrize("post_status", [401, 404, 500])
727+
async def test_sse_client_real_server_surfaces_post_http_error(server_port: int, post_status: int) -> None:
728+
"""End-to-end integration test for issue #2110.
729+
730+
Against a real uvicorn-hosted MCP server whose POST endpoint returns
731+
401/404/500, the client must surface the error within a bounded timeout
732+
rather than hanging forever. Before the fix, this test hit `fail_after(5)`
733+
because `post_writer` swallowed the exception.
734+
"""
735+
proc = multiprocessing.Process(
736+
target=run_failing_post_server,
737+
kwargs={"server_port": server_port, "post_status": post_status},
738+
daemon=True,
739+
)
740+
proc.start()
741+
try:
742+
wait_for_server(server_port)
743+
server_url = f"http://127.0.0.1:{server_port}"
744+
with anyio.fail_after(5):
745+
async with sse_client(server_url + "/sse") as (read_stream, write_stream):
746+
request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
747+
await write_stream.send(SessionMessage(request))
748+
received = await read_stream.receive()
749+
assert isinstance(received, httpx.HTTPStatusError)
750+
assert received.response.status_code == post_status
751+
finally:
752+
proc.kill()
753+
proc.join(timeout=2)
754+
755+
756+
def make_sse_only_server_app() -> Starlette: # pragma: no cover
757+
"""Starlette app with SSE GET but NO /messages/ POST route (all POSTs 404)."""
758+
security_settings = TransportSecuritySettings(
759+
allowed_hosts=["127.0.0.1:*", "localhost:*"],
760+
allowed_origins=["http://127.0.0.1:*", "http://localhost:*"],
761+
)
762+
sse = SseServerTransport("/messages/", security_settings=security_settings)
763+
server = _create_server()
764+
765+
async def handle_sse(request: Request) -> Response:
766+
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
767+
await server.run(streams[0], streams[1], server.create_initialization_options())
768+
return Response()
769+
770+
return Starlette(routes=[Route("/sse", endpoint=handle_sse)])
771+
772+
773+
def run_sse_only_server(port: int) -> None: # pragma: no cover
774+
uvicorn.Server(
775+
config=uvicorn.Config(app=make_sse_only_server_app(), host="127.0.0.1", port=port, log_level="error")
776+
).run()
777+
778+
779+
@pytest.mark.anyio
780+
async def test_sse_client_real_server_handles_no_route_match(server_port: int) -> None:
781+
"""End-to-end test: if the server has no POST route at all, httpx surfaces
782+
the 404 to the caller via the read stream rather than hanging.
783+
784+
This catches the 'server is up but refusing POSTs' failure mode, which is
785+
distinct from the deliberate-status test above.
786+
"""
787+
proc = multiprocessing.Process(target=run_sse_only_server, kwargs={"port": server_port}, daemon=True)
788+
proc.start()
789+
try:
790+
wait_for_server(server_port)
791+
server_url = f"http://127.0.0.1:{server_port}"
792+
with anyio.fail_after(5):
793+
async with sse_client(server_url + "/sse") as (read_stream, write_stream):
794+
request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
795+
await write_stream.send(SessionMessage(request))
796+
received = await read_stream.receive()
797+
assert isinstance(received, httpx.HTTPStatusError)
798+
assert received.response.status_code == 404
799+
finally:
800+
proc.kill()
801+
proc.join(timeout=2)

tests/shared/test_streamable_http.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2318,3 +2318,128 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(
23182318

23192319
assert "content-type" in headers_data
23202320
assert headers_data["content-type"] == "application/json"
2321+
2322+
2323+
@pytest.mark.anyio
2324+
async def test_streamable_http_client_propagates_post_network_error_to_caller() -> None:
2325+
"""Regression for https://github.com/modelcontextprotocol/python-sdk/issues/2110.
2326+
2327+
When a network-level error occurs during POST (e.g. ConnectError), the
2328+
caller must receive the exception via the read stream instead of hanging
2329+
indefinitely while the error is silently logged.
2330+
"""
2331+
2332+
def handler(request: httpx.Request) -> httpx.Response:
2333+
raise httpx.ConnectError("simulated network failure")
2334+
2335+
mock_client = httpx.AsyncClient(transport=httpx.MockTransport(handler))
2336+
2337+
async with mock_client:
2338+
async with streamable_http_client("http://test/mcp", http_client=mock_client) as (
2339+
read_stream,
2340+
write_stream,
2341+
):
2342+
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
2343+
await write_stream.send(SessionMessage(request))
2344+
2345+
with anyio.fail_after(3):
2346+
received = await read_stream.receive()
2347+
assert isinstance(received, httpx.ConnectError)
2348+
2349+
2350+
# ---- Integration tests against a real uvicorn server (issue #2110) ----
2351+
2352+
2353+
def _fixed_status_mcp_app(post_status: int) -> Starlette: # pragma: no cover
2354+
"""Starlette app whose POST /mcp always returns the given status."""
2355+
from starlette.responses import Response as StarletteResponse
2356+
from starlette.routing import Route
2357+
2358+
async def failing_post(request: Request) -> StarletteResponse:
2359+
return StarletteResponse(status_code=post_status, content=f"deliberate {post_status} for #2110".encode())
2360+
2361+
return Starlette(routes=[Route("/mcp", endpoint=failing_post, methods=["POST", "GET", "DELETE"])])
2362+
2363+
2364+
def _run_fixed_status_mcp_server(port: int, post_status: int) -> None: # pragma: no cover
2365+
uvicorn.Server(
2366+
config=uvicorn.Config(app=_fixed_status_mcp_app(post_status), host="127.0.0.1", port=port, log_level="error")
2367+
).run()
2368+
2369+
2370+
@pytest.fixture
2371+
def failing_mcp_server_port() -> int:
2372+
with socket.socket() as s:
2373+
s.bind(("127.0.0.1", 0))
2374+
return s.getsockname()[1]
2375+
2376+
2377+
@pytest.mark.anyio
2378+
@pytest.mark.parametrize(
2379+
"post_status,expected_rpc_code,expected_message_fragment",
2380+
[
2381+
(500, types.INTERNAL_ERROR, "500"),
2382+
(401, types.INTERNAL_ERROR, "401"),
2383+
(404, types.INVALID_REQUEST, "Session terminated"),
2384+
],
2385+
)
2386+
async def test_streamable_http_client_real_server_preserves_http_status(
2387+
failing_mcp_server_port: int,
2388+
post_status: int,
2389+
expected_rpc_code: int,
2390+
expected_message_fragment: str,
2391+
) -> None:
2392+
"""End-to-end integration test for issue #2110.
2393+
2394+
Against a real uvicorn server whose POST /mcp returns 401/404/500, the
2395+
client must deliver a `JSONRPCError` to the read stream that carries the
2396+
HTTP status code in `error.data`, within a bounded timeout. Before the
2397+
fix, the status was lost (mapped to JSON-RPC error codes with no HTTP
2398+
context), and the 404 'session terminated' branch had no reference at all
2399+
to the underlying HTTP status.
2400+
"""
2401+
proc = multiprocessing.Process(
2402+
target=_run_fixed_status_mcp_server,
2403+
kwargs={"port": failing_mcp_server_port, "post_status": post_status},
2404+
daemon=True,
2405+
)
2406+
proc.start()
2407+
try:
2408+
wait_for_server(failing_mcp_server_port)
2409+
url = f"http://127.0.0.1:{failing_mcp_server_port}/mcp"
2410+
with anyio.fail_after(5):
2411+
async with streamable_http_client(url) as (read_stream, write_stream):
2412+
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
2413+
await write_stream.send(SessionMessage(request))
2414+
received = await read_stream.receive()
2415+
2416+
assert isinstance(received, SessionMessage)
2417+
assert isinstance(received.message, types.JSONRPCError)
2418+
assert received.message.error.code == expected_rpc_code
2419+
assert received.message.error.data == {"http_status": post_status}
2420+
assert expected_message_fragment in received.message.error.message
2421+
finally:
2422+
proc.kill()
2423+
proc.join(timeout=2)
2424+
2425+
2426+
@pytest.mark.anyio
2427+
async def test_streamable_http_client_real_server_surfaces_network_error(
2428+
failing_mcp_server_port: int,
2429+
) -> None:
2430+
"""End-to-end integration test for issue #2110.
2431+
2432+
When the server is entirely unreachable (nothing listening), the client
2433+
must surface the network error on the read stream within a bounded
2434+
timeout, not hang waiting for a response. Before the fix, `post_writer`
2435+
caught the exception and only logged it, leaving the caller blocked.
2436+
"""
2437+
# failing_mcp_server_port is reserved but nothing is bound — this gives us
2438+
# a port that is guaranteed to refuse connections.
2439+
url = f"http://127.0.0.1:{failing_mcp_server_port}/mcp"
2440+
with anyio.fail_after(5):
2441+
async with streamable_http_client(url) as (read_stream, write_stream):
2442+
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
2443+
await write_stream.send(SessionMessage(request))
2444+
received = await read_stream.receive()
2445+
assert isinstance(received, httpx.ConnectError)

0 commit comments

Comments
 (0)