Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/everything/docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ Use `trigger-sampling-request-async` or `trigger-elicitation-request-async` to d

MCP Tasks are bidirectional - both server and client can be task executors:

| Direction | Request Type | Task Executor | Demo Tool |
|-----------|--------------|---------------|-----------|
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |
| Direction | Request Type | Task Executor | Demo Tool |
| ---------------- | ------------------------ | ------------- | ----------------------------------- |
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |

For client-side tasks:

1. Server sends request with task metadata (e.g., `params.task.ttl`)
2. Client creates task and returns `CreateTaskResult` with `taskId`
3. Server polls `tasks/get` for status updates
Expand Down
5 changes: 4 additions & 1 deletion src/everything/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export const createServer: () => ServerFactoryResponse = () => {
const taskStore = new InMemoryTaskStore();
const taskMessageQueue = new InMemoryTaskMessageQueue();

let initializeTimeout: NodeJS.Timeout | null = null;

// Create the server
const server = new McpServer(
{
Expand Down Expand Up @@ -98,7 +100,7 @@ export const createServer: () => ServerFactoryResponse = () => {
// This is delayed until after the `notifications/initialized` handler finishes,
// otherwise, the request gets lost.
const sessionId = server.server.transport?.sessionId;
setTimeout(() => syncRoots(server, sessionId), 350);
initializeTimeout = setTimeout(() => syncRoots(server, sessionId), 350);
};

// Return the ServerFactoryResponse
Expand All @@ -110,6 +112,7 @@ export const createServer: () => ServerFactoryResponse = () => {
stopSimulatedResourceUpdates(sessionId);
// Clean up task store timers
taskStore.cleanup();
if (initializeTimeout) clearTimeout(initializeTimeout);
},
} satisfies ServerFactoryResponse;
};
13 changes: 4 additions & 9 deletions src/everything/server/roots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,10 @@ export const syncRoots = async (server: McpServer, sessionId?: string) => {
);
}
} catch (error) {
await server.sendLoggingMessage(
{
level: "error",
logger: "everything-server",
data: `Failed to request roots from client: ${
error instanceof Error ? error.message : String(error)
}`,
},
sessionId
console.error(
`Failed to request roots from client ${sessionId}: ${
error instanceof Error ? error.message : String(error)
}`
);
}
};
Expand Down
27 changes: 18 additions & 9 deletions src/everything/tools/simulate-research-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ async function runResearchProcess(
interpretation: {
type: "string",
title: "Clarification",
description: "Which interpretation of the topic do you mean?",
description:
"Which interpretation of the topic do you mean?",
oneOf: getInterpretationsForTopic(state.topic),
},
},
Expand Down Expand Up @@ -187,18 +188,28 @@ This tool demonstrates MCP's task-based execution pattern for long-running opera
**Task Lifecycle Demonstrated:**
1. \`tools/call\` with \`task\` parameter → Server returns \`CreateTaskResult\` (not the final result)
2. Client polls \`tasks/get\` → Server returns current status and \`statusMessage\`
3. Status progressed: \`working\` → ${state.clarification ? `\`input_required\` → \`working\` → ` : ""}\`completed\`
3. Status progressed: \`working\` → ${
state.clarification ? `\`input_required\` → \`working\` → ` : ""
}\`completed\`
4. Client calls \`tasks/result\` → Server returns this final result

${state.clarification ? `**Elicitation Flow:**
${
state.clarification
? `**Elicitation Flow:**
When the query was ambiguous, the server sent an \`elicitation/create\` request
to the client. The task status changed to \`input_required\` while awaiting user input.
${state.clarification.includes("unavailable on HTTP") ? `
${
state.clarification.includes("unavailable on HTTP")
? `
**Note:** Elicitation was skipped because this server is running over HTTP transport.
The current SDK's \`sendRequest\` only works over STDIO. Full HTTP elicitation support
requires SDK PR #1210's streaming \`elicitInputStream\` API.
` : `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`}
` : ""}
`
: `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`
}
`
: ""
}
**Key Concepts:**
- Tasks enable "call now, fetch later" patterns
- \`statusMessage\` provides human-readable progress updates
Expand Down Expand Up @@ -288,9 +299,7 @@ export const registerSimulateResearchQueryTool = (server: McpServer) => {
* Returns the current status of the research task.
*/
getTask: async (args, extra): Promise<GetTaskResult> => {
const task = await extra.taskStore.getTask(extra.taskId);
// The SDK's RequestTaskStore.getTask throws if not found, so task is always defined
return task;
return await extra.taskStore.getTask(extra.taskId);
},

/**
Expand Down
64 changes: 45 additions & 19 deletions src/everything/tools/trigger-elicitation-request-async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@ const MAX_POLL_ATTEMPTS = 600;
*
* @param {McpServer} server - The McpServer instance where the tool will be registered.
*/
export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) => {
export const registerTriggerElicitationRequestAsyncTool = (
server: McpServer
) => {
// Check client capabilities
const clientCapabilities = server.server.getClientCapabilities() || {};

// Client must support elicitation AND tasks.requests.elicitation
const clientSupportsElicitation = clientCapabilities.elicitation !== undefined;
const clientTasksCapability = clientCapabilities.tasks as {
requests?: { elicitation?: { create?: object } };
} | undefined;
const clientSupportsElicitation =
clientCapabilities.elicitation !== undefined;
const clientTasksCapability = clientCapabilities.tasks as
| {
requests?: { elicitation?: { create?: object } };
}
| undefined;
const clientSupportsAsyncElicitation =
clientTasksCapability?.requests?.elicitation?.create !== undefined;

Expand All @@ -56,7 +61,8 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
task: {
ttl: 600000, // 10 minutes (user input may take a while)
},
message: "Please provide inputs for the following fields (async task demo):",
message:
"Please provide inputs for the following fields (async task demo):",
requestedSchema: {
type: "object" as const,
properties: {
Expand Down Expand Up @@ -107,14 +113,18 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
);

// Check if client returned CreateTaskResult (has task object)
const isTaskResult = 'task' in elicitResponse && elicitResponse.task;
const isTaskResult = "task" in elicitResponse && elicitResponse.task;
if (!isTaskResult) {
// Client executed synchronously - return the direct response
return {
content: [
{
type: "text",
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(elicitResponse, null, 2)}`,
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(
elicitResponse,
null,
2
)}`,
},
],
};
Expand Down Expand Up @@ -145,19 +155,27 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
method: "tasks/get",
params: { taskId },
},
z.object({
status: z.string(),
statusMessage: z.string().optional(),
}).passthrough()
z
.object({
status: z.string(),
statusMessage: z.string().optional(),
})
.passthrough()
);

taskStatus = pollResult.status;
taskStatusMessage = pollResult.statusMessage;

// Only log status changes or every 10 polls to avoid spam
if (attempts === 1 || attempts % 10 === 0 || taskStatus !== "input_required") {
if (
attempts === 1 ||
attempts % 10 === 0 ||
taskStatus !== "input_required"
) {
statusMessages.push(
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
`Poll ${attempts}: ${taskStatus}${
taskStatusMessage ? ` - ${taskStatusMessage}` : ""
}`
);
}
}
Expand All @@ -168,7 +186,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
content: [
{
type: "text",
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join(
"\n"
)}`,
},
],
};
Expand All @@ -180,7 +200,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
content: [
{
type: "text",
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[${taskStatus.toUpperCase()}] ${
taskStatusMessage || "No message"
}\n\nProgress:\n${statusMessages.join("\n")}`,
},
],
};
Expand All @@ -207,8 +229,10 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
const userData = result.content as Record<string, unknown>;
const lines = [];
if (userData.name) lines.push(`- Name: ${userData.name}`);
if (userData.favoriteColor) lines.push(`- Favorite Color: ${userData.favoriteColor}`);
if (userData.agreeToTerms !== undefined) lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);
if (userData.favoriteColor)
lines.push(`- Favorite Color: ${userData.favoriteColor}`);
if (userData.agreeToTerms !== undefined)
lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);

content.push({
type: "text",
Expand All @@ -229,7 +253,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
// Include progress and raw result for debugging
content.push({
type: "text",
text: `\nProgress:\n${statusMessages.join("\n")}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
text: `\nProgress:\n${statusMessages.join(
"\n"
)}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
});

return { content };
Expand Down
5 changes: 4 additions & 1 deletion src/everything/tools/trigger-elicitation-request.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { ElicitResultSchema, CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import {
ElicitResultSchema,
CallToolResult,
} from "@modelcontextprotocol/sdk/types.js";

// Tool configuration
const name = "trigger-elicitation-request";
Expand Down
47 changes: 33 additions & 14 deletions src/everything/tools/trigger-sampling-request-async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {

// Client must support sampling AND tasks.requests.sampling
const clientSupportsSampling = clientCapabilities.sampling !== undefined;
const clientTasksCapability = clientCapabilities.tasks as {
requests?: { sampling?: { createMessage?: object } };
} | undefined;
const clientTasksCapability = clientCapabilities.tasks as
| {
requests?: { sampling?: { createMessage?: object } };
}
| undefined;
const clientSupportsAsyncSampling =
clientTasksCapability?.requests?.sampling?.createMessage !== undefined;

Expand All @@ -64,7 +66,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {

// Create the sampling request WITH task metadata
// The params.task field signals to the client that this should be executed as a task
const request: CreateMessageRequest & { params: { task?: { ttl: number } } } = {
const request: CreateMessageRequest & {
params: { task?: { ttl: number } };
} = {
method: "sampling/createMessage",
params: {
task: {
Expand Down Expand Up @@ -112,14 +116,19 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
);

// Check if client returned CreateTaskResult (has task object)
const isTaskResult = 'task' in samplingResponse && samplingResponse.task;
const isTaskResult =
"task" in samplingResponse && samplingResponse.task;
if (!isTaskResult) {
// Client executed synchronously - return the direct response
return {
content: [
{
type: "text",
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(samplingResponse, null, 2)}`,
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(
samplingResponse,
null,
2
)}`,
},
],
};
Expand Down Expand Up @@ -150,16 +159,20 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
method: "tasks/get",
params: { taskId },
},
z.object({
status: z.string(),
statusMessage: z.string().optional(),
}).passthrough()
z
.object({
status: z.string(),
statusMessage: z.string().optional(),
})
.passthrough()
);

taskStatus = pollResult.status;
taskStatusMessage = pollResult.statusMessage;
statusMessages.push(
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
`Poll ${attempts}: ${taskStatus}${
taskStatusMessage ? ` - ${taskStatusMessage}` : ""
}`
);
}

Expand All @@ -169,7 +182,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join(
"\n"
)}`,
},
],
};
Expand All @@ -181,7 +196,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[${taskStatus.toUpperCase()}] ${
taskStatusMessage || "No message"
}\n\nProgress:\n${statusMessages.join("\n")}`,
},
],
};
Expand All @@ -201,7 +218,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join("\n")}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join(
"\n"
)}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
},
],
};
Expand Down
8 changes: 6 additions & 2 deletions src/everything/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { StreamableHTTPServerTransport, EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import {
StreamableHTTPServerTransport,
EventStore,
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express, { Request, Response } from "express";
import { createServer } from "../server/index.js";
import { randomUUID } from "node:crypto";
import cors from "cors";

// Simple in-memory event store for SSE resumability
class InMemoryEventStore implements EventStore {
private events: Map<string, { streamId: string; message: unknown }> = new Map();
private events: Map<string, { streamId: string; message: unknown }> =
new Map();

async storeEvent(streamId: string, message: unknown): Promise<string> {
const eventId = randomUUID();
Expand Down