Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/rust-mcp-transport/src/client_sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ where
self.pending_requests.clone(),
self.request_timeout,
cancellation_token,
crate::mcp_stream::DEFAULT_MESSAGE_CHANNEL_CAPACITY,
);

self.set_message_sender(sender).await;
Expand Down
2 changes: 2 additions & 0 deletions crates/rust-mcp-transport/src/client_streamable_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ where
self.pending_requests.clone(),
self.request_timeout,
cancellation_token,
crate::mcp_stream::DEFAULT_MESSAGE_CHANNEL_CAPACITY,
);

self.set_message_sender(sender).await;
Expand Down Expand Up @@ -373,6 +374,7 @@ where
self.pending_requests.clone(),
self.request_timeout,
cancellation_token,
crate::mcp_stream::DEFAULT_MESSAGE_CHANNEL_CAPACITY,
);

self.set_message_sender(sender).await;
Expand Down
10 changes: 7 additions & 3 deletions crates/rust-mcp-transport/src/mcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use tokio::{
sync::Mutex,
};

const CHANNEL_CAPACITY: usize = 36;
/// Default capacity of the incoming-message channel. Used when callers do not
/// override it (see [`crate::TransportOptions::channel_capacity`]).
pub(crate) const DEFAULT_MESSAGE_CHANNEL_CAPACITY: usize = 36;

pub struct MCPStream {}

Expand All @@ -32,6 +34,7 @@ impl MCPStream {
pending_requests: Arc<Mutex<HashMap<RequestId, tokio::sync::oneshot::Sender<R>>>>,
request_timeout: Duration,
cancellation_token: CancellationToken,
channel_capacity: usize,
) -> (
tokio_stream::wrappers::ReceiverStream<X>,
MessageDispatcher<R>,
Expand All @@ -41,7 +44,7 @@ impl MCPStream {
R: Clone + Send + Sync + serde::de::DeserializeOwned + 'static,
X: Clone + Send + Sync + serde::de::DeserializeOwned + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel::<X>(CHANNEL_CAPACITY);
let (tx, rx) = tokio::sync::mpsc::channel::<X>(channel_capacity);
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);

// Clone cancellation_token for reader
Expand All @@ -67,6 +70,7 @@ impl MCPStream {
pending_requests: Arc<Mutex<HashMap<RequestId, tokio::sync::oneshot::Sender<R>>>>,
request_timeout: Duration,
cancellation_token: CancellationToken,
channel_capacity: usize,
) -> (
tokio_stream::wrappers::ReceiverStream<X>,
MessageDispatcher<R>,
Expand All @@ -76,7 +80,7 @@ impl MCPStream {
R: Clone + Send + Sync + serde::de::DeserializeOwned + 'static,
X: Clone + Send + Sync + serde::de::DeserializeOwned + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel::<X>(CHANNEL_CAPACITY);
let (tx, rx) = tokio::sync::mpsc::channel::<X>(channel_capacity);
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);

// Clone cancellation_token for reader
Expand Down
1 change: 1 addition & 0 deletions crates/rust-mcp-transport/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl Transport<ClientMessages, MessageFromServer, ClientMessage, ServerMessages,
self.pending_requests.clone(),
self.options.timeout,
cancellation_token,
self.options.channel_capacity,
);

if let (Some(session_id), Some(stream_id), Some(event_store)) = (
Expand Down
2 changes: 2 additions & 0 deletions crates/rust-mcp-transport/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ where
self.pending_requests.clone(),
self.options.timeout,
cancellation_token,
self.options.channel_capacity,
);

self.set_message_sender(sender).await;
Expand All @@ -237,6 +238,7 @@ where
self.pending_requests.clone(),
self.options.timeout,
cancellation_token,
self.options.channel_capacity,
);

self.set_message_sender(sender).await;
Expand Down
30 changes: 30 additions & 0 deletions crates/rust-mcp-transport/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,19 @@ pub struct TransportOptions {
/// This value defines the maximum amount of time to wait for a response before
/// considering the request as timed out.
pub timeout: Duration,

/// Capacity of the incoming-message channel buffer.
///
/// A larger value smooths out head-of-line jitter under bursty traffic at
/// the cost of more buffered memory. Defaults to
/// [`DEFAULT_MESSAGE_CHANNEL_CAPACITY`](crate::mcp_stream::DEFAULT_MESSAGE_CHANNEL_CAPACITY).
pub channel_capacity: usize,
}
impl Default for TransportOptions {
fn default() -> Self {
Self {
timeout: Duration::from_millis(DEFAULT_TIMEOUT_MSEC),
channel_capacity: crate::mcp_stream::DEFAULT_MESSAGE_CHANNEL_CAPACITY,
}
}
}
Expand Down Expand Up @@ -183,3 +191,25 @@ where
// Ok(self)
// }
// }

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn default_channel_capacity_matches_constant() {
assert_eq!(
TransportOptions::default().channel_capacity,
crate::mcp_stream::DEFAULT_MESSAGE_CHANNEL_CAPACITY
);
}

#[test]
fn channel_capacity_is_overridable() {
let options = TransportOptions {
channel_capacity: 256,
..Default::default()
};
assert_eq!(options.channel_capacity, 256);
}
}
Loading