Skip to content

Commit d86072e

Browse files
committed
Add idle session cleanup to prevent unbounded session accumulation
Implement automatic cleanup of idle sessions when the session count exceeds a configurable threshold (default 10k). Sessions inactive for longer than the idle timeout (default 30 minutes) are terminated to free resources. This prevents memory exhaustion from clients that create sessions but never send termination requests.
1 parent 47d35f0 commit d86072e

File tree

2 files changed

+215
-3
lines changed

2 files changed

+215
-3
lines changed

src/mcp/server/streamable_http_manager.py

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import contextlib
66
import logging
7+
import time
78
from collections.abc import AsyncIterator
89
from http import HTTPStatus
910
from typing import Any
@@ -51,6 +52,14 @@ class StreamableHTTPSessionManager:
5152
json_response: Whether to use JSON responses instead of SSE streams
5253
stateless: If True, creates a completely fresh transport for each request
5354
with no session tracking or state persistence between requests.
55+
security_settings: Optional security settings for DNS rebinding protection
56+
session_idle_timeout: Maximum idle time in seconds before a session is eligible
57+
for cleanup. Default is 1800 seconds (30 minutes).
58+
cleanup_check_interval: Interval in seconds between cleanup checks.
59+
Default is 300 seconds (5 minutes).
60+
max_sessions_before_cleanup: Threshold number of sessions before idle cleanup
61+
is activated. Default is 10000. Cleanup only runs
62+
when the session count exceeds this threshold.
5463
"""
5564

5665
def __init__(
@@ -60,16 +69,23 @@ def __init__(
6069
json_response: bool = False,
6170
stateless: bool = False,
6271
security_settings: TransportSecuritySettings | None = None,
72+
session_idle_timeout: float = 1800, # 30 minutes default
73+
cleanup_check_interval: float = 300, # 5 minutes default
74+
max_sessions_before_cleanup: int = 10000, # Threshold to activate cleanup
6375
):
6476
self.app = app
6577
self.event_store = event_store
6678
self.json_response = json_response
6779
self.stateless = stateless
6880
self.security_settings = security_settings
81+
self.session_idle_timeout = session_idle_timeout
82+
self.cleanup_check_interval = cleanup_check_interval
83+
self.max_sessions_before_cleanup = max_sessions_before_cleanup
6984

7085
# Session tracking (only used if not stateless)
7186
self._session_creation_lock = anyio.Lock()
7287
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
88+
self._session_last_activity: dict[str, float] = {}
7389

7490
# The task group will be set during lifespan
7591
self._task_group = None
@@ -108,15 +124,21 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
108124
# Store the task group for later use
109125
self._task_group = tg
110126
logger.info("StreamableHTTP session manager started")
127+
128+
# Start the cleanup task if not in stateless mode
129+
if not self.stateless:
130+
tg.start_soon(self._run_session_cleanup)
131+
111132
try:
112133
yield # Let the application run
113134
finally:
114135
logger.info("StreamableHTTP session manager shutting down")
115-
# Cancel task group to stop all spawned tasks
136+
# Cancel task group to stop all spawned tasks (this will also stop cleanup task)
116137
tg.cancel_scope.cancel()
117138
self._task_group = None
118-
# Clear any remaining server instances
139+
# Clear any remaining server instances and tracking
119140
self._server_instances.clear()
141+
self._session_last_activity.clear()
120142

121143
async def handle_request(
122144
self,
@@ -213,6 +235,9 @@ async def _handle_stateful_request(
213235
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances:
214236
transport = self._server_instances[request_mcp_session_id]
215237
logger.debug("Session already exists, handling request directly")
238+
# Update last activity time for this session
239+
if request_mcp_session_id:
240+
self._session_last_activity[request_mcp_session_id] = time.time()
216241
await transport.handle_request(scope, receive, send)
217242
return
218243

@@ -230,6 +255,8 @@ async def _handle_stateful_request(
230255

231256
assert http_transport.mcp_session_id is not None
232257
self._server_instances[http_transport.mcp_session_id] = http_transport
258+
# Track initial activity time for new session
259+
self._session_last_activity[http_transport.mcp_session_id] = time.time()
233260
logger.info(f"Created new transport with session ID: {new_session_id}")
234261

235262
# Define the server runner
@@ -262,6 +289,8 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
262289
"active instances."
263290
)
264291
del self._server_instances[http_transport.mcp_session_id]
292+
# Also remove from activity tracking
293+
self._session_last_activity.pop(http_transport.mcp_session_id, None)
265294

266295
# Assert task group is not None for type checking
267296
assert self._task_group is not None
@@ -277,3 +306,63 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
277306
status_code=HTTPStatus.BAD_REQUEST,
278307
)
279308
await response(scope, receive, send)
309+
310+
async def _run_session_cleanup(self) -> None:
    """
    Background task that periodically cleans up idle sessions.

    Wakes up every ``cleanup_check_interval`` seconds. Nothing is done unless
    the number of tracked sessions exceeds ``max_sessions_before_cleanup``;
    once above it, every session whose last recorded activity is older than
    ``session_idle_timeout`` seconds is terminated and removed from tracking.

    Runs until cancelled. Cancellation is logged and re-raised; any other
    unexpected error is logged and ends the task without propagating.
    """
    logger.info(
        f"Session cleanup task started (threshold: {self.max_sessions_before_cleanup} sessions, "
        f"idle timeout: {self.session_idle_timeout}s)"
    )
    try:
        while True:
            await anyio.sleep(self.cleanup_check_interval)

            # Only perform cleanup if we're above the threshold
            session_count = len(self._server_instances)
            if session_count <= self.max_sessions_before_cleanup:
                logger.debug(
                    f"Session count ({session_count}) below threshold "
                    f"({self.max_sessions_before_cleanup}), skipping cleanup"
                )
                continue

            logger.info(f"Session count ({session_count}) exceeds threshold, performing idle session cleanup")

            # NOTE(review): timestamps are wall-clock because request handling
            # records activity with time.time(); switching to a monotonic clock
            # would have to be done at every place that writes the timestamp.
            current_time = time.time()

            # Snapshot the candidates first: the loop below awaits, so the
            # tracking dicts may be mutated by request handlers while it runs.
            idle_session_ids = [
                session_id
                for session_id, last_activity in list(self._session_last_activity.items())
                if current_time - last_activity > self.session_idle_timeout
            ]

            cleaned_count = 0
            for session_id in idle_session_ids:
                try:
                    transport = self._server_instances.get(session_id)
                    if transport is None:
                        # Activity entry without a transport: drop the stale
                        # entry so it is not re-examined on every pass.
                        self._session_last_activity.pop(session_id, None)
                        continue

                    # Re-check idleness right before terminating. Earlier
                    # iterations awaited, so this session may have served a
                    # request since the snapshot was taken.
                    last_activity = self._session_last_activity.get(session_id)
                    if last_activity is not None and time.time() - last_activity <= self.session_idle_timeout:
                        continue

                    logger.info(f"Cleaning up idle session {session_id}")
                    # Terminate the transport to properly close resources
                    await transport.terminate()
                    # pop() rather than del: the entry may already have been
                    # removed elsewhere while terminate() was awaited.
                    self._server_instances.pop(session_id, None)
                    self._session_last_activity.pop(session_id, None)
                    cleaned_count += 1
                except Exception:
                    logger.exception(f"Error cleaning up session {session_id}")

            # Report what was actually removed, not the number of candidates.
            if cleaned_count:
                logger.info(
                    f"Cleaned up {cleaned_count} idle sessions, "
                    f"{len(self._server_instances)} sessions remaining"
                )

    except anyio.get_cancelled_exc_class():
        logger.info("Session cleanup task cancelled")
        raise
    except Exception:
        logger.exception("Unexpected error in session cleanup task - cleanup task terminated")
        # Don't re-raise - let the task end gracefully without crashing the server

tests/server/test_streamable_http_manager.py

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""Tests for StreamableHTTPSessionManager."""
22

