Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions packages/middleware/node/test/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,92 @@ describe('Zod v4', () => {
toolResolve!();
});

it('should replay the terminal response after closeSSE and reconnect', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Tool closes its own SSE stream, then produces its final result while the
// client is disconnected. The result must be replayable on reconnect.
let toolResolve: () => void;
const toolCompletePromise = new Promise<void>(resolve => {
toolResolve = resolve;
});

mcpServer.registerTool('close-then-finish-tool', { description: 'Closes its stream, then returns a result' }, async ctx => {
ctx.http?.closeSSE?.();
await toolCompletePromise;
return { content: [{ type: 'text', text: 'replayed-result' }] };
});

const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 100,
method: 'tools/call',
params: { name: 'close-then-finish-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
expect(postResponse.status).toBe(200);

// Read the priming event and capture its event ID for reconnection.
const reader = postResponse.body?.getReader();
const primingData = await reader!.read();
const primingText = new TextDecoder().decode(primingData.value);
const idMatch = primingText.match(/id: ([^\n]+)/);
expect(idMatch).toBeTruthy();
const primingEventId = idMatch![1]!;

// The POST SSE stream closes once the tool calls closeSSE.
const { done } = await reader!.read();
expect(done).toBe(true);

// Let the tool produce its final result while the client is disconnected.
toolResolve!();
await new Promise(resolve => setTimeout(resolve, 100));

// Reconnect with the priming event ID; the terminal response must be replayed.
const reconnectResponse = await fetch(baseUrl, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-11-25',
'last-event-id': primingEventId
}
});
expect(reconnectResponse.status).toBe(200);

const reconnectReader = reconnectResponse.body?.getReader();
const reconnectData = await reconnectReader!.read();
const reconnectText = new TextDecoder().decode(reconnectData.value);

expect(reconnectText).toContain('replayed-result');
expect(reconnectText).toContain('"id":100');
expect(reconnectText).toContain('id: ');

await reconnectReader!.cancel();
});

it('should provide closeSSEStream callback in ctx when eventStore is configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
Expand Down
25 changes: 21 additions & 4 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,15 +985,20 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

const stream = this._streamMapping.get(streamId);

if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
// For SSE responses, generate event ID if event store is provided
if (!this._enableJsonResponse) {
// For SSE responses, generate event ID if event store is provided.
// Store the event even if the stream is currently disconnected (e.g. after
// closeSSEStream) so it can be replayed when the client reconnects, mirroring
// the standalone SSE stream above.
let eventId: string | undefined;

if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}
// Write the event to the response stream
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
// Write the event to the response stream only if a controller is attached.
if (stream?.controller && stream?.encoder) {
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}
}

if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
Expand All @@ -1005,6 +1010,18 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

if (allResponsesReady) {
if (!stream) {
// The stream was disconnected (e.g. via closeSSEStream) before the final
// response was produced. When an event store is configured, the response
// has already been persisted above and will be replayed on reconnect, so
// just clean up the request mappings. Without an event store there is no
// way to deliver the response, so surface the error as before.
if (this._eventStore && !this._enableJsonResponse) {
for (const id of relatedIds) {
this._requestResponseMap.delete(id);
this._requestToStreamMapping.delete(id);
}
return;
}
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
if (this._enableJsonResponse && stream.resolveJson) {
Expand Down
Loading