Skip to content

Commit a121041

Browse files
committed
fix: propagate stdio process exit during initialization
1 parent c09ee67 commit a121041

8 files changed

Lines changed: 281 additions & 4 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.atomic.AtomicReference;
1212
import java.util.function.Function;
1313

14+
import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException;
1415
import io.modelcontextprotocol.spec.McpClientSession;
1516
import io.modelcontextprotocol.spec.McpError;
1617
import io.modelcontextprotocol.spec.McpSchema;
@@ -225,6 +226,16 @@ private void close() {
225226
this.mcpSession().close();
226227
}
227228

229+
private void close(Throwable cause) {
230+
McpClientSession mcpClientSession = this.mcpSession();
231+
if (mcpClientSession != null) {
232+
mcpClientSession.close(cause);
233+
}
234+
else {
235+
this.error(cause);
236+
}
237+
}
238+
228239
private Mono<Void> closeGracefully() {
229240
return this.mcpSession().closeGracefully();
230241
}
@@ -260,6 +271,13 @@ public void handleException(Throwable t) {
260271
// the implicit initialization step.
261272
this.withInitialization("re-initializing", result -> Mono.empty()).subscribe();
262273
}
274+
else if (t instanceof McpStdioServerProcessExitException) {
275+
DefaultInitialization previous = this.initializationRef.get();
276+
if (previous != null && previous.initializeResult() == null
277+
&& this.initializationRef.compareAndSet(previous, null)) {
278+
previous.close(t);
279+
}
280+
}
263281
}
264282

265283
/**
@@ -356,4 +374,4 @@ public Mono<?> closeGracefully() {
356374
});
357375
}
358376

359-
}
377+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2026-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import io.modelcontextprotocol.spec.McpTransportException;
8+
import io.modelcontextprotocol.util.Assert;
9+
10+
/**
11+
* Thrown when an MCP stdio server process exits unexpectedly.
12+
*
13+
* @author DragonFSKY
14+
*/
15+
public class McpStdioServerProcessExitException extends McpTransportException {
16+
17+
private static final long serialVersionUID = 1L;
18+
19+
private final int exitCode;
20+
21+
private final String command;
22+
23+
public McpStdioServerProcessExitException(int exitCode, String command) {
24+
super(message(exitCode, command));
25+
this.exitCode = exitCode;
26+
this.command = command;
27+
}
28+
29+
public int getExitCode() {
30+
return this.exitCode;
31+
}
32+
33+
public String getCommand() {
34+
return this.command;
35+
}
36+
37+
private static String message(int exitCode, String command) {
38+
Assert.hasText(command, "The command can not be empty");
39+
return "MCP server process exited unexpectedly with code " + exitCode + " for command: " + command;
40+
}
41+
42+
}

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.ArrayList;
1313
import java.util.List;
1414
import java.util.concurrent.Executors;
15+
import java.util.concurrent.atomic.AtomicReference;
1516
import java.util.function.Consumer;
1617
import java.util.function.Function;
1718

@@ -48,6 +49,10 @@ public class StdioClientTransport implements McpClientTransport {
4849
/** The server process being communicated with */
4950
private Process process;
5051

52+
private final AtomicReference<McpStdioServerProcessExitException> unexpectedExitException = new AtomicReference<>();
53+
54+
private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();
55+
5156
private McpJsonMapper jsonMapper;
5257

5358
/** Scheduler for handling inbound messages from the server process */
@@ -66,6 +71,8 @@ public class StdioClientTransport implements McpClientTransport {
6671

6772
private volatile boolean isClosing = false;
6873

74+
private volatile boolean closeRequested = false;
75+
6976
// visible for tests
7077
private Consumer<String> stdErrorHandler = error -> logger.info("STDERR Message received: {}", error);
7178

@@ -134,6 +141,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
134141
startInboundProcessing();
135142
startOutboundProcessing();
136143
startErrorProcessing();
144+
startExitMonitoring();
137145
logger.info("MCP server started");
138146
}).subscribeOn(Schedulers.boundedElastic());
139147
}
@@ -160,6 +168,11 @@ public void setStdErrorHandler(Consumer<String> errorHandler) {
160168
this.stdErrorHandler = errorHandler;
161169
}
162170

