Skip to content

Commit 9cce0dc

Browse files
committed
fix: terminate active streamable http sessions on shutdown
1 parent fb2276b commit 9cce0dc

File tree

2 files changed

+68
-2
lines changed

2 files changed

+68
-2
lines changed

src/mcp/server/streamable_http_manager.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,18 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
130130
yield # Let the application run
131131
finally:
132132
logger.info("StreamableHTTP session manager shutting down")
133+
active_transports = list(self._server_instances.values())
134+
self._server_instances.clear()
135+
136+
for transport in active_transports:
137+
try:
138+
await transport.terminate()
139+
except Exception: # pragma: no cover
140+
logger.exception("Failed to terminate active streamable HTTP session during shutdown")
141+
133142
# Cancel task group to stop all spawned tasks
134143
tg.cancel_scope.cancel()
135144
self._task_group = None
136-
# Clear any remaining server instances
137-
self._server_instances.clear()
138145

139146
async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> None:
140147
"""Process ASGI request with proper session handling and transport setup.

tests/server/test_streamable_http_manager.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,65 @@ async def mock_receive(): # pragma: no cover
207207
assert not manager._server_instances, "No sessions should be tracked after the only session crashes"
208208

209209

210+
@pytest.mark.anyio
211+
async def test_run_terminates_active_stateful_sessions_on_shutdown():
212+
app = Server("test-shutdown-cleanup")
213+
manager = StreamableHTTPSessionManager(app=app)
214+
created_transports: list[StreamableHTTPServerTransport] = []
215+
run_started = anyio.Event()
216+
217+
original_constructor = StreamableHTTPServerTransport
218+
219+
def track_transport(*args: Any, **kwargs: Any) -> StreamableHTTPServerTransport:
220+
transport = original_constructor(*args, **kwargs)
221+
created_transports.append(transport)
222+
return transport
223+
224+
async def block_run(*args: Any, **kwargs: Any) -> None:
225+
run_started.set()
226+
await anyio.sleep_forever()
227+
228+
app.run = AsyncMock(side_effect=block_run)
229+
230+
sent_messages: list[Message] = []
231+
232+
async def mock_send(message: Message):
233+
sent_messages.append(message)
234+
235+
scope = {
236+
"type": "http",
237+
"method": "POST",
238+
"path": "/mcp",
239+
"headers": [(b"content-type", b"application/json")],
240+
}
241+
242+
async def mock_receive():
243+
return {"type": "http.request", "body": b"", "more_body": False} # pragma: no cover
244+
245+
with patch.object(streamable_http_manager, "StreamableHTTPServerTransport", side_effect=track_transport):
246+
async with manager.run():
247+
await manager.handle_request(scope, mock_receive, mock_send)
248+
await run_started.wait()
249+
250+
assert len(created_transports) == 1
251+
transport = created_transports[0]
252+
terminate_spy = AsyncMock(side_effect=transport.terminate)
253+
transport.terminate = terminate_spy
254+
255+
response_start = next(
256+
(msg for msg in sent_messages if msg["type"] == "http.response.start"),
257+
None,
258+
)
259+
assert response_start is not None
260+
assert manager._server_instances
261+
262+
await anyio.sleep(0)
263+
264+
terminate_spy.assert_awaited_once()
265+
assert transport._terminated
266+
assert not manager._server_instances
267+
268+
210269
@pytest.mark.anyio
211270
async def test_stateless_requests_memory_cleanup():
212271
"""Test that stateless requests actually clean up resources using real transports."""

0 commit comments

Comments
 (0)