Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion crates/rust-mcp-sdk/src/hyper_servers/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ pub struct HyperServerOptions {
/// Interval between automatic ping messages sent to clients to detect disconnects
pub ping_interval: Duration,

/// Maximum number of concurrent sessions retained by the default in-memory
/// session store. New sessions past this limit are rejected with
/// `503 Service Unavailable`. `None` uses the store default.
pub max_sessions: Option<usize>,

/// Evict sessions that have been idle for longer than this duration.
/// `None` disables idle expiry.
pub session_idle_ttl: Option<Duration>,

/// Enables SSL/TLS if set to `true`
pub enable_ssl: bool,

Expand Down Expand Up @@ -273,6 +282,8 @@ impl Default for HyperServerOptions {
custom_streamable_http_endpoint: None,
custom_messages_endpoint: None,
ping_interval: DEFAULT_CLIENT_PING_INTERVAL,
max_sessions: None,
session_idle_ttl: None,
transport_options: Default::default(),
enable_ssl: false,
ssl_cert_path: None,
Expand Down Expand Up @@ -321,7 +332,10 @@ impl HyperServer {
mut server_options: HyperServerOptions,
) -> Self {
let state: Arc<McpAppState> = Arc::new(McpAppState {
session_store: Arc::new(InMemorySessionStore::new()),
session_store: Arc::new(InMemorySessionStore::with_limits(
server_options.max_sessions,
server_options.session_idle_ttl,
)),
id_generator: server_options
.session_id_generator
.take()
Expand Down
16 changes: 16 additions & 0 deletions crates/rust-mcp-sdk/src/mcp_http/http_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,14 @@ pub(crate) async fn start_new_session(
payload: &str,
auth_info: Option<AuthInfo>,
) -> McpHttpResult<http::Response<GenericBody>> {
if state.session_store.is_full().await {
return error_response(
StatusCode::SERVICE_UNAVAILABLE,
SdkError::internal_error()
.with_message("Server is at maximum session capacity, try again later."),
);
}

let session_id: SessionId = state.id_generator.generate();

let h: Arc<dyn McpServerHandler> = state.handler.clone();
Expand Down Expand Up @@ -716,6 +724,14 @@ pub(crate) async fn handle_sse_connection(
sse_message_endpoint: Option<&str>,
auth_info: Option<AuthInfo>,
) -> McpHttpResult<http::Response<GenericBody>> {
if state.session_store.is_full().await {
return error_response(
StatusCode::SERVICE_UNAVAILABLE,
SdkError::internal_error()
.with_message("Server is at maximum session capacity, try again later."),
);
}

let session_id: SessionId = state.id_generator.generate();

let sse_message_endpoint = sse_message_endpoint.unwrap_or(DEFAULT_MESSAGES_ENDPOINT);
Expand Down
9 changes: 9 additions & 0 deletions crates/rust-mcp-sdk/src/session_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ pub trait SessionStore: Send + Sync {

/// Clears all sessions from the store
async fn clear(&self);

/// Returns `true` when the store cannot accept a new session.
///
/// Callers should reject new-session creation (e.g. `initialize`) with
/// `503 Service Unavailable` when this returns `true`. The default
/// implementation reports unlimited capacity.
async fn is_full(&self) -> bool {
false
}
}
109 changes: 94 additions & 15 deletions crates/rust-mcp-sdk/src/session_store/in_memory_session_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,115 @@ use super::SessionId;
use super::SessionStore;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;

/// In-memory session store implementation
/// Default maximum number of concurrent sessions retained by the store.
pub const DEFAULT_MAX_SESSIONS: usize = 10_000;

fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}

/// A stored session together with the time it was last accessed.
struct SessionEntry {
runtime: Arc<ServerRuntime>,
last_access_ms: AtomicU64,
}

impl SessionEntry {
fn new(runtime: Arc<ServerRuntime>) -> Self {
Self {
runtime,
last_access_ms: AtomicU64::new(now_millis()),
}
}

/// Marks the session as accessed now.
fn touch(&self) {
self.last_access_ms.store(now_millis(), Ordering::Relaxed);
}

/// Returns true if the session has been idle for longer than `ttl_ms`.
fn is_idle(&self, now_ms: u64, ttl_ms: u64) -> bool {
now_ms.saturating_sub(self.last_access_ms.load(Ordering::Relaxed)) > ttl_ms
}
}

/// In-memory session store with a bounded session count and optional idle TTL.
///
/// Stores session data in a thread-safe HashMap, using a read-write lock for
#[derive(Clone, Default)]
/// Idle sessions (older than the configured TTL) are evicted lazily, on access
/// and whenever the store is checked for capacity. Once `max_sessions` is
/// reached the server rejects new sessions with `503 Service Unavailable`,
/// preventing an unauthenticated client from exhausting memory via repeated
/// `initialize` requests.
#[derive(Clone)]
pub struct InMemorySessionStore {
store: Arc<RwLock<HashMap<String, Arc<ServerRuntime>>>>,
store: Arc<RwLock<HashMap<String, SessionEntry>>>,
max_sessions: usize,
idle_ttl: Option<Duration>,
}

impl Default for InMemorySessionStore {
fn default() -> Self {
Self::with_limits(None, None)
}
}

impl InMemorySessionStore {
/// Creates a new in-memory session store
///
/// Initializes an empty HashMap wrapped in a read-write lock for thread-safe access.
///
/// # Returns
/// * `Self` - A new InMemorySessionStore instance
/// Creates a new in-memory session store with default limits
/// ([`DEFAULT_MAX_SESSIONS`], no idle TTL).
pub fn new() -> Self {
Self::default()
}

/// Creates a session store with explicit limits.
///
/// * `max_sessions` - maximum number of concurrent sessions; `None` uses
/// [`DEFAULT_MAX_SESSIONS`]. Pass `Some(usize::MAX)` for an effectively
/// unbounded store.
/// * `idle_ttl` - sessions idle for longer than this are evicted; `None`
/// disables idle expiry.
pub fn with_limits(max_sessions: Option<usize>, idle_ttl: Option<Duration>) -> Self {
Self {
store: Arc::new(RwLock::new(HashMap::new())),
max_sessions: max_sessions.unwrap_or(DEFAULT_MAX_SESSIONS),
idle_ttl,
}
}

/// Evicts sessions idle past the configured TTL and returns the resulting
/// session count.
async fn evict_idle(&self) -> usize {
let Some(ttl) = self.idle_ttl else {
return self.store.read().await.len();
};
let ttl_ms = ttl.as_millis() as u64;
let now = now_millis();
let mut store = self.store.write().await;
store.retain(|_, entry| !entry.is_idle(now, ttl_ms));
store.len()
}
}

/// Implementation of the SessionStore trait for InMemorySessionStore
///
/// Provides asynchronous methods for managing sessions in memory, ensuring
#[async_trait]
impl SessionStore for InMemorySessionStore {
async fn get(&self, key: &SessionId) -> Option<Arc<ServerRuntime>> {
let store = self.store.read().await;
store.get(key).cloned()
let entry = store.get(key)?;
entry.touch();
Some(entry.runtime.clone())
}

async fn set(&self, key: SessionId, value: Arc<ServerRuntime>) {
let mut store = self.store.write().await;
store.insert(key, value);
store.insert(key, SessionEntry::new(value));
}

async fn delete(&self, key: &SessionId) {
Expand All @@ -59,10 +130,18 @@ impl SessionStore for InMemorySessionStore {
}
async fn values(&self) -> Vec<Arc<ServerRuntime>> {
let store = self.store.read().await;
store.values().cloned().collect::<Vec<_>>()
store
.values()
.map(|entry| entry.runtime.clone())
.collect::<Vec<_>>()
}
async fn has(&self, session: &SessionId) -> bool {
let store = self.store.read().await;
store.contains_key(session)
}

async fn is_full(&self) -> bool {
let count = self.evict_idle().await;
count >= self.max_sessions
}
}
34 changes: 34 additions & 0 deletions crates/rust-mcp-sdk/tests/test_streamable_http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,40 @@ async fn should_handle_elicitation() {
server.hyper_runtime.await_server().await.unwrap();
}

// should reject new sessions once the store reaches its capacity
#[tokio::test]
async fn should_reject_new_session_when_at_capacity() {
let server_options = HyperServerOptions {
port: random_port(),
max_sessions: Some(1),
..Default::default()
};

let server = create_start_server(server_options).await;
tokio::time::sleep(Duration::from_millis(250)).await;

let init = ClientJsonrpcRequest::new(RequestId::Integer(0), initialize_request());
let body = serde_json::to_string(&init).unwrap();

// first session is accepted
let first = send_post_request(&server.streamable_url, &body, None, None)
.await
.expect("Request failed");
assert_eq!(first.status(), StatusCode::OK);

// second session is rejected: the store is full
let second = send_post_request(&server.streamable_url, &body, None, None)
.await
.expect("Request failed");
assert_eq!(second.status(), StatusCode::SERVICE_UNAVAILABLE);

// keep the first session's stream open until the assertions complete
drop(first);

server.hyper_runtime.graceful_shutdown(ONE_MILLISECOND);
server.hyper_runtime.await_server().await.unwrap()
}

// should return 400 error for invalid JSON-RPC messages
// should keep stream open after sending server notifications
// NA: should reject second initialization request
Expand Down
Loading