Skip to content

Commit f11b5e0

Browse files
Add retry_interval to FastMCP and test_reconnection to everything-server (SEP-1699)
- Add retry_interval parameter to FastMCP for SSE polling control - Add InMemoryEventStore and test_reconnection tool to everything-server - Enables SSE polling conformance test to pass (server-sse-polling scenario)
1 parent fdcd8f5 commit f11b5e0

File tree

2 files changed

+58
-0
lines changed
  • examples/servers/everything-server/mcp_everything_server
  • src/mcp/server/fastmcp

2 files changed

+58
-0
lines changed

examples/servers/everything-server/mcp_everything_server/server.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from mcp.server.fastmcp import Context, FastMCP
1515
from mcp.server.fastmcp.prompts.base import UserMessage
1616
from mcp.server.session import ServerSession
17+
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
1718
from mcp.types import (
1819
AudioContent,
1920
Completion,
@@ -31,6 +32,41 @@
3132

3233
logger = logging.getLogger(__name__)
3334

35+
# Type aliases for event store
36+
StreamId = str
37+
EventId = str
38+
39+
40+
class InMemoryEventStore(EventStore):
41+
"""Simple in-memory event store for SSE resumability testing."""
42+
43+
def __init__(self) -> None:
44+
self._events: list[tuple[StreamId, EventId, dict | None]] = []
45+
self._event_id_counter = 0
46+
47+
async def store_event(self, stream_id: StreamId, message: dict | None) -> EventId:
48+
"""Store an event and return its ID."""
49+
self._event_id_counter += 1
50+
event_id = str(self._event_id_counter)
51+
self._events.append((stream_id, event_id, message))
52+
return event_id
53+
54+
async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None:
55+
"""Replay events after the specified ID."""
56+
target_stream_id = None
57+
for stream_id, event_id, _ in self._events:
58+
if event_id == last_event_id:
59+
target_stream_id = stream_id
60+
break
61+
if target_stream_id is None:
62+
return None
63+
last_event_id_int = int(last_event_id)
64+
for stream_id, event_id, message in self._events:
65+
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
66+
await send_callback(EventMessage(message, event_id))
67+
return target_stream_id
68+
69+
3470
# Test data
3571
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
3672
TEST_AUDIO_BASE64 = "UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA="
@@ -39,8 +75,13 @@
3975
resource_subscriptions: set[str] = set()
4076
watched_resource_content = "Watched resource content"
4177

78+
# Create event store for SSE resumability (SEP-1699)
79+
event_store = InMemoryEventStore()
80+
4281
mcp = FastMCP(
4382
name="mcp-conformance-test-server",
83+
event_store=event_store,
84+
retry_interval=100, # 100ms retry interval for SSE polling
4485
)
4586

4687

@@ -263,6 +304,20 @@ def test_error_handling() -> str:
263304
raise RuntimeError("This tool intentionally returns an error for testing")
264305

265306

307+
@mcp.tool()
308+
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
309+
"""Tests SSE polling by closing stream mid-call (SEP-1699)"""
310+
await ctx.info("Before disconnect")
311+
312+
if ctx.close_sse_stream:
313+
await ctx.close_sse_stream()
314+
315+
await asyncio.sleep(0.2) # Wait for client to reconnect
316+
317+
await ctx.info("After reconnect")
318+
return "Reconnection test completed"
319+
320+
266321
# Resources
267322
@mcp.resource("test://static-text")
268323
def static_text_resource() -> str:

src/mcp/server/fastmcp/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def __init__( # noqa: PLR0913
153153
auth_server_provider: (OAuthAuthorizationServerProvider[Any, Any, Any] | None) = None,
154154
token_verifier: TokenVerifier | None = None,
155155
event_store: EventStore | None = None,
156+
retry_interval: int | None = None,
156157
*,
157158
tools: list[Tool] | None = None,
158159
debug: bool = False,
@@ -221,6 +222,7 @@ def __init__( # noqa: PLR0913
221222
if auth_server_provider and not token_verifier: # pragma: no cover
222223
self._token_verifier = ProviderTokenVerifier(auth_server_provider)
223224
self._event_store = event_store
225+
self._retry_interval = retry_interval
224226
self._custom_starlette_routes: list[Route] = []
225227
self.dependencies = self.settings.dependencies
226228
self._session_manager: StreamableHTTPSessionManager | None = None
@@ -940,6 +942,7 @@ def streamable_http_app(self) -> Starlette:
940942
self._session_manager = StreamableHTTPSessionManager(
941943
app=self._mcp_server,
942944
event_store=self._event_store,
945+
retry_interval=self._retry_interval,
943946
json_response=self.settings.json_response,
944947
stateless=self.settings.stateless_http, # Use the stateless setting
945948
security_settings=self.settings.transport_security,

0 commit comments

Comments
 (0)