Skip to content

Commit 525440c

Browse files
committed
Fix proxy transport semantics
1 parent ae20d9e commit 525440c

File tree

5 files changed

+292
-32
lines changed

5 files changed

+292
-32
lines changed

docs/proxying.md

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# Proxying MCP Transports
2+
3+
The `mcp_proxy()` helper bridges two MCP transports and forwards messages in both directions.
4+
5+
It is useful when you want to put a transport boundary between an MCP client and an upstream MCP server without
6+
rewriting the forwarding loop yourself.
7+
8+
## What It Does
9+
10+
`mcp_proxy()` takes two transport pairs:
11+
12+
- a transport facing the downstream client
13+
- a transport facing the upstream server
14+
15+
While the context manager is active, it:
16+
17+
- forwards `SessionMessage` objects from client to server
18+
- forwards `SessionMessage` objects from server to client
19+
- sends transport exceptions to an optional `on_error` callback
20+
- closes the paired write side when the corresponding read side stops
21+
22+
## What It Does Not Do
23+
24+
`mcp_proxy()` is a transport relay, not a full proxy server.
25+
26+
It does not add:
27+
28+
- authentication
29+
- authorization
30+
- request or response rewriting
31+
- routing across multiple upstream servers
32+
- retries or buffering policies
33+
- metrics or tracing by default
34+
35+
If you need those behaviors, build them around the helper.
36+
37+
## Weather Service Example
38+
39+
This example proxies a small weather service. The upstream service is defined with `MCPServer` and exposed over
40+
streamable HTTP. The proxy bridges a downstream transport to that upstream transport.
41+
42+
- `get_weather(city)` for a structured weather snapshot
43+
- `get_weather_alerts(region)` for active alerts
44+
45+
The client talks only to the downstream side of the proxy.
46+
47+
```python
48+
import anyio
49+
import uvicorn
50+
51+
from mcp.client.session import ClientSession
52+
from mcp.client.streamable_http import streamable_http_client
53+
from mcp.proxy import mcp_proxy
54+
from mcp.server.mcpserver import MCPServer
55+
from mcp.shared.memory import create_client_server_memory_streams
56+
57+
58+
app = MCPServer("Weather Service")
59+
60+
61+
@app.tool()
62+
def get_weather(city: str) -> dict[str, str | float]:
63+
return {
64+
"city": city,
65+
"temperature_c": 22.5,
66+
"condition": "partly cloudy",
67+
"wind_speed_kmh": 12.3,
68+
}
69+
70+
71+
@app.tool()
72+
def get_weather_alerts(region: str) -> dict[str, object]:
73+
return {
74+
"region": region,
75+
"alerts": [{"severity": "medium", "title": "Heat advisory"}],
76+
}
77+
78+
79+
async def main() -> None:
80+
starlette_app = app.streamable_http_app(streamable_http_path="/mcp")
81+
config = uvicorn.Config(starlette_app, host="127.0.0.1", port=8765, log_level="warning")
82+
upstream_server = uvicorn.Server(config)
83+
84+
async with (
85+
create_client_server_memory_streams() as (client_streams, proxy_client_streams),
86+
streamable_http_client("http://127.0.0.1:8765/mcp") as proxy_server_streams,
87+
anyio.create_task_group() as tg,
88+
):
89+
tg.start_soon(upstream_server.serve)
90+
91+
async with mcp_proxy(
92+
proxy_client_streams,
93+
proxy_server_streams,
94+
):
95+
async with ClientSession(client_streams[0], client_streams[1]) as session:
96+
await session.initialize()
97+
weather = await session.call_tool("get_weather", {"city": "London"})
98+
alerts = await session.call_tool("get_weather_alerts", {"region": "California"})
99+
100+
print(weather.content[0].text)
101+
print(alerts.content[0].text)
102+
103+
upstream_server.should_exit = True
104+
tg.cancel_scope.cancel()
105+
106+
107+
anyio.run(main)
108+
```
109+
110+
## Error Handling
111+
112+
Use `on_error` to observe transport-level exceptions:
113+
114+
```python
115+
async with mcp_proxy(
116+
downstream_transport,
117+
upstream_transport,
118+
on_error=handle_transport_error,
119+
):
120+
...
121+
```
122+
123+
`on_error` is keyword-only. It may be either:
124+
125+
- an async callable
126+
- a sync callable, which will run in a worker thread
127+
128+
Exceptions raised by `on_error` are swallowed. Transport exceptions still terminate the proxy instead of being silently
129+
consumed.
130+
131+
## When To Use It
132+
133+
`mcp_proxy()` is a good fit when you are:
134+
135+
- exposing an upstream MCP server through a different transport boundary
136+
- inserting middleware-like behavior between two MCP transports
137+
- building a local relay for testing or development
138+
- experimenting with transport adapters
139+
140+
If all you need is to test a server directly, prefer [`Client`](testing.md), which already provides an in-memory
141+
transport for that use case.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ nav:
1717
- Documentation:
1818
- Concepts: concepts.md
1919
- Low-Level Server: low-level-server.md
20+
- Proxying Transports: proxying.md
2021
- Authorization: authorization.md
2122
- Testing: testing.md
2223
- Experimental:

src/mcp/proxy.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
from __future__ import annotations
44

5+
import contextvars
56
from collections.abc import AsyncGenerator, Awaitable, Callable
67
from contextlib import asynccontextmanager
78
from functools import partial
8-
from typing import cast
9+
from typing import Any, Protocol, cast
910

1011
import anyio
1112
from anyio import to_thread
@@ -18,6 +19,10 @@
1819
ErrorHandler = Callable[[Exception], None | Awaitable[None]]
1920

2021

22+
class ContextualWriteStream(Protocol):
23+
async def send_with_context(self, context: contextvars.Context, item: SessionMessage | Exception) -> None: ...
24+
25+
2126
@asynccontextmanager
2227
async def mcp_proxy(
2328
transport_to_client: MessageStream,
@@ -49,16 +54,38 @@ async def _forward_messages(
4954
async for item in read_stream:
5055
if isinstance(item, Exception):
5156
await _run_error_handler(item, on_error)
52-
continue
57+
raise item
5358

5459
try:
55-
await write_stream.send(item)
60+
await _forward_message(item, write_stream, read_stream)
5661
except anyio.ClosedResourceError:
5762
break
5863
except anyio.ClosedResourceError:
5964
return
6065

6166

67+
async def _forward_message(
68+
item: SessionMessage,
69+
write_stream: WriteStream[SessionMessage],
70+
read_stream: ReadStream[SessionMessage | Exception],
71+
) -> None:
72+
sender_context: contextvars.Context | None = getattr(read_stream, "last_context", None)
73+
context_write_stream = cast(ContextualWriteStream | None, _get_contextual_write_stream(write_stream))
74+
75+
if sender_context is not None and context_write_stream is not None:
76+
await context_write_stream.send_with_context(sender_context, item)
77+
return
78+
79+
await write_stream.send(item)
80+
81+
82+
def _get_contextual_write_stream(write_stream: WriteStream[SessionMessage]) -> Any:
83+
send_with_context = getattr(write_stream, "send_with_context", None)
84+
if callable(send_with_context):
85+
return write_stream
86+
return None
87+
88+
6289
async def _run_error_handler(error: Exception, on_error: ErrorHandler | None) -> None:
6390
if on_error is None:
6491
return

src/mcp/shared/_context_streams.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ def __init__(self, inner: MemoryObjectSendStream[_Envelope[T]]) -> None:
3636
async def send(self, item: T) -> None:
3737
await self._inner.send((contextvars.copy_context(), item))
3838

39+
async def send_with_context(self, context: contextvars.Context, item: T) -> None:
40+
await self._inner.send((context, item))
41+
3942
def close(self) -> None:
4043
self._inner.close()
4144

0 commit comments

Comments
 (0)