Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions crates/rust-mcp-sdk/src/task_store/in_memory_task_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type ScheduledPoll = (Instant, TaskId, Option<SessionId>);

const DEFAULT_PAGE_SIZE: usize = 50;
const DEFAULT_POLL_INTERVAL: i64 = 1250;
// Capacity of the task status-change broadcast channel. Slow subscribers that
// fall behind by more than this many events receive a `Lagged` error.
const DEFAULT_BROADCAST_CAPACITY: usize = 64;

pub struct InMemoryTaskStore<Req, Res>
where
Expand Down Expand Up @@ -199,13 +202,25 @@ where
Res: Debug + Clone + Send + Sync + serde::Deserialize<'static> + serde::Serialize + 'static,
{
pub fn new(page_size: Option<usize>) -> Self {
Self::with_capacity(page_size, None)
}

/// Creates a store with an explicit status-change broadcast capacity.
///
/// * `page_size` - page size for task listing; `None` uses
/// [`DEFAULT_PAGE_SIZE`].
/// * `broadcast_capacity` - capacity of the status-change broadcast channel;
/// `None` uses [`DEFAULT_BROADCAST_CAPACITY`]. A larger capacity tolerates
/// slower subscribers before they observe a `Lagged` error.
pub fn with_capacity(page_size: Option<usize>, broadcast_capacity: Option<usize>) -> Self {
let broadcast_capacity = broadcast_capacity.unwrap_or(DEFAULT_BROADCAST_CAPACITY);
Self {
inner: Arc::new(RwLock::new(InMemoryTaskStoreInner {
tasks: HashMap::new(),
ordered_task_ids: HashMap::new(),
poll_schedule: Some(BinaryHeap::new()),
})),
broadcast: tokio::sync::broadcast::channel(64).0,
broadcast: tokio::sync::broadcast::channel(broadcast_capacity).0,
page_size: page_size.unwrap_or(DEFAULT_PAGE_SIZE),
id_gen: Arc::new(FastIdGenerator::new(Some("tsk"))),
polling_task_handle: Mutex::new(None),
Expand Down Expand Up @@ -445,8 +460,14 @@ where
guard.re_schedule(&mut to_reschedule)
}

let guard = inner.read().await;
let sleep_duration = guard.next_sleep_duration();
// Compute the next sleep duration in a scoped block so the read
// guard is released before awaiting. Holding it across the sleep
// would block every create_task / store_task_result for the
// entire (potentially multi-second) sleep.
let sleep_duration = {
let guard = inner.read().await;
guard.next_sleep_duration()
};

tokio::time::sleep(sleep_duration).await;
}
Expand Down
Loading