Skip to content

Commit d739975

Browse files
committed
test: add in-process streamable HTTP transport smoke tests
Drives the streamable HTTP Starlette app through httpx's ASGI transport so the full HTTP framing layer (session ids, SSE and JSON response encoding, stateful and stateless modes) runs with no sockets, threads, or subprocesses. Covers the handshake, tool calls and errors, mid-call notifications, the stateless rejection of server-initiated requests, and the routing of unrelated server messages to the standalone stream. Removes the 'pragma: no cover' comments these tests now cover (the session-manager accessors, the no-session-id validation path, and the related-request-id routing branch). The session-manager accessor's unreachable error guard keeps its pragma, moved onto the raise statement itself so the now-executed condition above it is measured normally.
1 parent a358aa4 commit d739975

6 files changed

Lines changed: 291 additions & 6 deletions

File tree

src/mcp/server/lowlevel/server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,12 @@ def session_manager(self) -> StreamableHTTPSessionManager:
349349
Raises:
350350
RuntimeError: If called before streamable_http_app() has been called.
351351
"""
352-
if self._session_manager is None: # pragma: no cover
353-
raise RuntimeError(
352+
if self._session_manager is None:
353+
raise RuntimeError( # pragma: no cover
354354
"Session manager can only be accessed after calling streamable_http_app(). "
355355
"The session manager is created lazily to avoid unnecessary initialization."
356356
)
357-
return self._session_manager # pragma: no cover
357+
return self._session_manager
358358

359359
async def run(
360360
self,

src/mcp/server/mcpserver/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ def session_manager(self) -> StreamableHTTPSessionManager:
244244
Raises:
245245
RuntimeError: If called before streamable_http_app() has been called.
246246
"""
247-
return self._lowlevel_server.session_manager # pragma: no cover
247+
return self._lowlevel_server.session_manager
248248

249249
@overload
250250
def run(self, transport: Literal["stdio"] = ...) -> None: ...

src/mcp/server/streamable_http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ async def _validate_request_headers(self, request: Request, send: Send) -> bool:
818818

819819
async def _validate_session(self, request: Request, send: Send) -> bool:
820820
"""Validate the session ID in the request."""
821-
if not self.mcp_session_id: # pragma: no cover
821+
if not self.mcp_session_id:
822822
# If we're not using session IDs, return True
823823
return True
824824

