Skip to content

Commit 82ccfa1

Browse files
committed
feat: implement support for SEP-1686 Tasks
1 parent f7a460f commit 82ccfa1

File tree

54 files changed

+15686
-201
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+15686
-201
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 784 additions & 8 deletions
Large diffs are not rendered by default.

mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 149 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.modelcontextprotocol.client;
66

77
import io.modelcontextprotocol.common.McpTransportContext;
8+
import io.modelcontextprotocol.experimental.tasks.TaskStore;
89
import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
910
import io.modelcontextprotocol.spec.McpClientTransport;
1011
import io.modelcontextprotocol.spec.McpSchema;
@@ -184,6 +185,8 @@ class SyncSpec {
184185

185186
private final List<Consumer<McpSchema.ProgressNotification>> progressConsumers = new ArrayList<>();
186187

188+
private final List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers = new ArrayList<>();
189+
187190
private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;
188191

189192
private Function<ElicitRequest, ElicitResult> elicitationHandler;
@@ -194,6 +197,10 @@ class SyncSpec {
194197

195198
private boolean enableCallToolSchemaCaching = false; // Default to false
196199

200+
private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;
201+
202+
private Duration taskPollTimeout; // null = use default (5 minutes)
203+
197204
private SyncSpec(McpClientTransport transport) {
198205
Assert.notNull(transport, "Transport must not be null");
199206
this.transport = transport;
@@ -317,6 +324,44 @@ public SyncSpec elicitation(Function<ElicitRequest, ElicitResult> elicitationHan
317324
return this;
318325
}
319326

327+
/**
328+
* Sets the task store for client-side task hosting. When set, the client can host
329+
* tasks for task-augmented sampling and elicitation requests from the server.
330+
*
331+
* <p>
332+
* This is an experimental feature that may change in future releases.
333+
* @param taskStore The task store implementation. Must not be null.
334+
* @return This builder instance for method chaining
335+
* @throws IllegalArgumentException if taskStore is null
336+
*/
337+
public SyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
338+
Assert.notNull(taskStore, "Task store must not be null");
339+
this.taskStore = taskStore;
340+
return this;
341+
}
342+
343+
/**
344+
* Sets the maximum time to wait for a task to reach a terminal state during task
345+
* result polling.
346+
*
347+
* <p>
348+
* When using task-augmented requests (e.g., long-running tool calls), the client
349+
* polls the server for task status updates. This timeout limits how long the
350+
* client will wait for the task to complete, fail, or be cancelled.
351+
*
352+
* <p>
353+
* If not set, defaults to 5 minutes to prevent infinite polling loops.
354+
*
355+
* <p>
356+
* This is an experimental feature that may change in future releases.
357+
* @param timeout maximum poll duration, or null to use the default (5 minutes)
358+
* @return This builder instance for method chaining
359+
*/
360+
public SyncSpec taskPollTimeout(Duration timeout) {
361+
this.taskPollTimeout = timeout;
362+
return this;
363+
}
364+
320365
/**
321366
* Adds a consumer to be notified when the available tools change. This allows the
322367
* client to react to changes in the server's tool capabilities, such as tools
@@ -428,14 +473,42 @@ public SyncSpec progressConsumer(Consumer<McpSchema.ProgressNotification> progre
428473
* @param progressConsumers A list of consumers that receives progress
429474
* notifications. Must not be null.
430475
* @return This builder instance for method chaining
431-
* @throws IllegalArgumentException if progressConsumer is null
476+
* @throws IllegalArgumentException if progressConsumers is null
432477
*/
433478
public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>> progressConsumers) {
434479
Assert.notNull(progressConsumers, "Progress consumers must not be null");
435480
this.progressConsumers.addAll(progressConsumers);
436481
return this;
437482
}
438483

484+
/**
485+
* Adds a consumer to be notified of task status notifications from the server.
486+
* This enables clients to receive updates about task progress and status changes.
487+
* @param taskStatusConsumer A consumer that receives task status notifications.
488+
* Must not be null.
489+
* @return This builder instance for method chaining
490+
* @throws IllegalArgumentException if taskStatusConsumer is null
491+
*/
492+
public SyncSpec taskStatusConsumer(Consumer<McpSchema.TaskStatusNotification> taskStatusConsumer) {
493+
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
494+
this.taskStatusConsumers.add(taskStatusConsumer);
495+
return this;
496+
}
497+
498+
/**
499+
* Adds multiple consumers to be notified of task status notifications from the
500+
* server.
501+
* @param taskStatusConsumers A list of consumers that receive task status
502+
* notifications. Must not be null.
503+
* @return This builder instance for method chaining
504+
* @throws IllegalArgumentException if taskStatusConsumers is null
505+
*/
506+
public SyncSpec taskStatusConsumers(List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers) {
507+
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
508+
this.taskStatusConsumers.addAll(taskStatusConsumers);
509+
return this;
510+
}
511+
439512
/**
440513
* Add a provider of {@link McpTransportContext}, providing a context before
441514
* calling any client operation. This allows to extract thread-locals and hand
@@ -486,14 +559,15 @@ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching)
486559
public McpSyncClient build() {
487560
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
488561
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
489-
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
490-
this.elicitationHandler, this.enableCallToolSchemaCaching);
562+
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
563+
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
564+
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null);
491565

492566
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
493567

494568
return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
495-
jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(),
496-
asyncFeatures), this.contextProvider);
569+
jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(), asyncFeatures,
570+
this.taskStore), this.contextProvider);
497571
}
498572

499573
}
@@ -540,6 +614,8 @@ class AsyncSpec {
540614

541615
private final List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
542616

617+
private final List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers = new ArrayList<>();
618+
543619
private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;
544620

545621
private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;
@@ -548,6 +624,10 @@ class AsyncSpec {
548624

549625
private boolean enableCallToolSchemaCaching = false; // Default to false
550626

627+
private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;
628+
629+
private Duration taskPollTimeout; // null = use default (5 minutes)
630+
551631
private AsyncSpec(McpClientTransport transport) {
552632
Assert.notNull(transport, "Transport must not be null");
553633
this.transport = transport;
@@ -671,6 +751,22 @@ public AsyncSpec elicitation(Function<ElicitRequest, Mono<ElicitResult>> elicita
671751
return this;
672752
}
673753

754+
/**
755+
* Sets the task store for client-side task hosting. When set, the client can host
756+
* tasks for task-augmented sampling and elicitation requests from the server.
757+
*
758+
* <p>
759+
* This is an experimental feature that may change in future releases.
760+
* @param taskStore The task store implementation. Must not be null.
761+
* @return This builder instance for method chaining
762+
* @throws IllegalArgumentException if taskStore is null
763+
*/
764+
public AsyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
765+
Assert.notNull(taskStore, "Task store must not be null");
766+
this.taskStore = taskStore;
767+
return this;
768+
}
769+
674770
/**
675771
* Adds a consumer to be notified when the available tools change. This allows the
676772
* client to react to changes in the server's tool capabilities, such as tools
@@ -785,7 +881,7 @@ public AsyncSpec progressConsumer(Function<McpSchema.ProgressNotification, Mono<
785881
* @param progressConsumers A list of consumers that receives progress
786882
* notifications. Must not be null.
787883
* @return This builder instance for method chaining
788-
* @throws IllegalArgumentException if progressConsumer is null
884+
* @throws IllegalArgumentException if progressConsumers is null
789885
*/
790886
public AsyncSpec progressConsumers(
791887
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers) {
@@ -794,6 +890,35 @@ public AsyncSpec progressConsumers(
794890
return this;
795891
}
796892

893+
/**
894+
* Adds a consumer to be notified of task status notifications from the server.
895+
* This enables clients to receive updates about task progress and status changes.
896+
* @param taskStatusConsumer A consumer that receives task status notifications.
897+
* Must not be null.
898+
* @return This builder instance for method chaining
899+
* @throws IllegalArgumentException if taskStatusConsumer is null
900+
*/
901+
public AsyncSpec taskStatusConsumer(Function<McpSchema.TaskStatusNotification, Mono<Void>> taskStatusConsumer) {
902+
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
903+
this.taskStatusConsumers.add(taskStatusConsumer);
904+
return this;
905+
}
906+
907+
/**
908+
* Adds multiple consumers to be notified of task status notifications from the
909+
* server.
910+
* @param taskStatusConsumers A list of consumers that receive task status
911+
* notifications. Must not be null.
912+
* @return This builder instance for method chaining
913+
* @throws IllegalArgumentException if taskStatusConsumers is null
914+
*/
915+
public AsyncSpec taskStatusConsumers(
916+
List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers) {
917+
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
918+
this.taskStatusConsumers.addAll(taskStatusConsumers);
919+
return this;
920+
}
921+
797922
/**
798923
* Sets the JSON schema validator to use for validating tool responses against
799924
* output schemas.
@@ -819,6 +944,21 @@ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching
819944
return this;
820945
}
821946

947+
/**
948+
* Sets the maximum duration to poll for task completion in
949+
* {@code callToolStream()}. If not set, defaults to 5 minutes to prevent infinite
950+
* polling loops.
951+
*
952+
* <p>
953+
* This is an experimental feature that may change in future releases.
954+
* @param timeout maximum poll duration, or null to use the default (5 minutes)
955+
* @return This builder instance for method chaining
956+
*/
957+
public AsyncSpec taskPollTimeout(Duration timeout) {
958+
this.taskPollTimeout = timeout;
959+
return this;
960+
}
961+
822962
/**
823963
* Create an instance of {@link McpAsyncClient} with the provided configurations
824964
* or sensible defaults.
@@ -832,7 +972,9 @@ public McpAsyncClient build() {
832972
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
833973
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
834974
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
835-
this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
975+
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
976+
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null),
977+
this.taskStore);
836978
}
837979

838980
}

0 commit comments

Comments
 (0)