|
1 | 1 | """Simple MCP server demonstrating pagination for tools, resources, and prompts. |
2 | 2 |
|
3 | | -This example shows how to use the paginated decorators to handle large lists |
| 3 | +This example shows how to use the on_* handler pattern to handle large lists |
4 | 4 | of items that need to be split across multiple pages. |
5 | 5 | """ |
6 | 6 |
|
|
9 | 9 | import anyio |
10 | 10 | import click |
11 | 11 | from mcp import types |
| 12 | +from mcp.server.context import ServerRequestContext |
12 | 13 | from mcp.server.lowlevel import Server |
13 | 14 | from starlette.requests import Request |
14 | 15 |
|
|
44 | 45 | ] |
45 | 46 |
|
46 | 47 |
|
| 48 | +async def handle_list_tools( |
| 49 | + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None |
| 50 | +) -> types.ListToolsResult: |
| 51 | + """Paginated list_tools - returns 5 tools per page.""" |
| 52 | + page_size = 5 |
| 53 | + |
| 54 | + cursor = params.cursor if params is not None else None |
| 55 | + if cursor is None: |
| 56 | + start_idx = 0 |
| 57 | + else: |
| 58 | + try: |
| 59 | + start_idx = int(cursor) |
| 60 | + except (ValueError, TypeError): |
| 61 | + return types.ListToolsResult(tools=[], next_cursor=None) |
| 62 | + |
| 63 | + page_tools = SAMPLE_TOOLS[start_idx : start_idx + page_size] |
| 64 | + |
| 65 | + next_cursor = None |
| 66 | + if start_idx + page_size < len(SAMPLE_TOOLS): |
| 67 | + next_cursor = str(start_idx + page_size) |
| 68 | + |
| 69 | + return types.ListToolsResult(tools=page_tools, next_cursor=next_cursor) |
| 70 | + |
| 71 | + |
| 72 | +async def handle_list_resources( |
| 73 | + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None |
| 74 | +) -> types.ListResourcesResult: |
| 75 | + """Paginated list_resources - returns 10 resources per page.""" |
| 76 | + page_size = 10 |
| 77 | + |
| 78 | + cursor = params.cursor if params is not None else None |
| 79 | + if cursor is None: |
| 80 | + start_idx = 0 |
| 81 | + else: |
| 82 | + try: |
| 83 | + start_idx = int(cursor) |
| 84 | + except (ValueError, TypeError): |
| 85 | + return types.ListResourcesResult(resources=[], next_cursor=None) |
| 86 | + |
| 87 | + page_resources = SAMPLE_RESOURCES[start_idx : start_idx + page_size] |
| 88 | + |
| 89 | + next_cursor = None |
| 90 | + if start_idx + page_size < len(SAMPLE_RESOURCES): |
| 91 | + next_cursor = str(start_idx + page_size) |
| 92 | + |
| 93 | + return types.ListResourcesResult(resources=page_resources, next_cursor=next_cursor) |
| 94 | + |
| 95 | + |
| 96 | +async def handle_list_prompts( |
| 97 | + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None |
| 98 | +) -> types.ListPromptsResult: |
| 99 | + """Paginated list_prompts - returns 7 prompts per page.""" |
| 100 | + page_size = 7 |
| 101 | + |
| 102 | + cursor = params.cursor if params is not None else None |
| 103 | + if cursor is None: |
| 104 | + start_idx = 0 |
| 105 | + else: |
| 106 | + try: |
| 107 | + start_idx = int(cursor) |
| 108 | + except (ValueError, TypeError): |
| 109 | + return types.ListPromptsResult(prompts=[], next_cursor=None) |
| 110 | + |
| 111 | + page_prompts = SAMPLE_PROMPTS[start_idx : start_idx + page_size] |
| 112 | + |
| 113 | + next_cursor = None |
| 114 | + if start_idx + page_size < len(SAMPLE_PROMPTS): |
| 115 | + next_cursor = str(start_idx + page_size) |
| 116 | + |
| 117 | + return types.ListPromptsResult(prompts=page_prompts, next_cursor=next_cursor) |
| 118 | + |
| 119 | + |
| 120 | +async def handle_call_tool(ctx: ServerRequestContext[Any], params: types.CallToolRequestParams) -> types.CallToolResult: |
| 121 | + """Handle tool calls.""" |
| 122 | + tool = next((t for t in SAMPLE_TOOLS if t.name == params.name), None) |
| 123 | + if not tool: |
| 124 | + raise ValueError(f"Unknown tool: {params.name}") |
| 125 | + |
| 126 | + return types.CallToolResult( |
| 127 | + content=[ |
| 128 | + types.TextContent( |
| 129 | + type="text", |
| 130 | + text=f"Called tool '{params.name}' with arguments: {params.arguments}", |
| 131 | + ) |
| 132 | + ] |
| 133 | + ) |
| 134 | + |
| 135 | + |
| 136 | +async def handle_read_resource( |
| 137 | + ctx: ServerRequestContext[Any], params: types.ReadResourceRequestParams |
| 138 | +) -> types.ReadResourceResult: |
| 139 | + """Handle read_resource requests.""" |
| 140 | + resource = next((r for r in SAMPLE_RESOURCES if r.uri == params.uri), None) |
| 141 | + if not resource: |
| 142 | + raise ValueError(f"Unknown resource: {params.uri}") |
| 143 | + |
| 144 | + return types.ReadResourceResult( |
| 145 | + contents=[ |
| 146 | + types.TextResourceContents( |
| 147 | + uri=params.uri, |
| 148 | + text=f"Content of {resource.name}: This is sample content for the resource.", |
| 149 | + mime_type="text/plain", |
| 150 | + ) |
| 151 | + ] |
| 152 | + ) |
| 153 | + |
| 154 | + |
| 155 | +async def handle_get_prompt( |
| 156 | + ctx: ServerRequestContext[Any], params: types.GetPromptRequestParams |
| 157 | +) -> types.GetPromptResult: |
| 158 | + """Handle get_prompt requests.""" |
| 159 | + prompt = next((p for p in SAMPLE_PROMPTS if p.name == params.name), None) |
| 160 | + if not prompt: |
| 161 | + raise ValueError(f"Unknown prompt: {params.name}") |
| 162 | + |
| 163 | + message_text = f"This is the prompt '{params.name}'" |
| 164 | + if params.arguments: |
| 165 | + message_text += f" with arguments: {params.arguments}" |
| 166 | + |
| 167 | + return types.GetPromptResult( |
| 168 | + description=prompt.description, |
| 169 | + messages=[ |
| 170 | + types.PromptMessage( |
| 171 | + role="user", |
| 172 | + content=types.TextContent(type="text", text=message_text), |
| 173 | + ) |
| 174 | + ], |
| 175 | + ) |
| 176 | + |
| 177 | + |
47 | 178 | @click.command() |
48 | 179 | @click.option("--port", default=8000, help="Port to listen on for SSE") |
49 | 180 | @click.option( |
|
53 | 184 | help="Transport type", |
54 | 185 | ) |
55 | 186 | def main(port: int, transport: str) -> int: |
56 | | - app = Server("mcp-simple-pagination") |
57 | | - |
58 | | - # Paginated list_tools - returns 5 tools per page |
59 | | - @app.list_tools() |
60 | | - async def list_tools_paginated(request: types.ListToolsRequest) -> types.ListToolsResult: |
61 | | - page_size = 5 |
62 | | - |
63 | | - cursor = request.params.cursor if request.params is not None else None |
64 | | - if cursor is None: |
65 | | - # First page |
66 | | - start_idx = 0 |
67 | | - else: |
68 | | - # Parse cursor to get the start index |
69 | | - try: |
70 | | - start_idx = int(cursor) |
71 | | - except (ValueError, TypeError): |
72 | | - # Invalid cursor, return empty |
73 | | - return types.ListToolsResult(tools=[], next_cursor=None) |
74 | | - |
75 | | - # Get the page of tools |
76 | | - page_tools = SAMPLE_TOOLS[start_idx : start_idx + page_size] |
77 | | - |
78 | | - # Determine if there are more pages |
79 | | - next_cursor = None |
80 | | - if start_idx + page_size < len(SAMPLE_TOOLS): |
81 | | - next_cursor = str(start_idx + page_size) |
82 | | - |
83 | | - return types.ListToolsResult(tools=page_tools, next_cursor=next_cursor) |
84 | | - |
85 | | - # Paginated list_resources - returns 10 resources per page |
86 | | - @app.list_resources() |
87 | | - async def list_resources_paginated( |
88 | | - request: types.ListResourcesRequest, |
89 | | - ) -> types.ListResourcesResult: |
90 | | - page_size = 10 |
91 | | - |
92 | | - cursor = request.params.cursor if request.params is not None else None |
93 | | - if cursor is None: |
94 | | - # First page |
95 | | - start_idx = 0 |
96 | | - else: |
97 | | - # Parse cursor to get the start index |
98 | | - try: |
99 | | - start_idx = int(cursor) |
100 | | - except (ValueError, TypeError): |
101 | | - # Invalid cursor, return empty |
102 | | - return types.ListResourcesResult(resources=[], next_cursor=None) |
103 | | - |
104 | | - # Get the page of resources |
105 | | - page_resources = SAMPLE_RESOURCES[start_idx : start_idx + page_size] |
106 | | - |
107 | | - # Determine if there are more pages |
108 | | - next_cursor = None |
109 | | - if start_idx + page_size < len(SAMPLE_RESOURCES): |
110 | | - next_cursor = str(start_idx + page_size) |
111 | | - |
112 | | - return types.ListResourcesResult(resources=page_resources, next_cursor=next_cursor) |
113 | | - |
114 | | - # Paginated list_prompts - returns 7 prompts per page |
115 | | - @app.list_prompts() |
116 | | - async def list_prompts_paginated( |
117 | | - request: types.ListPromptsRequest, |
118 | | - ) -> types.ListPromptsResult: |
119 | | - page_size = 7 |
120 | | - |
121 | | - cursor = request.params.cursor if request.params is not None else None |
122 | | - if cursor is None: |
123 | | - # First page |
124 | | - start_idx = 0 |
125 | | - else: |
126 | | - # Parse cursor to get the start index |
127 | | - try: |
128 | | - start_idx = int(cursor) |
129 | | - except (ValueError, TypeError): |
130 | | - # Invalid cursor, return empty |
131 | | - return types.ListPromptsResult(prompts=[], next_cursor=None) |
132 | | - |
133 | | - # Get the page of prompts |
134 | | - page_prompts = SAMPLE_PROMPTS[start_idx : start_idx + page_size] |
135 | | - |
136 | | - # Determine if there are more pages |
137 | | - next_cursor = None |
138 | | - if start_idx + page_size < len(SAMPLE_PROMPTS): |
139 | | - next_cursor = str(start_idx + page_size) |
140 | | - |
141 | | - return types.ListPromptsResult(prompts=page_prompts, next_cursor=next_cursor) |
142 | | - |
143 | | - # Implement call_tool handler |
144 | | - @app.call_tool() |
145 | | - async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.ContentBlock]: |
146 | | - # Find the tool in our sample data |
147 | | - tool = next((t for t in SAMPLE_TOOLS if t.name == name), None) |
148 | | - if not tool: |
149 | | - raise ValueError(f"Unknown tool: {name}") |
150 | | - |
151 | | - # Simple mock response |
152 | | - return [ |
153 | | - types.TextContent( |
154 | | - type="text", |
155 | | - text=f"Called tool '{name}' with arguments: {arguments}", |
156 | | - ) |
157 | | - ] |
158 | | - |
159 | | - # Implement read_resource handler |
160 | | - @app.read_resource() |
161 | | - async def read_resource(uri: str) -> str: |
162 | | - # Find the resource in our sample data |
163 | | - resource = next((r for r in SAMPLE_RESOURCES if r.uri == uri), None) |
164 | | - if not resource: |
165 | | - raise ValueError(f"Unknown resource: {uri}") |
166 | | - |
167 | | - # Return a simple string - the decorator will convert it to TextResourceContents |
168 | | - return f"Content of {resource.name}: This is sample content for the resource." |
169 | | - |
170 | | - # Implement get_prompt handler |
171 | | - @app.get_prompt() |
172 | | - async def get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult: |
173 | | - # Find the prompt in our sample data |
174 | | - prompt = next((p for p in SAMPLE_PROMPTS if p.name == name), None) |
175 | | - if not prompt: |
176 | | - raise ValueError(f"Unknown prompt: {name}") |
177 | | - |
178 | | - # Simple mock response |
179 | | - message_text = f"This is the prompt '{name}'" |
180 | | - if arguments: |
181 | | - message_text += f" with arguments: {arguments}" |
182 | | - |
183 | | - return types.GetPromptResult( |
184 | | - description=prompt.description, |
185 | | - messages=[ |
186 | | - types.PromptMessage( |
187 | | - role="user", |
188 | | - content=types.TextContent(type="text", text=message_text), |
189 | | - ) |
190 | | - ], |
191 | | - ) |
| 187 | + app = Server( |
| 188 | + "mcp-simple-pagination", |
| 189 | + on_list_tools=handle_list_tools, |
| 190 | + on_call_tool=handle_call_tool, |
| 191 | + on_list_resources=handle_list_resources, |
| 192 | + on_read_resource=handle_read_resource, |
| 193 | + on_list_prompts=handle_list_prompts, |
| 194 | + on_get_prompt=handle_get_prompt, |
| 195 | + ) |
192 | 196 |
|
193 | 197 | if transport == "sse": |
194 | 198 | from mcp.server.sse import SseServerTransport |
|
0 commit comments