|
17 | 17 | from httpx_sse import EventSource, ServerSentEvent |
18 | 18 | from inline_snapshot import snapshot |
19 | 19 |
|
| 20 | +from mcp.client.session import ClientSession |
| 21 | +from mcp.client.streamable_http import streamable_http_client |
20 | 22 | from mcp.server.mcpserver import Context, MCPServer |
| 23 | +from mcp.shared.message import ClientMessageMetadata |
21 | 24 | from mcp.types import ( |
| 25 | + LATEST_PROTOCOL_VERSION, |
| 26 | + CallToolRequest, |
| 27 | + CallToolRequestParams, |
22 | 28 | CallToolResult, |
23 | 29 | JSONRPCNotification, |
24 | 30 | JSONRPCRequest, |
|
28 | 34 | jsonrpc_message_adapter, |
29 | 35 | ) |
30 | 36 | from tests.interaction._connect import ( |
| 37 | + BASE_URL, |
31 | 38 | base_headers, |
32 | 39 | connect_over_streamable_http, |
33 | 40 | initialize_via_http, |
@@ -229,13 +236,12 @@ async def hold(ctx: Context) -> str: |
229 | 236 | await finished.wait() |
230 | 237 |
|
231 | 238 |
|
232 | | -# This test intentionally carries every resumability requirement: the close-then-resume |
233 | | -# scenario is indivisible, so splitting it would mean six near-identical bodies. |
| 239 | +# This test intentionally carries every automatic-reconnection requirement: the |
| 240 | +# close-then-resume scenario is indivisible, so splitting it would mean five near-identical bodies. |
234 | 241 | @requirement("hosting:resume:close-stream") |
235 | 242 | @requirement("transport:streamable-http:resumability") |
236 | 243 | @requirement("client-transport:http:reconnect-post-priming") |
237 | 244 | @requirement("client-transport:http:reconnect-retry-value") |
238 | | -@requirement("client-transport:http:resume-stream-api") |
239 | 245 | @requirement("flow:resume:tool-call-resumption-token") |
240 | 246 | async def test_a_call_whose_stream_the_server_closes_is_resumed_by_the_client() -> None: |
241 | 247 | """A server-closed request stream is reconnected by the client and the call completes. |
@@ -288,3 +294,78 @@ async def call() -> None: |
288 | 294 | [CallToolResult(content=[TextContent(text="resumed")], structured_content={"result": "resumed"})] |
289 | 295 | ) |
290 | 296 | assert received == snapshot(["before close", "after close"]) |
| 297 | + |
| 298 | + |
| 299 | +@requirement("client-transport:http:resume-stream-api") |
| 300 | +async def test_a_captured_resumption_token_replays_missed_messages_on_a_new_connection() -> None: |
| 301 | + """A resumption token captured via on_resumption_token_update on one connection lets a fresh |
| 302 | + connection retrieve the messages it missed by passing resumption_token to send_request. |
| 303 | +
|
| 304 | + This is the explicit ClientMessageMetadata API, distinct from the automatic reconnection the |
| 305 | + previous test covers: the transport dispatches a resumption_token request as a GET with |
| 306 | + Last-Event-ID instead of POSTing the body, and remaps the replayed response onto the new |
| 307 | + request's id. Client.call_tool does not expose ClientMessageMetadata, so the test drives a |
| 308 | + bare ClientSession via session.send_request -- the sanctioned drop-down for behaviour Client |
| 309 | + cannot express. The second connection carries the original session id but does not initialize |
| 310 | + (the server-side session already is), modelling a caller that resumes after a process restart. |
| 311 | + """ |
| 312 | + captured: list[str] = [] |
| 313 | + received: list[object] = [] |
| 314 | + first_seen = anyio.Event() |
| 315 | + token_seen = anyio.Event() |
| 316 | + release = anyio.Event() |
| 317 | + store = SequencedEventStore() |
| 318 | + |
| 319 | + mcp = MCPServer("resumable") |
| 320 | + |
| 321 | + @mcp.tool() |
| 322 | + async def hold(ctx: Context) -> str: |
| 323 | + """Emit one notification, wait for the test, emit another, return.""" |
| 324 | + await ctx.info("first") |
| 325 | + await release.wait() |
| 326 | + await ctx.info("second") |
| 327 | + return "done" |
| 328 | + |
| 329 | + async def on_token(token: str) -> None: |
| 330 | + captured.append(token) |
| 331 | + if len(captured) >= 2: |
| 332 | + token_seen.set() |
| 333 | + |
| 334 | + async def collect(params: LoggingMessageNotificationParams) -> None: |
| 335 | + received.append(params.data) |
| 336 | + first_seen.set() |
| 337 | + |
| 338 | + call = CallToolRequest(params=CallToolRequestParams(name="hold", arguments={})) |
| 339 | + capture = ClientMessageMetadata(on_resumption_token_update=on_token) |
| 340 | + |
| 341 | + async with mounted_app(mcp, event_store=store, retry_interval=0) as (http, manager): |
| 342 | + with anyio.fail_after(5): # pragma: no branch |
| 343 | + async with ( |
| 344 | + streamable_http_client(f"{BASE_URL}/mcp", http_client=http, terminate_on_close=False) as (r1, w1), |
| 345 | + ClientSession(r1, w1, logging_callback=collect) as first, |
| 346 | + anyio.create_task_group() as tg, |
| 347 | + ): |
| 348 | + await first.initialize() |
| 349 | + tg.start_soon(first.send_request, call, CallToolResult, None, capture) |
| 350 | + await first_seen.wait() |
| 351 | + await token_seen.wait() |
| 352 | + tg.cancel_scope.cancel() |
| 353 | + assert captured == snapshot(["3", "4"]) |
| 354 | + assert received == snapshot(["first"]) |
| 355 | + # The session id is only observable via the manager (the client transport does not expose it). |
| 356 | + (session_id,) = manager._server_instances |
| 357 | + |
| 358 | + release.set() |
| 359 | + # init priming + init response + call priming + "first" + "second" + result = 6 stored events. |
| 360 | + await store.wait_until_stored(6) |
| 361 | + http.headers["mcp-session-id"] = session_id |
| 362 | + http.headers["mcp-protocol-version"] = LATEST_PROTOCOL_VERSION |
| 363 | + async with ( |
| 364 | + streamable_http_client(f"{BASE_URL}/mcp", http_client=http) as (r2, w2), |
| 365 | + ClientSession(r2, w2, logging_callback=collect) as second, |
| 366 | + ): |
| 367 | + result = await second.send_request( |
| 368 | + call, CallToolResult, metadata=ClientMessageMetadata(resumption_token=captured[-1]) |
| 369 | + ) |
| 370 | + assert result == snapshot(CallToolResult(content=[TextContent(text="done")], structured_content={"result": "done"})) |
| 371 | + assert received == snapshot(["first", "second"]) |
0 commit comments