171+
@Override
172+
public void setExceptionHandler(Consumer<Throwable> handler) {
173+
this.exceptionHandler.set(handler);
174+
}
175+
163176
/**
164177
* Waits for the server process to exit.
165178
* @throws RuntimeException if the process is interrupted while waiting
@@ -227,6 +240,14 @@ private void handleIncomingErrors() {
227240

228241
@Override
229242
public Mono<Void> sendMessage(JSONRPCMessage message) {
243+
McpStdioServerProcessExitException exitException = this.unexpectedExitException.get();
244+
if (exitException != null) {
245+
return Mono.error(exitException);
246+
}
247+
if (!this.closeRequested && this.process != null && !this.process.isAlive()) {
248+
exitException = signalUnexpectedProcessExit(this.process.exitValue());
249+
return Mono.error(exitException);
250+
}
230251
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
231252
// TODO: essentially we could reschedule ourselves in some time and make
232253
// another attempt with the already read data but pause reading until
@@ -240,6 +261,32 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
240261
}
241262
}
242263

264+
private void startExitMonitoring() {
265+
this.process.onExit().thenAccept(process -> {
266+
if (!closeRequested) {
267+
signalUnexpectedProcessExit(process.exitValue());
268+
}
269+
});
270+
}
271+
272+
private McpStdioServerProcessExitException signalUnexpectedProcessExit(int exitCode) {
273+
McpStdioServerProcessExitException exception = new McpStdioServerProcessExitException(exitCode,
274+
this.params.getCommand());
275+
if (this.unexpectedExitException.compareAndSet(null, exception)) {
276+
logger.warn(exception.getMessage());
277+
isClosing = true;
278+
inboundSink.tryEmitComplete();
279+
outboundSink.tryEmitComplete();
280+
errorSink.tryEmitComplete();
281+
282+
Consumer<Throwable> handler = this.exceptionHandler.get();
283+
if (handler != null) {
284+
handler.accept(exception);
285+
}
286+
}
287+
return this.unexpectedExitException.get();
288+
}
289+
243290
/**
244291
* Starts the inbound processing thread that reads JSON-RPC messages from the
245292
* process's input stream. Messages are deserialized and emitted to the inbound sink.
@@ -335,6 +382,7 @@ protected void handleOutbound(Function<Flux<JSONRPCMessage>, Flux<JSONRPCMessage
335382
@Override
336383
public Mono<Void> closeGracefully() {
337384
return Mono.fromRunnable(() -> {
385+
closeRequested = true;
338386
isClosing = true;
339387
logger.debug("Initiating graceful shutdown");
340388
}).then(Mono.<Void>defer(() -> {

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,18 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
122122
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
123123
}
124124

125-
private void dismissPendingResponses() {
125+
private void dismissPendingResponses(Throwable cause) {
126126
this.pendingResponses.forEach((id, sink) -> {
127-
logger.warn("Abruptly terminating exchange for request {}", id);
128-
sink.error(new RuntimeException("MCP session with server terminated"));
127+
logger.warn("Abruptly terminating exchange for request {}: {}", id, cause.toString());
128+
sink.error(cause);
129129
});
130130
this.pendingResponses.clear();
131131
}
132132

133+
private void dismissPendingResponses() {
134+
dismissPendingResponses(new RuntimeException("MCP session with server terminated"));
135+
}
136+
133137
private void handle(McpSchema.JSONRPCMessage message) {
134138
if (message instanceof McpSchema.JSONRPCResponse response) {
135139
logger.debug("Received response: {}", response);
@@ -300,4 +304,13 @@ public void close() {
300304
dismissPendingResponses();
301305
}
302306

307+
/**
308+
* Closes the session immediately, failing pending operations with the given cause.
309+
* @param cause the transport-level cause of the closure
310+
*/
311+
public void close(Throwable cause) {
312+
Assert.notNull(cause, "The cause can not be null");
313+
dismissPendingResponses(cause);
314+
}
315+
303316
}

