Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ This document contains critical information about working with this codebase. Fo
- IMPORTANT: Before pushing, verify 100% branch coverage on changed files by running
`uv run --frozen pytest -x` (coverage is configured in `pyproject.toml` with `fail_under = 100`
and `branch = true`). If any branch is uncovered, add a test for it before pushing.
- Avoid `anyio.sleep()` with a fixed duration to wait for async operations. Instead:
- Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test
- For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()`
- Exception: `sleep()` is appropriate when testing time-based features (e.g., timeouts)
- Wrap indefinite waits (`event.wait()`, `stream.receive()`) in `anyio.fail_after(5)` to prevent hangs

Test files mirror the source tree: `src/mcp/client/streamable_http.py` → `tests/client/test_streamable_http.py`
Add tests to the existing file for that module.
Expand Down
2 changes: 2 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def __init__(
] = {}
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
self._terminated = False
# Idle timeout cancel scope; managed by the session manager.
self.idle_scope: anyio.CancelScope | None = None

@property
def is_terminated(self) -> bool:
Expand Down
60 changes: 45 additions & 15 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,28 @@ class StreamableHTTPSessionManager:
2. Resumability via an optional event store
3. Connection management and lifecycle
4. Request handling and transport setup
5. Idle session cleanup via optional timeout

Important: Only one StreamableHTTPSessionManager instance should be created
per application. The instance cannot be reused after its run() context has
completed. If you need to restart the manager, create a new instance.

Args:
app: The MCP server instance
event_store: Optional event store for resumability support.
If provided, enables resumable connections where clients
can reconnect and receive missed events.
If None, sessions are still tracked but not resumable.
event_store: Optional event store for resumability support. If provided, enables resumable connections
where clients can reconnect and receive missed events. If None, sessions are still tracked but not
resumable.
json_response: Whether to use JSON responses instead of SSE streams
stateless: If True, creates a completely fresh transport for each request
with no session tracking or state persistence between requests.
stateless: If True, creates a completely fresh transport for each request with no session tracking or
state persistence between requests.
security_settings: Optional transport security settings.
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
retry field. Used for SSE polling behavior.
retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE
polling behavior.
session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that
receive no HTTP requests for this duration will be automatically terminated and removed. When
retry_interval is also configured, ensure the idle timeout comfortably exceeds the retry interval to
avoid reaping sessions during normal SSE polling gaps; note that the units differ (retry_interval is
in milliseconds, session_idle_timeout is in seconds). Must be a positive number and cannot be combined
with stateless=True. Default is None (no timeout). A value of 1800 (30 minutes) is recommended for
most deployments.
"""

def __init__(
Expand All @@ -66,13 +71,20 @@ def __init__(
stateless: bool = False,
security_settings: TransportSecuritySettings | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
):
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
raise RuntimeError("session_idle_timeout is not supported in stateless mode")

self.app = app
self.event_store = event_store
self.json_response = json_response
self.stateless = stateless
self.security_settings = security_settings
self.retry_interval = retry_interval
self.session_idle_timeout = session_idle_timeout

# Session tracking (only used if not stateless)
self._session_creation_lock = anyio.Lock()
Expand Down Expand Up @@ -184,6 +196,9 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances:
transport = self._server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
# Push back idle deadline on activity
if transport.idle_scope is not None and self.session_idle_timeout is not None:
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout # pragma: no cover
await transport.handle_request(scope, receive, send)
return

Expand All @@ -210,16 +225,31 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
read_stream, write_stream = streams
task_status.started()
try:
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
stateless=False, # Stateful mode
)
# Use a cancel scope for idle timeout — when the
# deadline passes the scope cancels app.run() and
# execution continues after the ``with`` block.
# Incoming requests push the deadline forward.
idle_scope = anyio.CancelScope()
if self.session_idle_timeout is not None:
idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
http_transport.idle_scope = idle_scope

with idle_scope:
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
stateless=False,
)

if idle_scope.cancelled_caught:
assert http_transport.mcp_session_id is not None
logger.info(f"Session {http_transport.mcp_session_id} idle timeout")
self._server_instances.pop(http_transport.mcp_session_id, None)
await http_transport.terminate()
except Exception:
logger.exception(f"Session {http_transport.mcp_session_id} crashed")
finally:
# Only remove from instances if not terminated
if ( # pragma: no branch
http_transport.mcp_session_id
and http_transport.mcp_session_id in self._server_instances
Expand Down
77 changes: 77 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,80 @@ async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestP
Client(streamable_http_client(f"http://{host}/mcp", http_client=http_client)) as client,
):
await client.list_tools()


@pytest.mark.anyio
async def test_idle_session_is_reaped():
    """A session left idle past ``session_idle_timeout`` is reaped; reusing its ID then yields 404."""
    manager = StreamableHTTPSessionManager(app=Server("test-idle-reap"), session_idle_timeout=0.05)

    async with manager.run():
        initial_messages: list[Message] = []

        async def record_initial(message: Message):
            initial_messages.append(message)

        async def idle_receive():  # pragma: no cover
            return {"type": "http.request", "body": b"", "more_body": False}

        initial_scope = {
            "type": "http",
            "method": "POST",
            "path": "/mcp",
            "headers": [(b"content-type", b"application/json")],
        }

        # The first request carries no session header; the response is expected
        # to advertise a freshly assigned session ID.
        await manager.handle_request(initial_scope, idle_receive, record_initial)

        # Pull the session ID out of the first ``http.response.start`` message.
        session_id = None
        for message in initial_messages:  # pragma: no branch
            if message["type"] == "http.response.start":  # pragma: no branch
                for raw_name, raw_value in message.get("headers", []):  # pragma: no branch
                    if raw_name.decode().lower() == MCP_SESSION_ID_HEADER.lower():
                        session_id = raw_value.decode()
                        break
                if session_id:  # pragma: no branch
                    break

        assert session_id is not None, "Session ID not found in response headers"

        # A fixed sleep is deliberate here: this is a time-based feature, and
        # probing with the session ID would count as activity and postpone the
        # idle deadline. Sleep past the 50ms timeout so cleanup can complete.
        await anyio.sleep(0.1)

        # Check through the public API: the stale session ID must now be unknown.
        followup_messages: list[Message] = []

        async def record_followup(message: Message):
            followup_messages.append(message)

        followup_scope = {
            "type": "http",
            "method": "POST",
            "path": "/mcp",
            "headers": [
                (b"content-type", b"application/json"),
                (b"mcp-session-id", session_id.encode()),
            ],
        }

        await manager.handle_request(followup_scope, idle_receive, record_followup)

        start_messages = [m for m in followup_messages if m["type"] == "http.response.start"]
        assert start_messages
        assert start_messages[0]["status"] == 404


def test_session_idle_timeout_rejects_non_positive():
    """Negative and zero idle timeouts are both rejected with ``ValueError``."""
    for invalid_timeout in (-1, 0):
        with pytest.raises(ValueError, match="positive number"):
            StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=invalid_timeout)


def test_session_idle_timeout_rejects_stateless():
    """Combining an idle timeout with stateless mode raises ``RuntimeError``."""
    server = Server("test")
    with pytest.raises(RuntimeError, match="not supported in stateless"):
        StreamableHTTPSessionManager(app=server, stateless=True, session_idle_timeout=30)