Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-connection-closed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@modelcontextprotocol/core': patch
'@modelcontextprotocol/server': patch
---

Fix unhandled promise rejections on transport close and detect stdin EOF in StdioServerTransport. Pending request promises are now rejected asynchronously via microtask deferral, and the server transport listens for stdin `end` events to trigger a clean shutdown when the client process exits.
11 changes: 10 additions & 1 deletion packages/core/src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,17 @@ export abstract class Protocol<ContextT extends BaseContext> {
try {
this.onclose?.();
} finally {
// Reject pending response handlers on the next microtask to allow
// callers time to attach .catch() handlers and prevent unhandled
// promise rejections (see #1049, #392).
for (const handler of responseHandlers.values()) {
handler(error);
void Promise.resolve().then(() => {
try {
handler(error);
} catch (handlerError) {
this._onerror(handlerError instanceof Error ? handlerError : new Error(String(handlerError)));
}
});
}

for (const controller of requestHandlerAbortControllers.values()) {
Expand Down
47 changes: 47 additions & 0 deletions packages/core/test/shared/protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,53 @@ describe('protocol tests', () => {
expect(oncloseMock).toHaveBeenCalled();
});

test('should reject pending requests with ConnectionClosed when transport closes', async () => {
await protocol.connect(transport);
const mockSchema = z.object({ result: z.string() });
const requestPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema);

// Close transport while request is pending
await transport.close();

// The pending request should reject with ConnectionClosed
await expect(requestPromise).rejects.toThrow('Connection closed');
await expect(requestPromise).rejects.toMatchObject({
code: SdkErrorCode.ConnectionClosed
});
});

test('should not cause unhandled promise rejections when transport closes with pending requests', async () => {
await protocol.connect(transport);
const mockSchema = z.object({ result: z.string() });

// Track unhandled rejections
const unhandledRejections: unknown[] = [];
const processHandler = (reason: unknown) => {
unhandledRejections.push(reason);
};
process.on('unhandledRejection', processHandler);

try {
// Create a pending request and attach .catch() to prevent the expected rejection
// from triggering the handler
const requestPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema);
requestPromise.catch(() => {
// Expected — the request was rejected due to connection close
});

// Close transport
await transport.close();

// Wait for microtasks to flush
await new Promise(resolve => setTimeout(resolve, 50));

// No unhandled rejections should have occurred
expect(unhandledRejections).toHaveLength(0);
} finally {
process.off('unhandledRejection', processHandler);
}
});

test('should abort in-flight request handlers when the connection is closed', async () => {
await protocol.connect(transport);

Expand Down
10 changes: 10 additions & 0 deletions packages/server/src/server/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ export class StdioServerTransport implements Transport {
// Ignore errors during close — we're already in an error path
});
};
_onend = () => {
// stdin EOF means the client process has disconnected.
// Trigger a clean close so pending requests are properly rejected
// and the server can shut down gracefully (see #1049).
this.close().catch(() => {
// Ignore errors during close — we're already in a shutdown path
});
};

/**
* Starts listening for messages on `stdin`.
Expand All @@ -57,6 +65,7 @@ export class StdioServerTransport implements Transport {

this._started = true;
this._stdin.on('data', this._ondata);
this._stdin.on('end', this._onend);
this._stdin.on('error', this._onerror);
this._stdout.on('error', this._onstdouterror);
}
Expand Down Expand Up @@ -84,6 +93,7 @@ export class StdioServerTransport implements Transport {

// Remove our event listeners first
this._stdin.off('data', this._ondata);
this._stdin.off('end', this._onend);
this._stdin.off('error', this._onerror);
this._stdout.off('error', this._onstdouterror);

Expand Down
75 changes: 75 additions & 0 deletions packages/server/test/server/stdio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,78 @@ test('should fire onerror before onclose on stdout error', async () => {

expect(events).toEqual(['error', 'close']);
});

test('should close transport when stdin emits end (EOF)', async () => {
const server = new StdioServerTransport(input, output);
server.onerror = error => {
throw error;
};

let didClose = false;
server.onclose = () => {
didClose = true;
};

await server.start();
expect(didClose).toBeFalsy();

// Simulate client disconnecting (stdin EOF)
input.push(null);

// Wait for the async close() to complete
await new Promise(resolve => setTimeout(resolve, 50));
expect(didClose).toBeTruthy();
});

test('should not fire onclose twice when stdin EOF followed by explicit close()', async () => {
const server = new StdioServerTransport(input, output);
server.onerror = error => {
throw error;
};

let closeCount = 0;
server.onclose = () => {
closeCount++;
};

await server.start();

// stdin EOF triggers close
input.push(null);
await new Promise(resolve => setTimeout(resolve, 50));

// Explicit close should be idempotent
await server.close();

expect(closeCount).toBe(1);
});

test('should process remaining messages before closing on stdin EOF', async () => {
const server = new StdioServerTransport(input, output);
server.onerror = error => {
throw error;
};

const messages: JSONRPCMessage[] = [];
server.onmessage = message => {
messages.push(message);
};

await server.start();

const message: JSONRPCMessage = {
jsonrpc: '2.0',
id: 1,
method: 'ping'
};

// Push a message followed by EOF
input.push(serializeMessage(message));
input.push(null);

// Wait for processing
await new Promise(resolve => setTimeout(resolve, 50));

// The message should have been processed before close
expect(messages).toEqual([message]);
});
Loading