3+
import time
34
from typing import Any
4-
from unittest.mock import AsyncMock, patch
5+
from unittest.mock import AsyncMock, MagicMock, patch
56

67
import anyio
78
import pytest
@@ -262,3 +263,125 @@ async def mock_receive():
262263

263264
# Verify internal state is cleaned up
264265
assert len(transport._request_streams) == 0, "Transport should have no active request streams"
266+
267+
268+
@pytest.mark.anyio
async def test_idle_session_cleanup():
    """Test that idle sessions are terminated and untracked once the threshold is exceeded."""
    app = Server("test-idle-cleanup")

    # Use very short timeouts for testing
    manager = StreamableHTTPSessionManager(
        app=app,
        session_idle_timeout=0.5,  # 500ms idle timeout
        cleanup_check_interval=0.2,  # Check every 200ms
        max_sessions_before_cleanup=2,  # Low threshold for testing
    )

    async with manager.run():
        # Mock the app.run to prevent it from doing anything
        app.run = AsyncMock(side_effect=lambda *args, **kwargs: anyio.sleep(float("inf")))

        # Create mock transports directly to simulate sessions, bypassing the
        # HTTP layer. The mocks are kept so terminate() can be checked later.
        transports: dict[str, MagicMock] = {}
        for session_id in ("session1", "session2", "session3"):
            transport = MagicMock(spec=StreamableHTTPServerTransport)
            transport.mcp_session_id = session_id
            transport.is_terminated = False
            transport.terminate = AsyncMock()
            transports[session_id] = transport

            # Add to manager's tracking
            manager._server_instances[session_id] = transport
            manager._session_last_activity[session_id] = time.time()

        # Verify all sessions are tracked
        assert len(manager._server_instances) == 3
        assert len(manager._session_last_activity) == 3

        # Wait for cleanup with an upper bound instead of one fixed sleep:
        # returns as soon as cleanup has happened and tolerates slow runners.
        with anyio.fail_after(5):
            while manager._server_instances:
                await anyio.sleep(0.05)

        # All sessions should be cleaned up since they exceeded idle timeout
        assert len(manager._server_instances) == 0, "All idle sessions should be cleaned up"
        assert len(manager._session_last_activity) == 0, "Activity tracking should be cleared"

        # Dropping the dict entries is not enough: each transport must have
        # been terminated exactly once so its resources are released.
        for transport in transports.values():
            transport.terminate.assert_awaited_once()
