Skip to content

Commit c1eab9d

Browse files
committed
test: add an in-process streaming ASGI transport and cover server-initiated requests over streamable HTTP
1 parent d07f01f commit c1eab9d

6 files changed

Lines changed: 310 additions & 35 deletions

File tree

tests/interaction/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ The whole suite is in-memory and event-driven; it runs in about a second.
2727
SDK's own deliberate output.
2828
- **No sleeps, no real I/O.** Concurrency is coordinated with `anyio.Event`; every wait that
2929
could hang is bounded by `anyio.fail_after(5)`. The streamable HTTP tests drive the Starlette
30-
app in-process through `httpx.ASGITransport` — no sockets, threads, or subprocesses anywhere.
30+
app in-process through the suite's streaming ASGI bridge (`transports/_bridge.py`), which
31+
delivers each response chunk as the server produces it — full duplex, but still no sockets,
32+
threads, or subprocesses anywhere.
3133

3234
## Layout
3335

tests/interaction/_requirements.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1722,7 +1722,8 @@ def __post_init__(self) -> None:
17221722
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",
17231723
behavior=(
17241724
"A server-to-client message that is not related to an in-flight request is routed to the "
1725-
"standalone GET stream; a client that never opened one does not receive it."
1725+
"standalone GET stream and delivered to the client listening on it, not to any request's "
1726+
"own stream."
17261727
),
17271728
transports=("streamable-http",),
17281729
),
@@ -1732,11 +1733,6 @@ def __post_init__(self) -> None:
17321733
"A server-initiated request nested inside an in-flight call round-trips over stateful streamable HTTP."
17331734
),
17341735
transports=("streamable-http",),
1735-
deferred=(
1736-
"The in-process ASGI client buffers each response in full, which deadlocks on a "
1737-
"server-to-client request nested inside a still-open call. Covered over a real socket by "
1738-
"tests/shared/test_streamable_http.py."
1739-
),
17401736
),
17411737
"transport:streamable-http:resumability": Requirement(
17421738
source=f"{SPEC_BASE_URL}/basic/transports#streamable-http",

tests/interaction/test_coverage.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
# Anything listed here is exempt from the every-test-has-a-requirement check.
2727
_HARNESS_SELF_TESTS = {
2828
"tests.interaction.lowlevel.test_wire.test_recording_read_stream_ends_iteration_when_the_sender_closes",
29+
"tests.interaction.transports.test_bridge.test_response_chunks_arrive_as_the_application_sends_them",
30+
"tests.interaction.transports.test_bridge.test_closing_the_response_delivers_a_disconnect_to_the_application",
31+
"tests.interaction.transports.test_bridge.test_an_application_failure_before_the_response_starts_fails_the_request",
2932
}
3033

3134

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
"""An in-process, full-duplex HTTP transport for driving ASGI applications from httpx.
2+
3+
`httpx.ASGITransport` runs the application to completion and only then hands the buffered
4+
response to the caller, so a server that streams its response — the streamable HTTP transport's
5+
SSE responses — can never converse with the client mid-request: a server-initiated request
6+
nested inside a still-open call deadlocks. `StreamingASGITransport` removes that limitation by
7+
running the application as a background task and forwarding every `http.response.body` chunk to
8+
the client the moment it is sent. Everything happens on the one event loop: no sockets, no
9+
threads, no sleeps, no extra dependencies.
10+
11+
The behavioural contract, pinned by `test_bridge.py`:
12+
13+
- The request body is buffered before the application is invoked (MCP requests are small JSON
14+
documents); the response streams chunk by chunk.
15+
- Closing the response — or the whole client — delivers `http.disconnect` to the application,
16+
exactly as a real server sees when its peer goes away.
17+
- An exception the application raises before sending `http.response.start` fails the originating
18+
request with that same exception. After the response has started, a failure is visible to the
19+
client only through the response itself (status code, truncated body) — the same signal a real
20+
server over a real socket would give.
21+
22+
The transport owns an anyio task group for the application tasks; it is opened and closed by
23+
`httpx.AsyncClient`'s own context manager, so use the client as a context manager (the suite
24+
always does).
25+
"""
26+
27+
import math
28+
from collections.abc import AsyncIterator
29+
from types import TracebackType
30+
31+
import anyio
32+
import anyio.abc
33+
import httpx
34+
from anyio.streams.memory import MemoryObjectReceiveStream
35+
from starlette.types import ASGIApp, Message, Scope
36+
37+
38+
class _StreamingResponseBody(httpx.AsyncByteStream):
39+
"""A response body that yields chunks as the application produces them.
40+
41+
Closing it tells the application the client has gone away (`http.disconnect`), mirroring a
42+
peer that drops the connection mid-response.
43+
"""
44+
45+
def __init__(self, chunks: MemoryObjectReceiveStream[bytes], client_disconnected: anyio.Event) -> None:
46+
self._chunks = chunks
47+
self._client_disconnected = client_disconnected
48+
49+
async def __aiter__(self) -> AsyncIterator[bytes]:
50+
async for chunk in self._chunks:
51+
yield chunk
52+
53+
async def aclose(self) -> None:
54+
self._client_disconnected.set()
55+
await self._chunks.aclose()
56+
57+
58+
class StreamingASGITransport(httpx.AsyncBaseTransport):
59+
"""Drive an ASGI application in-process, streaming each response as it is produced."""
60+
61+
_task_group: anyio.abc.TaskGroup
62+
63+
def __init__(self, app: ASGIApp) -> None:
64+
self._app = app
65+
66+
async def __aenter__(self) -> "StreamingASGITransport":
67+
self._task_group = anyio.create_task_group()
68+
await self._task_group.__aenter__()
69+
return self
70+
71+
async def __aexit__(
72+
self,
73+
exc_type: type[BaseException] | None = None,
74+
exc_value: BaseException | None = None,
75+
traceback: TracebackType | None = None,
76+
) -> None:
77+
# Any application task still running at this point is serving a client that no longer
78+
# exists; cancel rather than wait so harness teardown can never hang.
79+
self._task_group.cancel_scope.cancel()
80+
await self._task_group.__aexit__(exc_type, exc_value, traceback)
81+
82+
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
83+
assert isinstance(request.stream, httpx.AsyncByteStream)
84+
request_body = b"".join([chunk async for chunk in request.stream])
85+
86+
scope: Scope = {
87+
"type": "http",
88+
"asgi": {"version": "3.0"},
89+
"http_version": "1.1",
90+
"method": request.method,
91+
"scheme": request.url.scheme,
92+
"path": request.url.path,
93+
"raw_path": request.url.raw_path.split(b"?", maxsplit=1)[0],
94+
"query_string": request.url.query,
95+
"root_path": "",
96+
"headers": [(name.lower(), value) for name, value in request.headers.raw],
97+
"server": (request.url.host, request.url.port),
98+
"client": ("127.0.0.1", 1234),
99+
}
100+
101+
request_delivered = False
102+
client_disconnected = anyio.Event()
103+
response_started = anyio.Event()
104+
response_status = 0
105+
response_headers: list[tuple[bytes, bytes]] = []
106+
application_error: Exception | None = None
107+
chunk_writer, chunk_reader = anyio.create_memory_object_stream[bytes](math.inf)
108+
109+
async def receive_request() -> Message:
110+
nonlocal request_delivered
111+
if not request_delivered:
112+
request_delivered = True
113+
return {"type": "http.request", "body": request_body, "more_body": False}
114+
await client_disconnected.wait()
115+
return {"type": "http.disconnect"}
116+
117+
async def send_response(message: Message) -> None:
118+
nonlocal response_status, response_headers
119+
if message["type"] == "http.response.start":
120+
response_status = message["status"]
121+
response_headers = list(message.get("headers", []))
122+
response_started.set()
123+
return
124+
assert message["type"] == "http.response.body"
125+
body: bytes = message.get("body", b"")
126+
if body:
127+
await chunk_writer.send(body)
128+
if not message.get("more_body", False):
129+
await chunk_writer.aclose()
130+
131+
async def run_application() -> None:
132+
nonlocal application_error
133+
try:
134+
await self._app(scope, receive_request, send_response)
135+
except Exception as exc: # The bridge is the application's outermost boundary: a crash
136+
# must fail the originating request (or show up in the already-started response),
137+
# never tear down the task group shared with every other in-flight request.
138+
application_error = exc
139+
finally:
140+
response_started.set()
141+
await chunk_writer.aclose()
142+
143+
self._task_group.start_soon(run_application)
144+
await response_started.wait()
145+
if application_error is not None:
146+
# No response will be built, so close the reader the response body would have owned.
147+
await chunk_reader.aclose()
148+
raise application_error
149+
return httpx.Response(
150+
status_code=response_status,
151+
headers=response_headers,
152+
stream=_StreamingResponseBody(chunk_reader, client_disconnected),
153+
request=request,
154+
)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""Contract tests for the suite's streaming ASGI bridge.
2+
3+
These pin what `StreamingASGITransport` itself guarantees — chunk-by-chunk delivery, disconnect
4+
propagation, and failure handling — against minimal hand-written ASGI applications, so the MCP
5+
transport tests built on top of it never have to wonder what the harness provides. They are
6+
harness self-tests, not interaction-model tests, and are exempted from the requirement-coverage
7+
contract in `test_coverage.py`.
8+
"""
9+
10+
import anyio
11+
import httpx
12+
import pytest
13+
from starlette.types import Message, Receive, Scope, Send
14+
15+
from tests.interaction.transports._bridge import StreamingASGITransport
16+
17+
pytestmark = pytest.mark.anyio
18+
19+
20+
async def test_response_chunks_arrive_as_the_application_sends_them() -> None:
21+
"""Each body chunk is delivered as sent, empty chunks are skipped, and the stream ends with the application."""
22+
23+
async def chunked_app(scope: Scope, receive: Receive, send: Send) -> None:
24+
assert scope["type"] == "http"
25+
assert (await receive())["type"] == "http.request"
26+
await send({"type": "http.response.start", "status": 200, "headers": [(b"content-type", b"text/plain")]})
27+
await send({"type": "http.response.body", "body": b"first", "more_body": True})
28+
await send({"type": "http.response.body", "body": b"", "more_body": True})
29+
await send({"type": "http.response.body", "body": b"second", "more_body": False})
30+
31+
async with httpx.AsyncClient(transport=StreamingASGITransport(chunked_app), base_url="http://bridge") as http:
32+
async with http.stream("GET", "/chunks") as response:
33+
with anyio.fail_after(5):
34+
chunks = [chunk async for chunk in response.aiter_raw()]
35+
36+
assert response.status_code == 200
37+
assert response.headers["content-type"] == "text/plain"
38+
assert chunks == [b"first", b"second"]
39+
40+
41+
async def test_closing_the_response_delivers_a_disconnect_to_the_application() -> None:
42+
"""A client that closes the response early is seen by the application as an http.disconnect."""
43+
seen_after_request: list[Message] = []
44+
disconnect_seen = anyio.Event()
45+
46+
async def waiting_app(scope: Scope, receive: Receive, send: Send) -> None:
47+
assert scope["type"] == "http"
48+
assert (await receive())["type"] == "http.request"
49+
await send({"type": "http.response.start", "status": 200, "headers": []})
50+
seen_after_request.append(await receive())
51+
disconnect_seen.set()
52+
53+
async with httpx.AsyncClient(transport=StreamingASGITransport(waiting_app), base_url="http://bridge") as http:
54+
async with http.stream("GET", "/wait") as response:
55+
assert response.status_code == 200
56+
# Leaving the stream block closes the response while the application is still mid-response.
57+
with anyio.fail_after(5):
58+
await disconnect_seen.wait()
59+
60+
assert seen_after_request == [{"type": "http.disconnect"}]
61+
62+
63+
async def test_an_application_failure_before_the_response_starts_fails_the_request() -> None:
64+
"""An exception raised before http.response.start reaches the caller as that same exception."""
65+
66+
async def broken_app(scope: Scope, receive: Receive, send: Send) -> None:
67+
raise RuntimeError("the demo application is broken")
68+
69+
async with httpx.AsyncClient(transport=StreamingASGITransport(broken_app), base_url="http://bridge") as http:
70+
with pytest.raises(RuntimeError, match="the demo application is broken"):
71+
await http.get("/broken")

0 commit comments

Comments
 (0)