Skip to content

Commit 705b2e8

Browse files
committed
Fix concurrent sendMessage race by retrying on FAIL_NON_SERIALIZED
Replace tryEmitNext (fail-fast) with emitNext + busyLooping(100ms) in StdioClientTransport.sendMessage(). The unicast sink's SinkManySerialized wrapper returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently. busyLooping retries the CAS spin instead of immediately failing, making concurrent sends safe. The contention window is microseconds (single CAS operation), so the 100ms duration is just a generous upper bound for pathological cases like GC pauses. Before: 19/20 test repetitions fail After: 20/20 pass Closes #875
1 parent 4348299 commit 705b2e8

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,16 +227,17 @@ private void handleIncomingErrors() {
227227

228228
@Override
229229
public Mono<Void> sendMessage(JSONRPCMessage message) {
230-
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
231-
// TODO: essentially we could reschedule ourselves in some time and make
232-
// another attempt with the already read data but pause reading until
233-
// success
234-
// In this approach we delegate the retry and the backpressure onto the
235-
// caller. This might be enough for most cases.
230+
try {
231+
// busyLooping retries on FAIL_NON_SERIALIZED (concurrent tryEmitNext from
232+
// another thread) instead of failing immediately. The contention window is
233+
// microseconds (single CAS), so the spin resolves almost instantly; the
234+
// duration is just a generous upper bound for pathological cases like GC
235+
// pauses.
236+
this.outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
236237
return Mono.empty();
237238
}
238-
else {
239-
return Mono.error(new RuntimeException("Failed to enqueue message"));
239+
catch (Sinks.EmissionException e) {
240+
return Mono.error(new RuntimeException("Failed to enqueue message", e));
240241
}
241242
}
242243

0 commit comments

Comments
 (0)