mcp-core/src/test/java/io/modelcontextprotocol/client/LifecycleInitializerTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import java.util.function.Function;
1212

1313
import io.modelcontextprotocol.client.LifecycleInitializer.Initialization;
14+
import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException;
1415
import io.modelcontextprotocol.spec.McpClientSession;
1516
import io.modelcontextprotocol.spec.McpSchema;
17+
import io.modelcontextprotocol.spec.McpTransportException;
1618
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
1719
import org.junit.jupiter.api.BeforeEach;
1820
import org.junit.jupiter.api.Test;
@@ -302,6 +304,56 @@ void shouldHandleOtherExceptions() {
302304
verify(mockSessionSupplier, times(1)).apply(any(ContextView.class));
303305
}
304306

307+
@Test
308+
void shouldCloseInProgressInitializationOnStdioProcessExit() {
309+
var cause = new McpStdioServerProcessExitException(127, "java");
310+
when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never())
311+
.thenReturn(Mono.just(MOCK_INIT_RESULT));
312+
313+
var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))
314+
.subscribe();
315+
316+
initializer.handleException(cause);
317+
subscription.dispose();
318+
319+
verify(mockClientSession).close(cause);
320+
321+
StepVerifier.create(initializer.withInitialization("retry", init -> Mono.just(init.initializeResult())))
322+
.expectNext(MOCK_INIT_RESULT)
323+
.verifyComplete();
324+
325+
verify(mockSessionSupplier, times(2)).apply(any(ContextView.class));
326+
}
327+
328+
@Test
329+
void shouldIgnoreGenericTransportExceptionDuringInitialization() {
330+
var cause = new McpTransportException("Transport closed");
331+
when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never());
332+
333+
var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))
334+
.subscribe();
335+
336+
initializer.handleException(cause);
337+
subscription.dispose();
338+
339+
verify(mockClientSession, never()).close(cause);
340+
}
341+
342+
@Test
343+
void shouldKeepInitializedAfterTransportException() {
344+
StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult())))
345+
.expectNext(MOCK_INIT_RESULT)
346+
.verifyComplete();
347+
348+
var cause = new McpTransportException("Transport closed");
349+
350+
initializer.handleException(cause);
351+
352+
assertThat(initializer.isInitialized()).isTrue();
353+
verify(mockClientSession, never()).close(cause);
354+
verify(mockSessionSupplier, times(1)).apply(any(ContextView.class));
355+
}
356+
305357
@Test
306358
void shouldCloseGracefully() {
307359
StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult())))

mcp-core/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,21 @@ void testRequestTimeout() {
107107
session.close();
108108
}
109109

110+
@Test
111+
void testPendingRequestFailsWithCloseCause() {
112+
var transport = new MockMcpClientTransport();
113+
var session = new McpClientSession(TIMEOUT, transport, Map.of(),
114+
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> logger.info("Status update: {}", params))),
115+
Function.identity());
116+
var cause = new McpTransportException("Transport closed");
117+
118+
Mono<String> responseMono = session.sendRequest(TEST_METHOD, "test", responseType);
119+
120+
StepVerifier.create(responseMono).then(() -> session.close(cause)).expectErrorSatisfies(error -> {
121+
assertThat(error).isSameAs(cause);
122+
}).verify();
123+
}
124+
110125
@Test
111126
void testSendNotification() {
112127
var transport = new MockMcpClientTransport();
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client;
6+
7+
final class FailingStdioServer {
8+
9+
private FailingStdioServer() {
10+
}
11+
12+
public static void main(String[] args) {
13+
System.err.println("Exiting before MCP initialization with code 127");
14+
System.exit(127);
15+
}
16+
17+
}

0 commit comments

Comments
 (0)