Skip to content

Commit bbdd99b

Browse files
committed
Add basic OpenTelemetry tracing for client and server requests
Add opentelemetry-api as an optional dependency (mcp[otel]) and create spans for client request/response cycles and server request handling.

Span names follow the pattern:
- Client: "MCP {method} {target}" (e.g. "MCP tools/call my_tool")
- Server: "MCP handle {method} {target}"

When opentelemetry-api is not installed, tracing is a complete no-op with zero overhead.
1 parent e6235d1 commit bbdd99b

File tree

6 files changed

+211
-84
lines changed

6 files changed

+211
-84
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ dependencies = [
4646
rich = ["rich>=13.9.4"]
4747
cli = ["typer>=0.16.0", "python-dotenv>=1.0.0"]
4848
ws = ["websockets>=15.0.1"]
49+
otel = ["opentelemetry-api>=1.28.0"]
4950

5051
[project.scripts]
5152
mcp = "mcp.cli:app [cli]"
@@ -57,7 +58,7 @@ required-version = ">=0.9.5"
5758
[dependency-groups]
5859
dev = [
5960
# We add mcp[cli,ws] so `uv sync` considers the extras.
60-
"mcp[cli,ws]",
61+
"mcp[cli,ws,otel]",
6162
"pyright>=1.1.400",
6263
"pytest>=8.3.4",
6364
"ruff>=0.8.5",

src/mcp/server/lowlevel/server.py

Lines changed: 75 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ async def main():
6565
from mcp.server.streamable_http import EventStore
6666
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
6767
from mcp.server.transport_security import TransportSecuritySettings
68+
from mcp.shared._otel import otel_span
6869
from mcp.shared._stream_protocols import ReadStream, WriteStream
6970
from mcp.shared.exceptions import MCPError
7071
from mcp.shared.message import ServerMessageMetadata, SessionMessage
@@ -446,72 +447,82 @@ async def _handle_request(
446447
):
447448
logger.info("Processing request of type %s", type(req).__name__)
448449

449-
if handler := self._request_handlers.get(req.method):
450-
logger.debug("Dispatching request of type %s", type(req).__name__)
450+
target = getattr(req.params, "name", None) if req.params else None
451+
span_name = f"MCP handle {req.method} {target}" if target else f"MCP handle {req.method}"
451452

452-
try:
453-
# Extract request context and close_sse_stream from message metadata
454-
request_data = None
455-
close_sse_stream_cb = None
456-
close_standalone_sse_stream_cb = None
457-
if message.message_metadata is not None and isinstance(message.message_metadata, ServerMessageMetadata):
458-
request_data = message.message_metadata.request_context
459-
close_sse_stream_cb = message.message_metadata.close_sse_stream
460-
close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream
453+
with otel_span(
454+
span_name,
455+
kind="SERVER",
456+
attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id},
457+
):
458+
if handler := self._request_handlers.get(req.method):
459+
logger.debug("Dispatching request of type %s", type(req).__name__)
461460

462-
client_capabilities = session.client_params.capabilities if session.client_params else None
463-
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
464-
# Get task metadata from request params if present
465-
task_metadata = None
466-
if hasattr(req, "params") and req.params is not None:
467-
task_metadata = getattr(req.params, "task", None)
468-
ctx = ServerRequestContext(
469-
request_id=message.request_id,
470-
meta=message.request_meta,
471-
session=session,
472-
lifespan_context=lifespan_context,
473-
experimental=Experimental(
474-
task_metadata=task_metadata,
475-
_client_capabilities=client_capabilities,
476-
_session=session,
477-
_task_support=task_support,
478-
),
479-
request=request_data,
480-
close_sse_stream=close_sse_stream_cb,
481-
close_standalone_sse_stream=close_standalone_sse_stream_cb,
482-
)
483-
response = await handler(ctx, req.params)
484-
except MCPError as err:
485-
response = err.error
486-
except anyio.get_cancelled_exc_class():
487-
if message.cancelled:
488-
# Client sent CancelledNotification; responder.cancel() already
489-
# sent an error response, so skip the duplicate.
490-
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
491-
return
492-
# Transport-close cancellation from the TG in run(); re-raise so the
493-
# TG swallows its own cancellation.
494-
raise
495-
except Exception as err:
496-
if raise_exceptions: # pragma: no cover
497-
raise err
498-
response = types.ErrorData(code=0, message=str(err))
499-
else: # pragma: no cover
500-
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")
501-
502-
try:
503-
await message.respond(response)
504-
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
505-
# Transport closed between handler unblocking and respond. Happens
506-
# when _receive_loop's finally wakes a handler blocked on
507-
# send_request: the handler runs to respond() before run()'s TG
508-
# cancel fires, but after the write stream closed. Closed if our
509-
# end closed (_receive_loop's async-with exit); Broken if the peer
510-
# end closed first (streamable_http terminate()).
511-
logger.debug("Response for %s dropped - transport closed", message.request_id)
512-
return
513-
514-
logger.debug("Response sent")
461+
try:
462+
# Extract request context and close_sse_stream from message metadata
463+
request_data = None
464+
close_sse_stream_cb = None
465+
close_standalone_sse_stream_cb = None
466+
if message.message_metadata is not None and isinstance(
467+
message.message_metadata, ServerMessageMetadata
468+
):
469+
request_data = message.message_metadata.request_context
470+
close_sse_stream_cb = message.message_metadata.close_sse_stream
471+
close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream
472+
473+
client_capabilities = session.client_params.capabilities if session.client_params else None
474+
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
475+
# Get task metadata from request params if present
476+
task_metadata = None
477+
if hasattr(req, "params") and req.params is not None:
478+
task_metadata = getattr(req.params, "task", None)
479+
ctx = ServerRequestContext(
480+
request_id=message.request_id,
481+
meta=message.request_meta,
482+
session=session,
483+
lifespan_context=lifespan_context,
484+
experimental=Experimental(
485+
task_metadata=task_metadata,
486+
_client_capabilities=client_capabilities,
487+
_session=session,
488+
_task_support=task_support,
489+
),
490+
request=request_data,
491+
close_sse_stream=close_sse_stream_cb,
492+
close_standalone_sse_stream=close_standalone_sse_stream_cb,
493+
)
494+
response = await handler(ctx, req.params)
495+
except MCPError as err:
496+
response = err.error
497+
except anyio.get_cancelled_exc_class():
498+
if message.cancelled:
499+
# Client sent CancelledNotification; responder.cancel() already
500+
# sent an error response, so skip the duplicate.
501+
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
502+
return
503+
# Transport-close cancellation from the TG in run(); re-raise so the
504+
# TG swallows its own cancellation.
505+
raise
506+
except Exception as err:
507+
if raise_exceptions: # pragma: no cover
508+
raise err
509+
response = types.ErrorData(code=0, message=str(err))
510+
else: # pragma: no cover
511+
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")
512+
513+
try:
514+
await message.respond(response)
515+
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
516+
# Transport closed between handler unblocking and respond. Happens
517+
# when _receive_loop's finally wakes a handler blocked on
518+
# send_request: the handler runs to respond() before run()'s TG
519+
# cancel fires, but after the write stream closed. Closed if our
520+
# end closed (_receive_loop's async-with exit); Broken if the peer
521+
# end closed first (streamable_http terminate()).
522+
logger.debug("Response for %s dropped - transport closed", message.request_id)
523+
return
524+
525+
logger.debug("Response sent")
515526

516527
async def _handle_notification(
517528
self,

src/mcp/shared/_otel.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""OpenTelemetry helpers for MCP.
2+
3+
Provides a context manager that creates an OpenTelemetry span when
4+
``opentelemetry-api`` is installed, or acts as a no-op otherwise.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import functools
10+
from collections.abc import Iterator
11+
from contextlib import contextmanager
12+
from typing import Any
13+
14+
15+
@functools.lru_cache(maxsize=1)
16+
def _get_tracer() -> Any:
17+
"""Return the OTel tracer for ``mcp``, or ``None``."""
18+
try:
19+
from opentelemetry.trace import get_tracer
20+
21+
return get_tracer("mcp-python-sdk")
22+
except ImportError:
23+
return None
24+
25+
26+
@contextmanager
27+
def otel_span(
28+
name: str,
29+
*,
30+
kind: str = "INTERNAL",
31+
attributes: dict[str, Any] | None = None,
32+
) -> Iterator[Any]:
33+
"""Create an OTel span if ``opentelemetry-api`` is installed, else no-op."""
34+
tracer = _get_tracer()
35+
if tracer is None:
36+
yield None
37+
return
38+
39+
from opentelemetry.trace import SpanKind
40+
41+
span_kind = getattr(SpanKind, kind, SpanKind.INTERNAL)
42+
with tracer.start_as_current_span(name, kind=span_kind, attributes=attributes) as span:
43+
yield span

src/mcp/shared/session.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from pydantic import BaseModel, TypeAdapter
1313
from typing_extensions import Self
1414

15+
from mcp.shared._otel import otel_span
1516
from mcp.shared._stream_protocols import ReadStream, WriteStream
1617
from mcp.shared.exceptions import MCPError
1718
from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage
@@ -269,23 +270,32 @@ async def send_request(
269270

270271
try:
271272
jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data)
272-
await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))
273273

