Skip to content

Commit 7c78582

Browse files
committed
Introduce retries
1 parent fa1861e commit 7c78582

File tree

6 files changed

+199
-139
lines changed

6 files changed

+199
-139
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import reactor.core.publisher.Flux;
5050
import reactor.core.publisher.FluxSink;
5151
import reactor.core.publisher.Mono;
52+
import reactor.util.retry.Retry;
5253
import reactor.util.function.Tuple2;
5354
import reactor.util.function.Tuples;
5455

@@ -291,20 +292,11 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
291292
})).flatMap(responseEvent -> {
292293
int statusCode = responseEvent.responseInfo().statusCode();
293294
if (statusCode == 401 || statusCode == 403) {
294-
logger.debug("Authorization error in sendMessage with code {}", statusCode);
295-
return Mono.deferContextual(innerCtx -> {
296-
var transportContext = innerCtx.getOrDefault(McpTransportContext.KEY,
297-
McpTransportContext.EMPTY);
298-
return Mono.from(this.authorizationErrorHandler.onAuthorizationError(
299-
responseEvent.responseInfo(), transportContext, Mono.defer(() -> {
300-
logger.debug("Authorization error handled, retrying original request");
301-
return this.reconnect(stream).then();
302-
}),
303-
Mono.error(new McpHttpClientTransportException(
304-
"Authorization error connecting to SSE stream",
305-
responseEvent.responseInfo()))))
306-
.then(Mono.empty());
307-
});
295+
logger.debug("Authorization error in reconnect with code {}", statusCode);
296+
return Mono.<McpSchema.JSONRPCMessage>error(
297+
new McpHttpClientTransportAuthorizationException(
298+
"Authorization error connecting to SSE stream",
299+
responseEvent.responseInfo()));
308300
}
309301

310302
if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
@@ -389,6 +381,7 @@ else if (statusCode == BAD_REQUEST) {
389381
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
390382
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
391383
})
384+
.retryWhen(authorizationErrorRetrySpec())
392385
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
393386
.onErrorMap(CompletionException.class, t -> t.getCause())
394387
.onErrorComplete(t -> {
@@ -411,6 +404,25 @@ else if (statusCode == BAD_REQUEST) {
411404

412405
}
413406

407+
private Retry authorizationErrorRetrySpec() {
408+
return Retry.from(companion -> companion.flatMap(retrySignal -> {
409+
if (!(retrySignal.failure() instanceof McpHttpClientTransportAuthorizationException authException)) {
410+
return Mono.error(retrySignal.failure());
411+
}
412+
if (retrySignal.totalRetriesInARow() >= this.authorizationErrorHandler.maxRetries()) {
413+
return Mono.error(retrySignal.failure());
414+
}
415+
return Mono.deferContextual(ctx -> {
416+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
417+
return Mono
418+
.from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext))
419+
.switchIfEmpty(Mono.just(false))
420+
.flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries())
421+
: Mono.error(retrySignal.failure()));
422+
});
423+
}));
424+
}
425+
414426
private BodyHandler<Void> toSendMessageBodySubscriber(FluxSink<ResponseEvent> sink) {
415427

416428
BodyHandler<Void> responseBodyHandler = responseInfo -> {
@@ -492,17 +504,8 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
492504
int statusCode = responseEvent.responseInfo().statusCode();
493505
if (statusCode == 401 || statusCode == 403) {
494506
logger.debug("Authorization error in sendMessage with code {}", statusCode);
495-
return Mono.deferContextual(ctx -> {
496-
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
497-
return Mono.from(this.authorizationErrorHandler
498-
.onAuthorizationError(responseEvent.responseInfo(), transportContext, Mono.defer(() -> {
499-
logger.debug("Authorization error handled, retrying original request");
500-
return this.sendMessage(sentMessage);
501-
}), Mono.error(new McpHttpClientTransportException(
502-
"Authorization error when sending message", responseEvent.responseInfo()))))
503-
.doOnSuccess(s -> deliveredSink.success())
504-
.then(Mono.empty());
505-
});
507+
return Mono.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportAuthorizationException(
508+
"Authorization error when sending message", responseEvent.responseInfo()));
506509
}
507510

508511
if (transportSession.markInitialized(
@@ -630,6 +633,7 @@ else if (statusCode == BAD_REQUEST) {
630633
return Flux.<McpSchema.JSONRPCMessage>error(
631634
new RuntimeException("Failed to send message: " + responseEvent));
632635
})
636+
.retryWhen(authorizationErrorRetrySpec())
633637
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
634638
.onErrorMap(CompletionException.class, t -> t.getCause())
635639
.onErrorComplete(t -> {
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2026-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.http.HttpResponse;
8+
9+
/**
10+
* Thrown when the MCP server responds with an authorization error (HTTP 401 or HTTP 403).
11+
* Subclass of {@link McpHttpClientTransportException} for targeted retry handling in
12+
* {@link HttpClientStreamableHttpTransport}.
13+
*
14+
* @author Daniel Garnier-Moiroux
15+
*/
16+
public class McpHttpClientTransportAuthorizationException extends McpHttpClientTransportException {
17+
18+
public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) {
19+
super(message, responseInfo);
20+
}
21+
22+
}

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportException.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@
1010

1111
/**
1212
* Authorization-related exception for {@link java.net.http.HttpClient}-based client
13-
* transport. Thrown when the server responds with HTTP 401 or HTTP 403. Wraps the
14-
* response info for further inspection of the headers and the status code.
13+
* transport. Thrown when the server responds with an HTTP error. Wraps the response info
14+
* for further inspection of the headers and the status code.
1515
*
16-
* @see <a href=
17-
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
18-
* Specification: Authorization</a>
1916
* @author Daniel Garnier-Moiroux
2017
*/
2118
public class McpHttpClientTransportException extends McpTransportException {

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import java.net.http.HttpResponse;
88

9+
import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
910
import io.modelcontextprotocol.client.transport.McpHttpClientTransportException;
1011
import io.modelcontextprotocol.common.McpTransportContext;
1112
import org.reactivestreams.Publisher;
@@ -27,13 +28,13 @@ public interface McpHttpClientAuthorizationErrorHandler {
2728
* Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request
2829
* should be retried or not. If the publisher returns true, the original transport
2930
* method (connect, sendMessage) will be replayed with the original arguments.
30-
* Otherwise, the transport will throw an {@link McpHttpClientTransportException},
31-
* indicating the error status.
31+
* Otherwise, the transport will throw an
32+
* {@link McpHttpClientTransportAuthorizationException}, indicating the error status.
3233
* <p>
3334
* If the returned {@link Publisher} errors, the error will be propagated to the
3435
* calling method, to be handled by the caller.
3536
* <p>
36-
* The caller is responsible for bounding the number of retries.
37+
* The number of retries is bounded by {@link #maxRetries()}.
3738
* @param responseInfo the HTTP response information
3839
* @param context the MCP client transport context
3940
* @return {@link Publisher} emitting true if the original request should be replayed,
@@ -42,36 +43,21 @@ public interface McpHttpClientAuthorizationErrorHandler {
4243
Publisher<Boolean> handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);
4344

4445
/**
45-
* A no-op handler, used in the default use-case.
46+
* Maximum number of authorization error retries the transport will attempt. When the
47+
* handler signals a retry via {@link #handle}, the transport will replay the original
48+
* request at most this many times. If the authorization error persists after
49+
* exhausting all retries, the transport will propagate the
50+
* {@link McpHttpClientTransportAuthorizationException}. Defaults to {@code 1}.
51+
* @return the maximum number of retries
4652
*/
47-
McpHttpClientAuthorizationErrorHandler NOOP = new Noop();
53+
default int maxRetries() {
54+
return 1;
55+
}
4856

4957
/**
50-
* Handle authorization error (HTTP 401 or 403), and optionally retry the HTTP
51-
* request, or trigger a transport error. To retry, use the {@code retryAction}
52-
* publisher. To emit the default transport error, use the {@code defaultError}
53-
* publisher.
54-
* <p>
55-
* Optionally, the returned {@link Publisher} may error to trigger an out-of-band
56-
* action. In that case, the error will be propagated to the calling method, to be
57-
* handled by the caller.
58-
* <p>
59-
* Defaults to {@link #handle(HttpResponse.ResponseInfo, McpTransportContext)}, and
60-
* uses the boolean from the return value to decide whether it should retry the
61-
* request.
62-
* @param responseInfo the HTTP response information
63-
* @param context the MCP client transport context
64-
* @param retryAction handler to retry the original request
65-
* @param defaultError handler to emit an error
66-
* @return a {@link Publisher} to signal either an error or a retry
58+
* A no-op handler, used in the default use-case.
6759
*/
68-
default Publisher<Void> onAuthorizationError(HttpResponse.ResponseInfo responseInfo, McpTransportContext context,
69-
Publisher<Void> retryAction, Publisher<Void> defaultError) {
70-
return Mono.from(this.handle(responseInfo, context))
71-
.switchIfEmpty(Mono.just(false))
72-
.flatMap(shouldRetry -> shouldRetry != null && shouldRetry ? Mono.from(retryAction)
73-
: Mono.from(defaultError));
74-
}
60+
McpHttpClientAuthorizationErrorHandler NOOP = new Noop();
7561

7662
/**
7763
* Create a {@link McpHttpClientAuthorizationErrorHandler} from a synchronous handler.

mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java

Lines changed: 20 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
import java.net.http.HttpResponse;
77

88
import io.modelcontextprotocol.common.McpTransportContext;
9-
import org.junit.jupiter.api.Nested;
109
import org.junit.jupiter.api.Test;
11-
import reactor.core.publisher.Mono;
1210
import reactor.test.StepVerifier;
1311

1412
import static org.mockito.Mockito.mock;
@@ -22,85 +20,29 @@ class McpHttpClientAuthorizationErrorHandlerTest {
2220

2321
private final McpTransportContext context = McpTransportContext.EMPTY;
2422

25-
@Nested
26-
class OnAuthorizationError {
27-
28-
@Test
29-
void whenTrueThenRetry() {
30-
McpHttpClientAuthorizationErrorHandler handler = (info, ctx) -> Mono.just(true);
31-
Mono<Void> retryAction = Mono.empty();
32-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
33-
34-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
35-
.verifyComplete();
36-
}
37-
38-
@Test
39-
void whenFalseThenError() {
40-
McpHttpClientAuthorizationErrorHandler handler = (info, ctx) -> Mono.just(false);
41-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
42-
Mono<Void> defaultError = Mono.error(new RuntimeException("authorization error"));
43-
44-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
45-
.expectErrorMatches(t -> t instanceof RuntimeException && t.getMessage().equals("authorization error"))
46-
.verify();
47-
}
48-
49-
@Test
50-
void whenErrorThenPropagate() {
51-
McpHttpClientAuthorizationErrorHandler handler = (info, ctx) -> Mono
52-
.error(new IllegalStateException("handler error"));
53-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
54-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
55-
56-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
57-
.expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("handler error"))
58-
.verify();
59-
}
60-
23+
@Test
24+
void whenTrueThenRetry() {
25+
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
26+
.fromSync((info, ctx) -> true);
27+
StepVerifier.create(handler.handle(responseInfo, context)).expectNext(true).verifyComplete();
6128
}
6229

63-
@Nested
64-
class FromSync {
65-
66-
@Test
67-
void whenTrueThenRetry() {
68-
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
69-
.fromSync((info, ctx) -> true);
70-
Mono<Void> retryAction = Mono.empty();
71-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
72-
73-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
74-
.verifyComplete();
75-
}
76-
77-
@Test
78-
void whenFalseThenError() {
79-
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
80-
.fromSync((info, ctx) -> false);
81-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
82-
Mono<Void> defaultError = Mono.error(new RuntimeException("authorization error"));
83-
84-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
85-
.expectErrorMatches(t -> t instanceof RuntimeException && t.getMessage().equals("authorization error"))
86-
.verify();
87-
}
88-
89-
@Test
90-
void whenExceptionThenPropagate() {
91-
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
92-
.fromSync((info, ctx) -> {
93-
throw new IllegalStateException("sync handler error");
94-
});
95-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
96-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
97-
98-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
99-
.expectErrorMatches(
100-
t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error"))
101-
.verify();
102-
}
30+
@Test
31+
void whenFalseThenError() {
32+
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
33+
.fromSync((info, ctx) -> false);
34+
StepVerifier.create(handler.handle(responseInfo, context)).expectNext(false).verifyComplete();
35+
}
10336

37+
@Test
38+
void whenExceptionThenPropagate() {
39+
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
40+
.fromSync((info, ctx) -> {
41+
throw new IllegalStateException("sync handler error");
42+
});
43+
StepVerifier.create(handler.handle(responseInfo, context))
44+
.expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error"))
45+
.verify();
10446
}
10547

10648
}

0 commit comments

Comments
 (0)