-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Python: Add observability for MCP tool according to latest OTel 1.40.0 conventions #4698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4ba1554
3afedd1
25e34a0
2fcbd9c
d3191ff
1306e5b
5a2bd27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |||||
| import logging | ||||||
| import re | ||||||
| import sys | ||||||
| import typing | ||||||
| from abc import abstractmethod | ||||||
| from collections.abc import Callable, Collection, Sequence | ||||||
| from contextlib import AsyncExitStack, _AsyncGeneratorContextManager # type: ignore | ||||||
|
|
@@ -25,14 +26,15 @@ | |||||
| from mcp.shared.context import RequestContext | ||||||
| from mcp.shared.exceptions import McpError | ||||||
| from mcp.shared.session import RequestResponder | ||||||
| from opentelemetry import propagate | ||||||
| from opentelemetry import propagate, trace | ||||||
|
|
||||||
| from ._tools import FunctionTool | ||||||
| from ._types import ( | ||||||
| Content, | ||||||
| Message, | ||||||
| ) | ||||||
| from .exceptions import ToolException, ToolExecutionException | ||||||
| from .observability import OtelAttr, get_mcp_call_span | ||||||
|
|
||||||
| if sys.version_info >= (3, 11): | ||||||
| from typing import Self # pragma: no cover | ||||||
|
|
@@ -61,6 +63,11 @@ class MCPSpecificApproval(TypedDict, total=False): | |||||
| _MCP_REMOTE_NAME_KEY = "_mcp_remote_name" | ||||||
| _MCP_NORMALIZED_NAME_KEY = "_mcp_normalized_name" | ||||||
|
|
||||||
| # Derive the JSON-RPC protocol version used by the MCP library from its type annotations. | ||||||
| # mcp.types.JSONRPCRequest defines `jsonrpc: Literal["2.0"]`; extracting it here ensures | ||||||
| # we always emit the version the library actually uses rather than a hardcoded magic string. | ||||||
| _JSONRPC_PROTOCOL_VERSION: str = typing.get_args(types.JSONRPCRequest.model_fields["jsonrpc"].annotation)[0] | ||||||
|
|
||||||
| # region: Helpers | ||||||
|
|
||||||
| LOG_LEVEL_MAPPING: dict[types.LoggingLevel, int] = { | ||||||
|
|
@@ -487,6 +494,7 @@ def __init__( | |||||
| self.is_connected: bool = False | ||||||
| self._tools_loaded: bool = False | ||||||
| self._prompts_loaded: bool = False | ||||||
| self._mcp_protocol_version: str | int | None = None | ||||||
|
|
||||||
| def __str__(self) -> str: | ||||||
| return f"MCPTool(name={self.name}, description={self.description})" | ||||||
|
|
@@ -590,7 +598,8 @@ async def connect(self, *, reset: bool = False) -> None: | |||||
| inner_exception=ex, | ||||||
| ) from ex | ||||||
| try: | ||||||
| await session.initialize() | ||||||
| init_result = await session.initialize() | ||||||
| self._mcp_protocol_version = init_result.protocolVersion | ||||||
| except Exception as ex: | ||||||
| await self._safe_close_exit_stack() | ||||||
| # Provide context about initialization failure | ||||||
|
|
@@ -605,7 +614,8 @@ async def connect(self, *, reset: bool = False) -> None: | |||||
| self.session = session | ||||||
| elif self.session._request_id == 0: # type: ignore[reportPrivateUsage] | ||||||
| # If the session is not initialized, we need to reinitialize it | ||||||
| await self.session.initialize() | ||||||
| init_result = await self.session.initialize() | ||||||
| self._mcp_protocol_version = init_result.protocolVersion | ||||||
| logger.debug("Connected to MCP server: %s", self.session) | ||||||
| self.is_connected = True | ||||||
| if self.load_tools_flag: | ||||||
|
|
@@ -927,50 +937,88 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]: | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| # Inject OpenTelemetry trace context into MCP _meta for distributed tracing. | ||||||
| otel_meta = _inject_otel_into_mcp_meta() | ||||||
|
|
||||||
| parser = self.parse_tool_results or _parse_tool_result_from_mcp | ||||||
|
|
||||||
| # Try the operation, reconnecting once if the connection is closed | ||||||
| for attempt in range(2): | ||||||
| try: | ||||||
| result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=otel_meta) # type: ignore | ||||||
| if result.isError: | ||||||
| parsed = parser(result) | ||||||
| text = ( | ||||||
| "\n".join(c.text for c in parsed if c.type == "text" and c.text) | ||||||
| if isinstance(parsed, list) | ||||||
| else str(parsed) | ||||||
| ) | ||||||
| raise ToolExecutionException(text or str(parsed)) | ||||||
| return parser(result) | ||||||
| except ToolExecutionException: | ||||||
| raise | ||||||
| except ClosedResourceError as cl_ex: | ||||||
| if attempt == 0: | ||||||
| # First attempt failed, try reconnecting | ||||||
| logger.info("MCP connection closed unexpectedly. Reconnecting...") | ||||||
| try: | ||||||
| await self.connect(reset=True) | ||||||
| continue # Retry the operation | ||||||
| except Exception as reconn_ex: | ||||||
| span_attributes: dict[str, Any] = { | ||||||
| OtelAttr.MCP_METHOD_NAME: "tools/call", | ||||||
| OtelAttr.TOOL_NAME: tool_name, | ||||||
| OtelAttr.OPERATION: OtelAttr.TOOL_EXECUTION_OPERATION, | ||||||
| OtelAttr.JSONRPC_PROTOCOL_VERSION: _JSONRPC_PROTOCOL_VERSION, | ||||||
| } | ||||||
|
Member
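There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Truthiness check will silently skip emitting `OtelAttr.MCP_PROTOCOL_VERSION` when `self._mcp_protocol_version` is a falsy but valid value (for example `0` or `""`, both allowed by its `str | int | None` annotation); compare against `None` explicitly instead.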
Suggested change
|
||||||
| if self._mcp_protocol_version: | ||||||
| span_attributes[OtelAttr.MCP_PROTOCOL_VERSION] = self._mcp_protocol_version | ||||||
|
|
||||||
| with get_mcp_call_span(span_attributes) as span: | ||||||
| # Try the operation, reconnecting once if the connection is closed | ||||||
| span_error_set = False | ||||||
| for attempt in range(2): | ||||||
| try: | ||||||
| # Capture the JSON-RPC request ID before the call is made. | ||||||
| # The MCP SDK stores the next request ID in the private `_request_id` | ||||||
| # attribute; no public API is available. We use getattr with a default | ||||||
| # so this degrades gracefully if the attribute is renamed in a future | ||||||
| # version of the library. | ||||||
| request_id = getattr(self.session, "_request_id", None) # type: ignore[union-attr] | ||||||
| if request_id is not None: | ||||||
| span.set_attribute(OtelAttr.JSONRPC_REQUEST_ID, str(request_id)) | ||||||
|
|
||||||
| # Inject OpenTelemetry trace context into MCP _meta for distributed tracing. | ||||||
| otel_meta = _inject_otel_into_mcp_meta() | ||||||
|
|
||||||
| result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=otel_meta) # type: ignore | ||||||
| if result.isError: | ||||||
| parsed = parser(result) | ||||||
| text = ( | ||||||
| "\n".join(c.text for c in parsed if c.type == "text" and c.text) | ||||||
| if isinstance(parsed, list) | ||||||
| else str(parsed) | ||||||
| ) | ||||||
| error_msg = text or str(parsed) | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, "ToolError") | ||||||
| span.set_status(trace.StatusCode.ERROR, error_msg) | ||||||
| span_error_set = True | ||||||
| raise ToolExecutionException(error_msg) | ||||||
| return parser(result) | ||||||
| except ToolExecutionException as ex: | ||||||
| if not span_error_set: | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, type(ex).__name__) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(ex)) | ||||||
| raise | ||||||
| except ClosedResourceError as cl_ex: | ||||||
| if attempt == 0: | ||||||
| # First attempt failed, try reconnecting | ||||||
| logger.info("MCP connection closed unexpectedly. Reconnecting...") | ||||||
| try: | ||||||
| await self.connect(reset=True) | ||||||
| continue # Retry the operation | ||||||
| except Exception as reconn_ex: | ||||||
| error_type = type(reconn_ex).__name__ | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, error_type) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(reconn_ex)) | ||||||
| raise ToolExecutionException( | ||||||
| "Failed to reconnect to MCP server.", | ||||||
| inner_exception=reconn_ex, | ||||||
| ) from reconn_ex | ||||||
| else: | ||||||
| # Second attempt also failed, give up | ||||||
| logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}") | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, type(cl_ex).__name__) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(cl_ex)) | ||||||
| raise ToolExecutionException( | ||||||
| "Failed to reconnect to MCP server.", | ||||||
| inner_exception=reconn_ex, | ||||||
| ) from reconn_ex | ||||||
| else: | ||||||
| # Second attempt also failed, give up | ||||||
| logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}") | ||||||
| raise ToolExecutionException( | ||||||
| f"Failed to call tool '{tool_name}' - connection lost.", | ||||||
| inner_exception=cl_ex, | ||||||
| ) from cl_ex | ||||||
| except McpError as mcp_exc: | ||||||
| raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc | ||||||
| except Exception as ex: | ||||||
| raise ToolExecutionException(f"Failed to call tool '{tool_name}'.", inner_exception=ex) from ex | ||||||
| raise ToolExecutionException(f"Failed to call tool '{tool_name}' after retries.") | ||||||
| f"Failed to call tool '{tool_name}' - connection lost.", | ||||||
| inner_exception=cl_ex, | ||||||
| ) from cl_ex | ||||||
| except McpError as mcp_exc: | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, str(mcp_exc.error.code)) | ||||||
| span.set_status(trace.StatusCode.ERROR, mcp_exc.error.message) | ||||||
| raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc | ||||||
| except Exception as ex: | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, type(ex).__name__) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(ex)) | ||||||
| raise ToolExecutionException(f"Failed to call tool '{tool_name}'.", inner_exception=ex) from ex | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, "RetryExhausted") | ||||||
| span.set_status(trace.StatusCode.ERROR, f"Failed to call tool '{tool_name}' after retries.") | ||||||
| raise ToolExecutionException(f"Failed to call tool '{tool_name}' after retries.") | ||||||
|
|
||||||
| async def get_prompt(self, prompt_name: str, **kwargs: Any) -> str: | ||||||
| """Call a prompt with the given arguments. | ||||||
|
|
@@ -995,35 +1043,60 @@ async def get_prompt(self, prompt_name: str, **kwargs: Any) -> str: | |||||
|
|
||||||
| parser = self.parse_prompt_results or _parse_prompt_result_from_mcp | ||||||
|
|
||||||
| # Try the operation, reconnecting once if the connection is closed | ||||||
| for attempt in range(2): | ||||||
| try: | ||||||
| prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore | ||||||
| return parser(prompt_result) | ||||||
| except ClosedResourceError as cl_ex: | ||||||
| if attempt == 0: | ||||||
| # First attempt failed, try reconnecting | ||||||
| logger.info("MCP connection closed unexpectedly. Reconnecting...") | ||||||
| try: | ||||||
| await self.connect(reset=True) | ||||||
| continue # Retry the operation | ||||||
| except Exception as reconn_ex: | ||||||
| span_attributes: dict[str, Any] = { | ||||||
| OtelAttr.MCP_METHOD_NAME: "prompts/get", | ||||||
| OtelAttr.PROMPT_NAME: prompt_name, | ||||||
| OtelAttr.JSONRPC_PROTOCOL_VERSION: _JSONRPC_PROTOCOL_VERSION, | ||||||
| } | ||||||
| if self._mcp_protocol_version: | ||||||
| span_attributes[OtelAttr.MCP_PROTOCOL_VERSION] = self._mcp_protocol_version | ||||||
|
Member
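There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same truthiness issue as in `call_tool`: use an explicit `is not None` check so a falsy but valid protocol version is still emitted.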
Suggested change
|
||||||
|
|
||||||
| with get_mcp_call_span(span_attributes) as span: | ||||||
| # Try the operation, reconnecting once if the connection is closed | ||||||
| for attempt in range(2): | ||||||
| try: | ||||||
| # Capture the JSON-RPC request ID before the call is made. | ||||||
| # See call_tool for rationale on using getattr with a default. | ||||||
| request_id = getattr(self.session, "_request_id", None) # type: ignore[union-attr] | ||||||
| if request_id is not None: | ||||||
| span.set_attribute(OtelAttr.JSONRPC_REQUEST_ID, str(request_id)) | ||||||
|
|
||||||
| prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore | ||||||
| return parser(prompt_result) | ||||||
| except ClosedResourceError as cl_ex: | ||||||
| if attempt == 0: | ||||||
| # First attempt failed, try reconnecting | ||||||
| logger.info("MCP connection closed unexpectedly. Reconnecting...") | ||||||
| try: | ||||||
| await self.connect(reset=True) | ||||||
| continue # Retry the operation | ||||||
| except Exception as reconn_ex: | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, type(reconn_ex).__name__) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(reconn_ex)) | ||||||
| raise ToolExecutionException( | ||||||
| "Failed to reconnect to MCP server.", | ||||||
| inner_exception=reconn_ex, | ||||||
| ) from reconn_ex | ||||||
| else: | ||||||
| # Second attempt also failed, give up | ||||||
| logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}") | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, type(cl_ex).__name__) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(cl_ex)) | ||||||
| raise ToolExecutionException( | ||||||
| "Failed to reconnect to MCP server.", | ||||||
| inner_exception=reconn_ex, | ||||||
| ) from reconn_ex | ||||||
| else: | ||||||
| # Second attempt also failed, give up | ||||||
| logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}") | ||||||
| raise ToolExecutionException( | ||||||
| f"Failed to call prompt '{prompt_name}' - connection lost.", | ||||||
| inner_exception=cl_ex, | ||||||
| ) from cl_ex | ||||||
| except McpError as mcp_exc: | ||||||
| raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc | ||||||
| except Exception as ex: | ||||||
| raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex | ||||||
| raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.") | ||||||
| f"Failed to call prompt '{prompt_name}' - connection lost.", | ||||||
| inner_exception=cl_ex, | ||||||
| ) from cl_ex | ||||||
| except McpError as mcp_exc: | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, str(mcp_exc.error.code)) | ||||||
| span.set_status(trace.StatusCode.ERROR, mcp_exc.error.message) | ||||||
| raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc | ||||||
| except Exception as ex: | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, type(ex).__name__) | ||||||
| span.set_status(trace.StatusCode.ERROR, str(ex)) | ||||||
| raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex | ||||||
| span.set_attribute(OtelAttr.ERROR_TYPE, "RetryExhausted") | ||||||
| span.set_status(trace.StatusCode.ERROR, f"Failed to get prompt '{prompt_name}' after retries.") | ||||||
| raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.") | ||||||
|
|
||||||
| async def __aenter__(self) -> Self: | ||||||
| """Enter the async context manager. | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`_JSONRPC_PROTOCOL_VERSION` is derived via reflection on internal Pydantic `model_fields` metadata and `typing.get_args(...)[0]`. If the MCP library changes `JSONRPCRequest`'s `jsonrpc` field annotation away from `Literal["2.0"]` (or renames the field), this raises `IndexError` or `KeyError` at import time. JSON-RPC 2.0 is a stable protocol constant—prefer hardcoding `"2.0"` directly, or at minimum add a try/except fallback.