Skip to content

Conversation

@MackinnonBuck
Copy link
Collaborator

@MackinnonBuck MackinnonBuck commented Dec 9, 2025

Overview

This PR implements the following features:

  • Client-side polling via the Last-Event-ID header
  • An ISseEventStreamStore abstraction to allow for storage and replay of SSE events
  • Server-side resumability of SSE streams via the Last-Event-ID header
  • Server-side disconnect (to enable client-side polling)

Each of these features has been split into its own commit for ease of review.

Description

There are two disconnection scenarios covered by this PR:

  • Network errors (unintentional disconnection)
  • Server-side programmatic disconnection

If a network error occurs, and an ISseEventStreamStore is configured, then the client may attempt to reconnect by making a GET request with a Last-Event-ID header. The server will then replay stored events before continuing to use the new GET response to stream remaining events. If further disconnections happen, the client may continue to make new GET requests to resume the stream. This applies for both client-initiated requests (POST) and the unsolicited message stream (GET).

However, the server can also initiate a disconnection and force the client to poll for updates. This is useful for avoiding long-running streams. This can be done via the new RequestContext<TParams>.EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken = default) API. When the client reconnects via GET with a Last-Event-ID, the response will only contain events currently available in the ISseEventStreamStore before completing. The client must continue initiating new GET requests at the specified retryInterval until the final response is received.

Fixes #510
Fixes #1020

/// For other transports, this property will be <see langword="null"/>.
/// </para>
/// </remarks>
public Action? CloseStandaloneSseStream { get; set; }
Copy link
Contributor

@halter73 halter73 Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the changes to the JsonRpcMessageContext necessary? I think I see the purpose. You cannot always rely on the request ID from the POST body to identify the final message in the SSE stream, but I wonder if there isn't a cleaner abstraction. Maybe disposing the RelatedTransport? That wouldn't cover the GET stream, but I don't know why you need to be able to close the GET stream via a random JsonRpcMessage at all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree that it's probably not necessary to support closing the unsolicited message stream.

I've updated how this works. RequestContext now has an EnablePollingAsync(TimeSpan retryInterval, ...) method that calls through to StreamableHttpPostTransport.EnablePollingAsync(...). If the developer knows that an operation is going to take a long time, they can call EnablePollingAsync() which closes the connection and forces the client to poll for updates at a stream-specific interval.

If the connection gets closed via network error and EnablePollingAsync() hasn't been called, then after the client reconnects, the connection will remain open until the stream has completed (the server generates a final response).

}