310+
311+
312+
@pytest.mark.anyio
async def test_cleanup_only_above_threshold():
    """Idle sessions are left untouched while the session count stays at or below the threshold."""
    server = Server("test-threshold")

    # Short timeout and interval so several cleanup checks run during the
    # test; the threshold of 100 is far above the single session created below.
    session_manager = StreamableHTTPSessionManager(
        app=server,
        session_idle_timeout=0.1,
        cleanup_check_interval=0.1,
        max_sessions_before_cleanup=100,
    )

    async with session_manager.run():
        server.run = AsyncMock(side_effect=lambda *args, **kwargs: anyio.sleep(float("inf")))

        # Register one fake session directly in the manager's tracking dicts.
        fake_transport = MagicMock(spec=StreamableHTTPServerTransport)
        fake_transport.mcp_session_id = "session1"
        fake_transport.is_terminated = False
        fake_transport.terminate = AsyncMock()

        session_manager._server_instances["session1"] = fake_transport
        session_manager._session_last_activity["session1"] = time.time()

        # Let the session become idle well past the timeout.
        await anyio.sleep(0.5)

        # The session must survive: the count never exceeded the threshold.
        tracked = session_manager._server_instances
        assert len(tracked) == 1, "Session should not be cleaned when below threshold"
        assert "session1" in tracked
        fake_transport.terminate.assert_not_called()
344+
345+
346+
@pytest.mark.anyio
async def test_session_activity_update():
    """A request addressed to an existing session refreshes its last-activity timestamp."""
    server = Server("test-activity-update")
    session_manager = StreamableHTTPSessionManager(app=server)

    async with session_manager.run():
        # Pretend the session was last used 100 seconds ago.
        stale_timestamp = time.time() - 100

        fake_transport = MagicMock(spec=StreamableHTTPServerTransport)
        fake_transport.mcp_session_id = "test-session"
        fake_transport.handle_request = AsyncMock()

        session_manager._server_instances["test-session"] = fake_transport
        session_manager._session_last_activity["test-session"] = stale_timestamp

        # ASGI scope for a POST carrying the existing session's id.
        request_headers = [
            (b"mcp-session-id", b"test-session"),
            (b"content-type", b"application/json"),
            (b"accept", b"application/json, text/event-stream"),
        ]
        scope = {
            "type": "http",
            "method": "POST",
            "path": "/mcp",
            "headers": request_headers,
        }

        async def mock_receive():
            payload = b'{"jsonrpc":"2.0","method":"test","id":1}'
            return {"type": "http.request", "body": payload, "more_body": False}

        async def mock_send(message: Message):
            pass

        await session_manager.handle_request(scope, mock_receive, mock_send)

        # The stored timestamp must have moved forward to roughly "now".
        refreshed = session_manager._session_last_activity["test-session"]
        assert refreshed > stale_timestamp, "Activity time should be updated"
        assert refreshed >= time.time() - 1, "Activity time should be recent"

0 commit comments

Comments
 (0)