@@ -993,7 +993,7 @@ async def message_router():
993993
if isinstance(message, JSONRPCResponse | JSONRPCError) and message.id is not None:
994994
target_request_id = str(message.id)
995995
# Extract related_request_id from meta if it exists
996-
elif ( # pragma: no cover
996+
elif (
997997
session_message.metadata is not None
998998
and isinstance(
999999
session_message.metadata,

tests/interaction/_requirements.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,67 @@ class Requirement:
591591
behavior="A roots/list_changed notification sent by the client is delivered to the server's handler.",
592592
),
593593
# ═══════════════════════════════════════════════════════════════════════════
594+
# Transports
595+
# ═══════════════════════════════════════════════════════════════════════════
596+
"transport:streamable-http:stateful": Requirement(
597+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
598+
behavior=(
599+
"The interaction round trip (initialize, tool calls, tool errors) works through the "
600+
"streamable HTTP framing in its default stateful SSE-response mode."
601+
),
602+
),
603+
"transport:streamable-http:json-response": Requirement(
604+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
605+
behavior="The interaction round trip works when the server answers with plain JSON instead of SSE.",
606+
),
607+
"transport:streamable-http:stateless": Requirement(
608+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
609+
behavior=(
610+
"The interaction round trip works in stateless mode, where every request is served by a "
611+
"fresh transport with no session id."
612+
),
613+
),
614+
"transport:streamable-http:notifications": Requirement(
615+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
616+
behavior=(
617+
"Notifications emitted during a request are delivered on that request's SSE stream and reach "
618+
"the client's callbacks, in order, before the response."
619+
),
620+
),
621+
"transport:streamable-http:stateless-restrictions": Requirement(
622+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
623+
behavior=(
624+
"A handler that attempts a server-initiated request in stateless mode fails with an error "
625+
"result, because there is no session to call back through."
626+
),
627+
),
628+
"transport:streamable-http:unrelated-messages": Requirement(
629+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
630+
behavior=(
631+
"A server-to-client message that is not related to an in-flight request is routed to the "
632+
"standalone GET stream; a client that never opened one does not receive it."
633+
),
634+
),
635+
"transport:streamable-http:server-to-client": Requirement(
636+
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
637+
behavior=(
638+
"A server-initiated request nested inside an in-flight call round-trips over stateful streamable HTTP."
639+
),
640+
deferred=(
641+
"The in-process ASGI client buffers each response in full, which deadlocks on a "
642+
"server-to-client request nested inside a still-open call. Covered over a real socket by "
643+
"tests/shared/test_streamable_http.py."
644+
),
645+
),
646+
"transport:stdio": Requirement(
647+
source=f"{SPEC_BASE_URL}/basic/transports#stdio",
648+
behavior="The interaction round trip works over a stdio subprocess.",
649+
deferred=(
650+
"Requires a real subprocess. Process lifecycle is covered by tests/client/test_stdio.py and "
651+
"end-to-end stdio coverage belongs to the cross-SDK conformance suite."
652+
),
653+
),
654+
# ═══════════════════════════════════════════════════════════════════════════
594655
# MCPServer behavioural guarantees (not spec-mandated)
595656
# ═══════════════════════════════════════════════════════════════════════════
596657
"mcpserver:tools:output-schema:model": Requirement(

tests/interaction/transports/__init__.py

Whitespace-only changes.
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
"""Smoke tests for the interaction model over the streamable HTTP transport, entirely in process.
2+
3+
The Starlette app a real deployment would hand to uvicorn is driven through httpx's ASGI
4+
transport instead: the full HTTP framing layer runs (session ids, SSE and JSON response
5+
encoding, stateful and stateless session management) with no sockets, threads, or subprocesses,
6+
so these tests are as deterministic as the in-memory ones.
7+
8+
The ASGI client buffers each response in full before the client sees any of it. Request,
9+
response, and notification flows are unaffected -- notifications are written to the request's
10+
SSE stream before the response and arrive in order -- but a server-initiated request nested
11+
inside a still-open call would deadlock, so that scenario is deferred to the real-socket
12+
transport tests (see the `transport:streamable-http:server-to-client` requirement).
13+
"""
14+
15+
from collections.abc import AsyncIterator
16+
from contextlib import asynccontextmanager
17+
18+
import httpx
19+
import pytest
20+
from inline_snapshot import snapshot
21+
from pydantic import BaseModel
22+
23+
from mcp.client.client import Client
24+
from mcp.client.streamable_http import streamable_http_client
25+
from mcp.server.mcpserver import Context, MCPServer
26+
from mcp.server.transport_security import TransportSecuritySettings
27+
from mcp.types import (
28+
CallToolResult,
29+
LoggingMessageNotification,
30+
LoggingMessageNotificationParams,
31+
TextContent,
32+
)
33+
from tests.interaction._helpers import IncomingMessage
34+
from tests.interaction._requirements import requirement
35+
36+
pytestmark = pytest.mark.anyio
37+
38+
39+
def _smoke_server() -> MCPServer:
40+
"""A server exercising one example of each message shape the smoke tests need."""
41+
mcp = MCPServer("smoke", instructions="Talk to the smoke server.")
42+
43+
@mcp.tool()
44+
def echo(text: str) -> str:
45+
"""Echo the text back."""
46+
return text
47+
48+
@mcp.tool()
49+
def fail() -> str:
50+
"""Always fails."""
51+
raise ValueError("deliberately broken")
52+
53+
@mcp.tool()
54+
async def narrate(ctx: Context) -> str:
55+
"""Send a log message and a progress update, then return."""
56+
await ctx.info("starting")
57+
await ctx.report_progress(1, 2)
58+
await ctx.info("finishing")
59+
return "narrated"
60+
61+
class Confirmation(BaseModel):
62+
confirmed: bool
63+
64+
@mcp.tool()
65+
async def ask(ctx: Context) -> str:
66+
"""Attempt a server-initiated elicitation."""
67+
await ctx.elicit("Proceed?", Confirmation)
68+
raise NotImplementedError # only called in stateless mode, where the elicit cannot succeed
69+
70+
@mcp.tool()
71+
async def announce(ctx: Context) -> str:
72+
"""Send one notification related to this request and one that is not."""
73+
await ctx.info("about to announce")
74+
await ctx.session.send_resource_updated("file:///watched.txt")
75+
return "announced"
76+
77+
return mcp
78+
79+
80+
@asynccontextmanager
81+
async def _connected(
82+
mcp: MCPServer, *, stateless_http: bool = False, json_response: bool = False
83+
) -> AsyncIterator[Client]:
84+
"""Yield a Client connected to the server through the in-process streamable HTTP stack."""
85+
# DNS-rebinding protection validates Host/Origin headers against a real network attack that
86+
# cannot exist for an in-process ASGI app; leaving it on would also pull the origin-validation
87+
# branch (deliberately uncovered in src) into coverage.
88+
app = mcp.streamable_http_app(
89+
stateless_http=stateless_http,
90+
json_response=json_response,
91+
transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
92+
)
93+
async with mcp.session_manager.run():
94+
async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://127.0.0.1:8000") as http:
95+
transport = streamable_http_client("http://127.0.0.1:8000/mcp", http_client=http)
96+
async with Client(transport) as client:
97+
yield client
98+
99+
100+
@requirement("transport:streamable-http:stateful")
101+
async def test_initialize_and_call_a_tool_over_streamable_http() -> None:
102+
"""The handshake and a tool round trip work through the stateful SSE framing."""
103+
async with _connected(_smoke_server()) as client:
104+
assert client.initialize_result.server_info.name == "smoke"
105+
assert client.initialize_result.instructions == "Talk to the smoke server."
106+
result = await client.call_tool("echo", {"text": "over http"})
107+
108+
assert result == snapshot(
109+
CallToolResult(content=[TextContent(text="over http")], structured_content={"result": "over http"})
110+
)
111+
112+
113+
@requirement("transport:streamable-http:stateful")
114+
async def test_tool_errors_round_trip_over_streamable_http() -> None:
115+
"""A tool execution error crosses the HTTP framing as an is_error result, not a transport failure."""
116+
async with _connected(_smoke_server()) as client:
117+
result = await client.call_tool("fail", {})
118+
119+
assert result == snapshot(
120+
CallToolResult(content=[TextContent(text="Error executing tool fail: deliberately broken")], is_error=True)
121+
)
122+
123+
124+
@requirement("transport:streamable-http:json-response")
125+
async def test_tool_call_over_streamable_http_with_json_responses() -> None:
126+
"""The round trip works when the server answers with a single JSON body instead of an SSE stream."""
127+
async with _connected(_smoke_server(), json_response=True) as client:
128+
assert client.initialize_result.server_info.name == "smoke"
129+
result = await client.call_tool("echo", {"text": "as json"})
130+
131+
assert result == snapshot(
132+
CallToolResult(content=[TextContent(text="as json")], structured_content={"result": "as json"})
133+
)
134+
135+
136+
@requirement("transport:streamable-http:stateless")
137+
async def test_tool_calls_over_stateless_streamable_http() -> None:
138+
"""Consecutive requests each succeed against a stateless server with no session to share."""
139+
async with _connected(_smoke_server(), stateless_http=True) as client:
140+
first = await client.call_tool("echo", {"text": "first"})
141+
second = await client.call_tool("echo", {"text": "second"})
142+
143+
assert first == snapshot(
144+
CallToolResult(content=[TextContent(text="first")], structured_content={"result": "first"})
145+
)
146+
assert second == snapshot(
147+
CallToolResult(content=[TextContent(text="second")], structured_content={"result": "second"})
148+
)
149+
150+
151+
@requirement("transport:streamable-http:notifications")
152+
async def test_notifications_during_a_tool_call_arrive_before_the_response() -> None:
153+
"""Log and progress notifications emitted mid-call are delivered on the call's SSE stream in order."""
154+
logs: list[LoggingMessageNotificationParams] = []
155+
progress_updates: list[tuple[float, float | None, str | None]] = []
156+
157+
async def collect_log(params: LoggingMessageNotificationParams) -> None:
158+
logs.append(params)
159+
160+
async def collect_progress(progress: float, total: float | None, message: str | None) -> None:
161+
progress_updates.append((progress, total, message))
162+
163+
server = _smoke_server()
164+
app = server.streamable_http_app(
165+
transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False)
166+
)
167+
async with server.session_manager.run():
168+
async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://127.0.0.1:8000") as http:
169+
transport = streamable_http_client("http://127.0.0.1:8000/mcp", http_client=http)
170+
async with Client(transport, logging_callback=collect_log) as client:
171+
result = await client.call_tool("narrate", {}, progress_callback=collect_progress)
172+
173+
assert result == snapshot(
174+
CallToolResult(content=[TextContent(text="narrated")], structured_content={"result": "narrated"})
175+
)
176+
assert [params.data for params in logs] == snapshot(["starting", "finishing"])
177+
assert progress_updates == snapshot([(1.0, 2.0, None)])
178+
179+
180+
@requirement("transport:streamable-http:stateless-restrictions")
181+
async def test_stateless_streamable_http_rejects_server_initiated_requests() -> None:
182+
"""A handler that tries to call back to the client in stateless mode fails: there is no session."""
183+
async with _connected(_smoke_server(), stateless_http=True) as client:
184+
result = await client.call_tool("ask", {})
185+
186+
assert result.is_error is True
187+
assert isinstance(result.content[0], TextContent)
188+
# The exact message is the StatelessModeNotSupported exception text wrapped by the tool-error
189+
# path; pin the stable prefix rather than the full exception prose.
190+
assert result.content[0].text.startswith("Error executing tool ask:")
191+
192+
193+
@requirement("transport:streamable-http:unrelated-messages")
194+
async def test_unrelated_server_messages_are_not_delivered_without_a_listening_stream() -> None:
195+
"""A server message with no related request goes to the standalone GET stream, not the call's stream.
196+
197+
The client never opens the standalone stream, so the resource-updated notification is silently
198+
dropped. The log notification sent by the same tool IS related to the call and does arrive,
199+
proving the collector works and making the absence of the unrelated one meaningful. This is
200+
the transport behaviour that makes `related_request_id` matter.
201+
"""
202+
received: list[IncomingMessage] = []
203+
204+
async def collect(message: IncomingMessage) -> None:
205+
received.append(message)
206+
207+
server = _smoke_server()
208+
app = server.streamable_http_app(
209+
transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False)
210+
)
211+
async with server.session_manager.run():
212+
async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://127.0.0.1:8000") as http:
213+
transport = streamable_http_client("http://127.0.0.1:8000/mcp", http_client=http)
214+
async with Client(transport, message_handler=collect) as client:
215+
result = await client.call_tool("announce", {})
216+
217+
assert result == snapshot(
218+
CallToolResult(content=[TextContent(text="announced")], structured_content={"result": "announced"})
219+
)
220+
# Only the related log notification arrives; the resource-updated notification went to the
221+
# standalone stream nobody is reading.
222+
assert received == snapshot(
223+
[LoggingMessageNotification(params=LoggingMessageNotificationParams(level="info", data="about to announce"))]
224+
)

0 commit comments

Comments
 (0)