Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/rmcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,12 @@ required-features = [
]
path = "tests/test_streamable_http_stale_session.rs"

[[test]]
name = "test_streamable_http_session_store"
required-features = [
"client",
"server",
"transport-streamable-http-client-reqwest",
"transport-streamable-http-server",
]
path = "tests/test_streamable_http_session_store.rs"
2 changes: 1 addition & 1 deletion crates/rmcp/src/transport/streamable_http_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod session;
#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
pub mod tower;
pub use session::{SessionId, SessionManager};
pub use session::{RestoreOutcome, SessionId, SessionManager, SessionRestoreMarker};
#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
pub use tower::{StreamableHttpServerConfig, StreamableHttpService};
51 changes: 51 additions & 0 deletions crates/rmcp/src/transport/streamable_http_server/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,39 @@ use crate::{

pub mod local;
pub mod never;
pub mod store;

pub use store::{SessionState, SessionStore, SessionStoreError};

/// Extension marker inserted into the `initialize` request extensions during a
/// session restore replay. Handlers can check for its presence to distinguish a
/// cross-instance restore from a genuine client-initiated `initialize` request.
///
/// ```rust,ignore
/// if req.extensions().get::<SessionRestoreMarker>().is_some() {
/// // this is a restore replay, not a fresh client connection
/// }
/// ```
#[derive(Debug, Clone)]
pub struct SessionRestoreMarker {
pub id: SessionId,
}

/// The outcome of a [`SessionManager::restore_session`] call.
#[derive(Debug)]
pub enum RestoreOutcome<T> {
/// The session was just re-created from external state; the caller must
/// spawn an MCP handler against the returned transport and replay the
/// `initialize` handshake.
Restored(T),
/// The session was already present in memory (e.g. a concurrent request
/// already restored it). The caller should proceed as if `has_session`
/// had returned `true` — no further action is required.
AlreadyPresent,
/// This session manager does not support external-store restore.
/// The caller should fall through to the normal 404 response.
NotSupported,
}

/// Controls how MCP sessions are created, validated, and closed.
///
Expand Down Expand Up @@ -98,4 +131,22 @@ pub trait SessionManager: Send + Sync + 'static {
) -> impl Future<
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
> + Send;

/// Attempt to restore a previously-known session from external state,
/// creating a fresh in-memory session worker with the given `id`.
///
/// See [`RestoreOutcome`] for the three possible results:
/// - [`RestoreOutcome::Restored`] — session re-created; caller must spawn
/// an MCP handler and replay the `initialize` handshake.
/// - [`RestoreOutcome::AlreadyPresent`] — session is already in memory
/// (e.g. a concurrent request restored it first); caller proceeds
/// normally.
/// - [`RestoreOutcome::NotSupported`] (default) — this session manager
/// does not support external-store restore; caller returns 404.
fn restore_session(
&self,
_id: SessionId,
) -> impl Future<Output = Result<RestoreOutcome<Self::Transport>, Self::Error>> + Send {
futures::future::ready(Ok(RestoreOutcome::NotSupported))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ impl SessionManager for LocalSessionManager {
handle.push_message(message, None).await?;
Ok(())
}

async fn restore_session(
&self,
id: SessionId,
) -> Result<RestoreOutcome<Self::Transport>, Self::Error> {
let mut sessions = self.sessions.write().await;
if sessions.contains_key(&id) {
// A concurrent request already restored this session.
return Ok(RestoreOutcome::AlreadyPresent);
}
let (handle, worker) = create_local_session(id.clone(), self.session_config.clone());
sessions.insert(id, handle);
Ok(RestoreOutcome::Restored(WorkerTransport::spawn(worker)))
}
}

/// `<index>/request_id>`
Expand Down Expand Up @@ -179,7 +193,7 @@ impl std::str::FromStr for EventId {
}
}

use super::{ServerSseMessage, SessionManager};
use super::{RestoreOutcome, ServerSseMessage, SessionManager};

struct CachedTx {
tx: Sender<ServerSseMessage>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::model::InitializeRequestParams;

/// State persisted to an external store for cross-instance session recovery.
///
/// When a client reconnects to a different server instance, the new instance
/// loads this state to transparently replay the `initialize` handshake without
/// the client needing to re-initialize.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionState {
/// Parameters from the client's original `initialize` request.
pub initialize_params: InitializeRequestParams,
}

/// Type alias for boxed session store errors.
pub type SessionStoreError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Pluggable external session store for cross-instance recovery.
///
/// Implement this trait to back sessions with Redis, a database, or any
/// key-value store. The simplest usage is to set
/// `StreamableHttpServerConfig::session_store` to an `Arc<impl SessionStore>`.
///
/// # Example (in-memory, for testing)
///
/// ```rust,ignore
/// use std::{collections::HashMap, sync::Arc};
/// use tokio::sync::RwLock;
/// use rmcp::transport::streamable_http_server::session::store::{
/// SessionState, SessionStore, SessionStoreError,
/// };
///
/// #[derive(Default)]
/// struct InMemoryStore(Arc<RwLock<HashMap<String, SessionState>>>);
///
/// #[async_trait::async_trait]
/// impl SessionStore for InMemoryStore {
/// async fn load(&self, id: &str) -> Result<Option<SessionState>, SessionStoreError> {
/// Ok(self.0.read().await.get(id).cloned())
/// }
/// async fn store(&self, id: &str, state: &SessionState) -> Result<(), SessionStoreError> {
/// self.0.write().await.insert(id.to_owned(), state.clone());
/// Ok(())
/// }
/// async fn delete(&self, id: &str) -> Result<(), SessionStoreError> {
/// self.0.write().await.remove(id);
/// Ok(())
/// }
/// }
/// ```
#[async_trait::async_trait]
pub trait SessionStore: Send + Sync + 'static {
/// Load session state for the given `session_id`.
///
/// Returns `Ok(None)` when no entry exists (i.e. session is unknown to the store).
async fn load(&self, session_id: &str) -> Result<Option<SessionState>, SessionStoreError>;

/// Persist session state for the given `session_id`.
async fn store(&self, session_id: &str, state: &SessionState) -> Result<(), SessionStoreError>;

/// Remove session state for the given `session_id`.
async fn delete(&self, session_id: &str) -> Result<(), SessionStoreError>;
}
Loading
Loading