# Notes on using MCP


## MCP

See https://github.com/modelcontextprotocol/python-sdk


Overall approach:

```
import json

from litellm import experimental_mcp_client
from litellm import acompletion
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

sparms = StdioServerParameters(command="npx", args=["@wonderwhy-er/desktop-commander@latest"])

async with stdio_client(sparms) as (read, write):
    async with ClientSession(read, write) as session:
        await session.initialize()
        mcp_tools = await experimental_mcp_client.load_mcp_tools(session=session, format="openai")
        # messages = ...
        # response = await acompletion(model=..., messages=messages, tools=mcp_tools, tool_choice="auto")
        # message = response.choices[0].message
        # if message.tool_calls:
        #     for tool_call in message.tool_calls:
        #         function_name = tool_call.function.name
        #         function_args = json.loads(tool_call.function.arguments)
        #         tool_result = await experimental_mcp_client.call_openai_tool(session=session, tool_call=tool_call)
        #         messages.append(dict(tool_call_id=tool_call.id, role="tool", content=str(tool_result.content)))
        # final_response = await acompletion(model=..., messages=messages, tools=mcp_tools)
```
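
The simplest way to drive this from synchronous code is to wrap everything in one async function and call `asyncio.run` on it, which is exactly what the handler class below is designed to avoid. A minimal sketch of that baseline; the model name and prompt are placeholders, and `model_dump()` assumes litellm's `Message` is a pydantic model:

```
import asyncio

from litellm import experimental_mcp_client, acompletion
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def run_once(user_prompt: str) -> str:
    sparms = StdioServerParameters(command="npx", args=["@wonderwhy-er/desktop-commander@latest"])
    async with stdio_client(sparms) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            mcp_tools = await experimental_mcp_client.load_mcp_tools(session=session, format="openai")
            messages = [dict(role="user", content=user_prompt)]
            # "gpt-4o" is only a placeholder model name
            response = await acompletion(model="gpt-4o", messages=messages, tools=mcp_tools, tool_choice="auto")
            message = response.choices[0].message
            if message.tool_calls:
                # The assistant message carrying the tool_calls must precede the tool results;
                # model_dump() (assumed available) turns it into a plain dict
                messages.append(message.model_dump())
                for tool_call in message.tool_calls:
                    tool_result = await experimental_mcp_client.call_openai_tool(session=session, tool_call=tool_call)
                    messages.append(dict(tool_call_id=tool_call.id, role="tool", content=str(tool_result.content)))
                response = await acompletion(model="gpt-4o", messages=messages, tools=mcp_tools)
            return response.choices[0].message.content


if __name__ == "__main__":
    print(asyncio.run(run_once("List the files in the current directory")))
```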


To avoid moving all of the surrounding code into async functions, refactor the MCP async processing into a class that runs its own event loop in a background thread:

```
# mcp_handler.py
import asyncio
import threading
import concurrent.futures  # For Future.result() timeout

from typing import Any, Dict, List, Optional, Set

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client  # the Python SDK has no mcp.client.http; SSE is used for the "http" type below
from litellm import experimental_mcp_client

class McpToolHandler:
    def __init__(self, server_configs: Dict[str, Dict[str, Any]]):
        self._server_configs = server_configs
        self._mcp_sessions: Dict[str, ClientSession] = {}
        self._mcp_tool_name_to_session_id: Dict[str, str] = {}
        self._session_contexts: List[Any] = []  # Context managers (transports and sessions) kept for cleanup
        self._all_mcp_tools: List[Dict[str, Any]] = []  # Loaded tool schemas for the LLM

        # Threading for the asyncio event loop
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_thread: Optional[threading.Thread] = None
        self._is_initialized = False

    def _start_loop_thread(self):
        """Starts the asyncio event loop in a new thread."""
        self._loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(target=self._run_loop, args=(self._loop,), daemon=True)
        self._loop_thread.start()
        print("MCP asyncio event loop started in a background thread.")

    def _run_loop(self, loop: asyncio.AbstractEventLoop):
        """Target function for the asyncio thread."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def _run_async_method_sync(self, coro):
        """Helper to run an async coroutine from a synchronous context."""
        # Note: initialize() itself goes through this helper, so check the loop, not _is_initialized
        if self._loop is None or not self._loop.is_running():
            raise RuntimeError("McpToolHandler's event loop is not running.")

        # Submit the coroutine to the event loop in the other thread
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)

        # Block the current thread until the coroutine in the other thread completes.
        # Add a timeout to prevent indefinite hangs.
        try:
            return future.result(timeout=60)  # Generous timeout for potentially long-running MCP tools
        except concurrent.futures.TimeoutError:
            print(f"Error: MCP coroutine timed out after 60 seconds: {coro}")
            future.cancel()  # Attempt to cancel the task in the other thread
            raise
        except Exception as e:
            print(f"Error running async method synchronously: {e}")
            raise

    def initialize(self) -> List[Dict[str, Any]]:
        """
        Synchronously initializes all MCP client sessions and loads their tools.
        Returns a list of all loaded MCP tool schemas in OpenAI format.
        """
        if self._is_initialized:
            print("McpToolHandler already initialized.")
            return self._all_mcp_tools

        print("Initializing MCP Tool Handler (synchronously)...")
        self._start_loop_thread()  # Start the background async loop

        # The actual initialization logic still has to be an async coroutine,
        # but it is run to completion via _run_async_method_sync.
        async def _async_init_logic():
            for server_id, config in self._server_configs.items():
                if config["type"] == "stdio":
                    server_params = StdioServerParameters(command=config["command"], args=config["args"])
                    context_manager = stdio_client(server_params)
                elif config["type"] == "http":
                    # SSE transport; swap in the streamable-HTTP client if the server requires it
                    context_manager = sse_client(config["url"])
                else:
                    print(f"Warning: Unknown MCP server type for {server_id}: {config['type']}")
                    continue

                read, write = await context_manager.__aenter__()
                self._session_contexts.append(context_manager)

                # ClientSession is itself an async context manager; it must be entered
                # so its receive loop is running before initialize() is awaited.
                session = ClientSession(read, write)
                await session.__aenter__()
                self._session_contexts.append(session)
                self._mcp_sessions[server_id] = session

                await session.initialize()
                print(f"Initialized MCP session for {server_id}")

                server_tools = await experimental_mcp_client.load_mcp_tools(
                    session=session,
                    format="openai"
                )
                print(f"Loaded {len(server_tools)} tools from {server_id}")

                for tool in server_tools:
                    tool_name = tool['function']['name']
                    if tool_name in self._mcp_tool_name_to_session_id:
                        raise ValueError(f"Duplicate tool name found across MCP servers: '{tool_name}'. Please ensure all MCP tool names are unique.")
                    self._mcp_tool_name_to_session_id[tool_name] = server_id
                    self._all_mcp_tools.append(tool)

            print(f"MCP Tool Handler initialized with {len(self._all_mcp_tools)} tools.")
            return self._all_mcp_tools

        # Run the async initialization logic in the background loop, waiting for its completion
        self._all_mcp_tools = self._run_async_method_sync(_async_init_logic())
        self._is_initialized = True
        return self._all_mcp_tools

    def execute_mcp_tool(self, tool_call: Any) -> str:
        """
        Synchronously executes an MCP tool call by submitting it to the
        background asyncio loop.
        """
        function_name = tool_call.function.name
        session_id = self._mcp_tool_name_to_session_id.get(function_name)

        if not session_id:
            return f"Error: MCP tool '{function_name}' not found or not associated with any session."

        target_session = self._mcp_sessions[session_id]

        async def _async_execute_tool():
            print(f" -> Executing MCP tool '{function_name}' via session '{session_id}' (async background)")
            try:
                tool_result = await experimental_mcp_client.call_openai_tool(
                    session=target_session,
                    tool_call=tool_call
                )
                return str(tool_result.content)
            except Exception as e:
                return f"Error executing MCP tool '{function_name}' from '{session_id}': {e}"

        return self._run_async_method_sync(_async_execute_tool())

    def get_mcp_tool_names(self) -> Set[str]:
        """Returns a set of all MCP tool names managed by this handler."""
        return set(self._mcp_tool_name_to_session_id.keys())

    def close(self):
        """Synchronously closes all active MCP client sessions and stops the background loop."""
        print("Closing MCP Tool Handler (synchronously)...")
        if not self._is_initialized:
            print("McpToolHandler not initialized, nothing to close.")
            return

        # Submit the async cleanup logic to the background loop
        async def _async_cleanup_logic():
            # Exit in reverse order: sessions before the transports they sit on.
            # Caveat: the anyio-based transports may complain when exited from a
            # different task than the one that entered them; if that bites,
            # restructure the lifecycle into a single long-lived background task.
            for context_manager in reversed(self._session_contexts):
                await context_manager.__aexit__(None, None, None)
            self._mcp_sessions.clear()
            self._mcp_tool_name_to_session_id.clear()
            self._session_contexts.clear()
            print("All MCP sessions closed in background loop.")

        # Run cleanup, blocking until it's done
        self._run_async_method_sync(_async_cleanup_logic())

        # Stop the background event loop
        if self._loop and self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._loop_thread.join(timeout=5)  # Wait for the thread to finish
            if self._loop_thread.is_alive():
                print("Warning: Background asyncio thread did not stop gracefully.")

        self._is_initialized = False
        print("MCP Tool Handler closed.")
```

and then use it like this:

```
MCP_SERVER_CONFIGS = {
    "Server1": dict(type="stdio", command="npx", args=["@wonderwhy-er/desktop-commander@latest"]),
    "Server2": dict(type="http", url="http://localhost:8801"),
}
mcp_handler = McpToolHandler(MCP_SERVER_CONFIGS)
mcp_tools_for_llm = mcp_handler.initialize()
mcp_tool_names = mcp_handler.get_mcp_tool_names()
# TODO: make sure the local tools do not have names identical to any of the MCP tools (see the sketch below)
all_tools = mcp_tools_for_llm + python_tools

# ... create messages, invoke the LLM
# ... then, when going through any tool calls:
if tool_call.function.name in mcp_tool_names:
    tool_result = mcp_handler.execute_mcp_tool(tool_call)
else:
    tool_result = ...  # run the local tool
# extend messages etc.
```
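
A minimal sketch of the name-collision check from the TODO above, plus the shutdown the snippet leaves out; it assumes `python_tools` is the usual list of OpenAI-format tool schemas for the local functions:

```
local_tool_names = {tool["function"]["name"] for tool in python_tools}
clashes = local_tool_names & mcp_tool_names
if clashes:
    raise ValueError(f"Local tools shadow MCP tools: {sorted(clashes)}")

try:
    all_tools = mcp_tools_for_llm + python_tools
    # ... run the chat / tool-call loop from the snippet above ...
finally:
    # Stop the MCP sessions and the background event loop when done
    mcp_handler.close()
```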



## Package mcp_use

See https://github.com/mcp-use/mcp-use
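
A rough sketch of how mcp_use is typically driven, following the pattern in the project's README; the class names, the `max_steps` parameter, and the LangChain chat-model wrapper are assumptions to verify against the installed version:

```
import asyncio

from langchain_openai import ChatOpenAI  # mcp_use drives tools through a LangChain chat model
from mcp_use import MCPAgent, MCPClient  # names as in the project README; verify against your version

config = dict(mcpServers=dict(dc=dict(command="npx", args=["@wonderwhy-er/desktop-commander@latest"])))


async def run_agent():
    client = MCPClient.from_dict(config)
    agent = MCPAgent(llm=ChatOpenAI(model="gpt-4o"), client=client, max_steps=20)
    # The agent decides which MCP tools to call while answering the prompt
    return await agent.run("List the files in the current directory")

# print(asyncio.run(run_agent()))
```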


## FastMCP

See https://github.com/jlowin/fastmcp

E.g. using the desktop commander:

```
from fastmcp import Client

config = dict(mcpServers=dict(dc=dict(command="npx", args=["@wonderwhy-er/desktop-commander@latest"])))


async def test():
    # Connect via stdio to the configured desktop-commander server
    async with Client(config) as client:
        tools = await client.list_tools()
        result = await client.call_tool("get_config")
        return tools, result
```

The variable `tools` contains a list of Tool instances with attributes such as name, title, and description.
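
For example, to run the `test` coroutine above from synchronous code and dump what the server exposes (plain attribute access on the Tool objects just described):

```
import asyncio

tools, result = asyncio.run(test())
for tool in tools:
    # name and description as listed above; the tool's argument JSON schema is also available
    print(tool.name, "-", tool.description)
```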