// If this is a POST stream, we're done - the replay was the complete response
if (streamId != GetStreamId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't there be similar changes to HandlePostRequestAsync? Also, we need to do more to avoid stream ID collisions. Even though there's one "get" per session, IEventStore is supplied via HttpServerTransportOptions.EventStore which makes it quite difficult to configure a per-session store.

I don't think people want to be forced to configure the store per-session anyway. I think we need to include the session ID or maybe a random GUID for "Stateless" requests if we support stateless resumption at all.

I think it could make sense to support stateless, but we need to have tests for stateless and non-stateless tests. I recommend looking at the MapMcpTests and MapMcpStreamableHttpTests. By adding test cases to MapMcpStreamableHttpTests, you should get both stateless and non-stateless test coverage.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we need to do more to avoid stream ID collisions. Even though there's one "get" per session, IEventStore is supplied via HttpServerTransportOptions.EventStore which makes it quite difficult to configure a per-session store.

This is now fixed. The new ISseEventStreamStore abstraction now requires both a SessionId and StreamId to be supplied in order to create a stored SSE event stream. The event IDs returned from the stream are then guaranteed to be globally unique so long as the SessionId is also globally unique and the StreamId is unique within its own session.

I think it could make sense to support stateless, but we need to have tests for stateless and non-stateless tests. I recommend looking at the MapMcpTests and MapMcpStreamableHttpTests. By adding test cases to MapMcpStreamableHttpTests, you should get both stateless and non-stateless test coverage.

I've experimented a bit with supporting stateless, but I think we should do that in a follow-up. The main blockers at the moment are:

  • The server won't know the client's protocol version after the initial handshake. How do we determine if the client supports priming events?
  • In stateless mode, we currently dispose the session when the POST request completes. This ends the message processing loop and prevents the server from sending more messages to the client. We'd need to instead keep the session alive until the final response message is sent.

/// Priming events are only supported in protocol version &gt;= 2025-11-25.
/// Older clients may crash when receiving SSE events with empty data.
/// </remarks>
internal static bool SupportsResumability(string? protocolVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This should be renamed. It's not about whether the protocol supports resumability, but it's specifically about the "priming" event.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to SupportsPrimingEvent()

/// In-memory event store for testing resumability.
/// This is a simple implementation intended for testing, not for production use.
/// </summary>
public class InMemoryEventStore : IEventStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about the idea of including a default implementation of IEventStore based on IDistributedCache? It'd be opt-in ofc. I could definitely see it being a follow up. Figuring out how to get the usability of that right might influence the design of the abstraction.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I've prototyped an IDistribuedCache-based implementation validate the ISseEventStreamStore abstraction, but that implementation is not included in this PR. If we think it's blocking, we can wait to merge this PR until I have that polished up.

/// Gets or sets the retry interval to suggest to clients in SSE retry field.
/// </summary>
/// <value>
/// The retry interval. The default is <see langword="null"/>, meaning no retry field is sent.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec states "it SHOULD send an SSE event with a standard retry field". Does that mean our default is going against the spec's recommendation?

Copy link
Collaborator Author

@MackinnonBuck MackinnonBuck Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I've now added a non-null default value of 1 second.

else
{
// We have an event ID, so reconnection should work - reset attempts
attempt = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prevents us from ending up in an infinite loop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the server never sends the response/error message and continues returning successful HTTP responses, then this could theoretically loop indefinitely. But I don't see that as different than, for example, the server leaving the response stream open indefinitely and keeping the client waiting for the final message.

/// Implementations should be thread-safe, as events may be stored and replayed concurrently.
/// </para>
/// </remarks>
public interface IEventStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fairly general name. Could we make it ISseEventStore or something similarly more specific?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Renamed to ISseEventStreamStore.

Comment on lines 25 to 34
/// Stores an event for later retrieval.
/// </summary>
/// <param name="streamId">
/// The ID of the stream the event belongs to. This is typically the JSON-RPC request ID
/// for POST SSE responses, or a special identifier for the standalone GET SSE stream.
/// </param>
/// <param name="message">
/// The JSON-RPC message to store, or <see langword="null"/> for priming events.
/// Priming events establish the event ID without carrying a message payload.
/// </param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the definition of "event" here? It says this is for storing an event, but then accepts the JSON-RPC message to store. Is the intimation that the event contains that message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've since updated the abstraction to instead accept an SseItem<JsonRpcMessage?>, so hopefully this clears up any ambiguity.

Comment on lines 52 to 55
ValueTask<string?> ReplayEventsAfterAsync(
string lastEventId,
Func<JsonRpcMessage, string, CancellationToken, ValueTask> sendCallback,
CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why you chose to expose it like this rather than, say, exposing a GetEventsAfter method that returned an enumerable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks - changed this to return an IAsyncEnumerable.

Copy link
Contributor

@mikekistler mikekistler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get through all of this but want to share the comments I've made so far.

/// </param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>The generated event ID for the stored event.</returns>
ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If sessionId could be null, should it be declared as string??

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that if the caller doesn't have a session ID to provide, they should generate a unique one on-the-fly (e.g., a GUID) even if only for the sake of avoiding stream ID conflicts.

/// </param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>The generated event ID for the stored event.</returns>
ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should allow the EventStore to selectively store events. For example, we should allow the EventStore to choose to not store Progress notifications. In these cases, it isn't necessary to assign an id to the event -- I'm pretty sure the SSE spec allows this. So could we make the return value a string? to permit the EventStore to choose to not assign an id to the event (which it may not have stored).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated abstraction just returns an SseItem<JsonRpcMessage?>, so an implementation could opt to just return the original item with a null EventId if they don't want to store it.

/// Priming events establish the event ID without carrying a message payload.
/// </param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>The generated event ID for the stored event.</returns>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are some requirements on the event ID, like it must be unique within the session, if the server uses session management. It might be good to add these requirements in the doc here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. The new XML docs make this clearer.

/// <summary>
/// Gets the session ID that the events belong to.
/// </summary>
public required string SessionId { get; init; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, should SessionId be a string??

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// This class simplifies event storage by binding the session ID, stream ID, and retry interval
/// so that callers only need to provide the message when storing events.
/// </remarks>
internal sealed class SseStreamEventStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be really helpful to expose a base class for the event store that users could easily extend with custom behavior to suit their needs, such as an event filter, id generator, and retryInterval delegate.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion. I'm planning to eventually provide an implementation based on IDistributedCache. We could make it unsealed to allow for these scenarios.

@MackinnonBuck MackinnonBuck force-pushed the mbuck/resumability-redelivery branch from 348fac0 to 493062a Compare December 17, 2025 00:14
@MackinnonBuck MackinnonBuck marked this pull request as ready for review December 17, 2025 04:31

// We set the mode to 'Polling' so that the transport can take over writing to the response stream after
// messages have been replayed.
const SseEventStreamMode Mode = SseEventStreamMode.Polling;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I don't think anything should take over writing the response after messages have replayed, I can see how an SseEventStreamMode.Polling could be useful, but only if we make it user-configurable at a high level via something like an HttpServerTransportOptions property and it doesn't look like it is currently.

Maybe we could make the RetryInterval nullable and assume polling if it's set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good idea!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more, I wonder if we should have a separate option to configure polling for the unsolicited message stream. To support resuming the stream after a network error disconnection (non-polling), the server still needs to send a message with a retry field. So it might be better to have a separate property rather than overloading RetryInterval. Maybe enabling polling for the unsolicited message stream can be done in a follow-up?

(Also note that the Polling mode is still currently useful for the solicited case)

internal static bool SupportsPrimingEvent(string? protocolVersion)
{
const string MinResumabilityProtocolVersion = "2025-11-25";
return string.Compare(protocolVersion, MinResumabilityProtocolVersion, StringComparison.Ordinal) >= 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think string.Compare will return -1 when protocolVersion is null because we're in stateless mode and therefore haven't necessarily seen the initialize request. In this case, I think we should at least try to fall back to the now-required MCP-Protocol-Version request header for this comparison.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice, didn't realize we had that header.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I'm just going to make this return false if protocolVersion is null given that we're not supporting stateless mode in yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we supporting stateless mode yet? It seems like the additional complexity would be put on the implementor of ISseEventStreamStore and not the code in this PR that consumes it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some changes we need to make outside the implementation of ISseEventStreamStore. For example, in stateless mode, sessions currently get disposed immediately when the POST request completes. This causes the message processing loop to stop, which means new messages never reach the transport and never get written to the event stream store. So, we'd need to revisit the behavior around session lifetimes in stateless mode. I could probably do that in this PR, but it's already quite large and I figured leaving out stateless mode support for now could be a way to scope things down. If you think it should be included in this PR, I could go ahead and try to get it working.

SingleWriter = false,
});
private readonly CancellationTokenSource _disposeCts = new();
private readonly SemaphoreSlim _sendLock = new(1, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of adding additional synchronization via the _sendLock either here or in StreamableHttpPostTransport. Is it really necessary? It seems to me that we already have Channels synchronizing the messages in each direction, so this effectively adds a redundant lock.

I understand that we want to preserve the ordering seen by the ISseEventStreamWriter and the SseWriter, but I think that can be done on the read side of the Channel similar to how we had with StopOnFinalResponseFilter previously.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added because operations involving the event stream store aren't thread safe (e.g., getting the event stream or creating it if it doesn't exist).

I could look into adding back SseWriter.MessageFilter and putting all calls to SSE event stream store APIs inside there, although that might make other things more complicated (e.g., if all ISseEventStreamStore APIs must be called through the message filter, passing arguments to those APIs might become unwieldy as now they need to be somehow encoded in the SseItems themselves). An alternative might be to remove the SemaphoreSlim currently inside SseWriter or change the _sendLock in these classes to only lock for the duration of ISseEventStreamStore operations.

internal static bool SupportsPrimingEvent(string? protocolVersion)
{
const string MinResumabilityProtocolVersion = "2025-11-25";
return string.Compare(protocolVersion, MinResumabilityProtocolVersion, StringComparison.Ordinal) >= 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we supporting stateless mode yet? It seems like the additional complexity would be put on the implementor of ISseEventStreamStore and not the code in this PR that consumes it.

public Task<bool> SendPrimingEventAsync(TimeSpan retryInterval, ISseEventStreamWriter eventStreamWriter, CancellationToken cancellationToken = default)
{
// Create a priming event: empty data with an event ID
var primingItem = new SseItem<JsonRpcMessage?>(null, "prime")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to write "event: prime"? Do the other SDK's do that? I think the TS SDK just omits the event despite emiting a redundant "event: message" for normal messages.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikekistler said that the event should be "prime": #1077 (comment) , but the spec doesn't seem to require a specific event name. It's possible we could just use an empty event.

if (parentTransport.Stateless)
{
// Polling is currently not supported in stateless mode.
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we decide not to support Stateless stream resumption in the first iteration, I think we should throw here since this indicates a misconfiguration of the server.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calls to this method can originate from MCP server tool calls via RequestContext<T>.EnablePollingAsync(), which I was picturing as a "best effort" to enable polling if it's available (though I'm realizing the XML docs don't make that clear). Polling also isn't a thing for the stdio transport - should this method throw in that case as well? Maybe the answer is yes, but I was thinking it would be nice to allow MCP server tools to remain as transport-agnostic as is reasonable and just no-op when polling isn't applicable.

/// <summary>
/// Tests for the <see cref="ISseEventStreamStore"/> interface and <see cref="TestSseEventStreamStore"/> implementation.
/// </summary>
public class SseEventStreamStoreTests(ITestOutputHelper testOutputHelper) : LoggedTest(testOutputHelper)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do any of the tests in this class cover anything outside of TestSseEventStreamStore? I'd just delete this file if it's only testing code inside of the test project.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They mostly just cover TestSseEventStreamStore. We can delete this, but I figured it would be helpful to know that if a test in ResumabilityIntegrationTests fails, it's not because of the event stream store implementation.

/// <param name="lastEventId">The ID of the last event received by the client, used to resume from that point.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A reader for the event stream, or <c>null</c> if no matching stream is found.</returns>
ValueTask<ISseEventStreamReader?> GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
ValueTask<ISseEventStreamReader?> GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken);
ValueTask<ISseEventStreamReader?> GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken = default);

I'm also okay with removing the defaults from all of the interfaces. The SDK be the only caller in practice, so the convenience doesn't matter much, and it might encourage us to be more careful about passing through the right token if there's no default.

/// <summary>
/// Gets the current mode of the event stream.
/// </summary>
SseEventStreamMode Mode { get; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like anything reads this or StreamId other than the tests. Does it need to be part of the interface?

/// <param name="mode">The new mode to set for the event stream.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to make the SseEventStreamMode part of the SseEventStreamOptions used to get the ISseEventStreamWriter? I'm not sure it makes sense to switch modes mid-stream.

I like that there's the ability to configure polling programmatically after inspecting the request, but I don't think we need to be able to change modes after we start producing events. Not being force to handle mode changes might make implementing the ISseEventStreamWriter a lot simpler in practice.

Also, do we imagine any scenarios where the ISseEventStore only supports polling? Does this design work if that's the case?

/// <summary>
/// Represents the mode of an SSE event stream.
/// </summary>
public enum SseEventStreamMode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something I can configure globally without calling EnablePollingAsync even if the store supports streaming.

/// Causes the event stream returned by <see cref="ISseEventStreamReader.ReadEventsAsync(System.Threading.CancellationToken)"/> to only end when
/// the associated <see cref="ISseEventStreamWriter"/> gets disposed.
/// </summary>
Default = 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Default = 0,
Streaming = 0,

I think it's better to be more explicit about what the default is.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SEP-1699: Support SSE polling via server-side disconnect Add support for resumability and redelivery to Streamable HTTP transport

5 participants