274-
# request read timeout takes precedence over session read timeout
275-
timeout = request_read_timeout_seconds or self._session_read_timeout_seconds
276-
277-
try:
278-
with anyio.fail_after(timeout):
279-
response_or_error = await response_stream_reader.receive()
280-
except TimeoutError:
281-
class_name = request.__class__.__name__
282-
message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds."
283-
raise MCPError(code=REQUEST_TIMEOUT, message=message)
284-
285-
if isinstance(response_or_error, JSONRPCError):
286-
raise MCPError.from_jsonrpc_error(response_or_error)
287-
else:
288-
return result_type.model_validate(response_or_error.result, by_name=False)
274+
target = request_data.get("params", {}).get("name")
275+
span_name = f"MCP {request.method} {target}" if target else f"MCP {request.method}"
276+
277+
with otel_span(
278+
span_name,
279+
kind="CLIENT",
280+
attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id},
281+
):
282+
await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))
283+
284+
# request read timeout takes precedence over session read timeout
285+
timeout = request_read_timeout_seconds or self._session_read_timeout_seconds
286+
287+
try:
288+
with anyio.fail_after(timeout):
289+
response_or_error = await response_stream_reader.receive()
290+
except TimeoutError:
291+
class_name = request.__class__.__name__
292+
message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds."
293+
raise MCPError(code=REQUEST_TIMEOUT, message=message)
294+
295+
if isinstance(response_or_error, JSONRPCError):
296+
raise MCPError.from_jsonrpc_error(response_or_error)
297+
else:
298+
return result_type.model_validate(response_or_error.result, by_name=False)
289299

290300
finally:
291301
self._response_streams.pop(request_id, None)

tests/shared/test_otel.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from __future__ import annotations
2+
3+
from unittest.mock import patch
4+
5+
import pytest
6+
7+
from mcp.shared._otel import _get_tracer, otel_span
8+
9+
pytestmark = pytest.mark.anyio
10+
11+
12+
def test_otel_span_creates_span():
    """A real span is produced when opentelemetry-api is importable."""
    _get_tracer.cache_clear()
    attrs = {"key": "value"}
    with otel_span("test.span", kind="CLIENT", attributes=attrs) as active:
        assert active is not None
16+
17+
18+
def test_otel_span_noop_when_unavailable():
    """With the otel modules masked out, otel_span degrades to a no-op."""
    _get_tracer.cache_clear()
    masked = {"opentelemetry": None, "opentelemetry.trace": None}
    with patch.dict("sys.modules", masked):
        _get_tracer.cache_clear()
        with otel_span("test.span") as active:
            assert active is None
    # Drop the cached ``None`` so later tests see the real tracer again.
    _get_tracer.cache_clear()

uv.lock

Lines changed: 41 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)