|
| 1 | +/* |
| 2 | + * Copyright 2024-2024 the original author or authors. |
| 3 | + */ |
| 4 | + |
| 5 | +package io.modelcontextprotocol.client.transport; |
| 6 | + |
| 7 | +import io.modelcontextprotocol.spec.McpSchema; |
| 8 | +import io.modelcontextprotocol.spec.json.gson.GsonMcpJsonMapper; |
| 9 | +import java.time.Duration; |
| 10 | +import java.util.Map; |
| 11 | +import java.util.concurrent.CopyOnWriteArrayList; |
| 12 | +import java.util.concurrent.CountDownLatch; |
| 13 | +import java.util.concurrent.CyclicBarrier; |
| 14 | +import java.util.concurrent.TimeUnit; |
| 15 | +import org.junit.jupiter.api.AfterEach; |
| 16 | +import org.junit.jupiter.api.RepeatedTest; |
| 17 | +import reactor.core.publisher.Mono; |
| 18 | + |
| 19 | +import static org.assertj.core.api.Assertions.assertThat; |
| 20 | + |
| 21 | +/** |
| 22 | + * Reproduces a race condition in StdioClientTransport.sendMessage() when two threads call |
| 23 | + * it concurrently on the same transport instance. |
| 24 | + * |
| 25 | + * <p> |
| 26 | + * The outbound sink (Sinks.many().unicast()) is wrapped by Reactor's SinkManySerialized, |
| 27 | + * which uses a CAS-based guard. When two threads call tryEmitNext concurrently, the CAS |
| 28 | + * loser immediately gets FAIL_NON_SERIALIZED, causing "Failed to enqueue message". |
| 29 | + * |
| 30 | + * <p> |
| 31 | + * This occurs when an MCP server proxies concurrent tool calls to a downstream MCP server |
| 32 | + * via stdio transport — each call is dispatched on a separate thread and both call |
| 33 | + * sendMessage() on the same transport. |
| 34 | + * |
| 35 | + * @see <a href="https://github.com/modelcontextprotocol/java-sdk/issues/875">Issue |
| 36 | + * #875</a> |
| 37 | + */ |
| 38 | +class StdioClientTransportConcurrencyTest { |
| 39 | + |
| 40 | + private StdioClientTransport transport; |
| 41 | + |
| 42 | + @AfterEach |
| 43 | + void tearDown() { |
| 44 | + if (transport != null) { |
| 45 | + transport.closeGracefully().block(Duration.ofSeconds(5)); |
| 46 | + } |
| 47 | + } |
| 48 | + |
| 49 | + @RepeatedTest(20) |
| 50 | + void concurrent_sendMessage_should_not_fail() throws Exception { |
| 51 | + var serverParams = ServerParameters.builder("cat").env(Map.of()).build(); |
| 52 | + transport = new StdioClientTransport(serverParams, new GsonMcpJsonMapper()); |
| 53 | + |
| 54 | + transport.connect(mono -> mono.flatMap(msg -> Mono.empty())).block(Duration.ofSeconds(5)); |
| 55 | + |
| 56 | + var msg1 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "1", |
| 57 | + Map.of("name", "tool_a", "arguments", Map.of())); |
| 58 | + var msg2 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "2", |
| 59 | + Map.of("name", "tool_b", "arguments", Map.of())); |
| 60 | + |
| 61 | + var barrier = new CyclicBarrier(2); |
| 62 | + var errors = new CopyOnWriteArrayList<Throwable>(); |
| 63 | + var latch = new CountDownLatch(2); |
| 64 | + |
| 65 | + for (var msg : new McpSchema.JSONRPCMessage[] { msg1, msg2 }) { |
| 66 | + new Thread(() -> { |
| 67 | + try { |
| 68 | + barrier.await(2, TimeUnit.SECONDS); |
| 69 | + transport.sendMessage(msg).block(Duration.ofSeconds(2)); |
| 70 | + } |
| 71 | + catch (Exception e) { |
| 72 | + errors.add(e); |
| 73 | + } |
| 74 | + finally { |
| 75 | + latch.countDown(); |
| 76 | + } |
| 77 | + }).start(); |
| 78 | + } |
| 79 | + |
| 80 | + latch.await(5, TimeUnit.SECONDS); |
| 81 | + |
| 82 | + assertThat(errors) |
| 83 | + .as("Concurrent sendMessage calls should both succeed, but the unicast sink " |
| 84 | + + "rejects one with FAIL_NON_SERIALIZED when two threads race on tryEmitNext") |
| 85 | + .isEmpty(); |
| 86 | + } |
| 87 | + |
| 88 | +} |
0 commit comments