Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9232cca
feat: add experimental task-based session manager
challenger71498 Dec 19, 2025
fbbbb2f
fix: check exception only if done
challenger71498 Dec 19, 2025
b59cff4
test: add test for experimental MCP session manager
challenger71498 Dec 19, 2025
a3a6ddb
test: add test for session lifecycle
challenger71498 Dec 19, 2025
b814686
chore: tidy up error msg
challenger71498 Dec 26, 2025
fd022fb
chore: rm redundant comments
challenger71498 Dec 26, 2025
addda67
chore: rename SessionLifecycle to SessionContext
challenger71498 Dec 26, 2025
ada136a
feat: use SessionContext
challenger71498 Dec 26, 2025
4b79453
test: rm transports
challenger71498 Dec 26, 2025
379c7c3
chore: rm experimental
challenger71498 Dec 26, 2025
af1f471
chore: match error msg
challenger71498 Dec 26, 2025
e7bed5a
test: assert that initialized has been called
challenger71498 Dec 26, 2025
6e40bdb
test: validate Session on test_session_context.py
challenger71498 Dec 26, 2025
eace2b7
style: run autoformat.sh
challenger71498 Dec 26, 2025
1e45086
test: add case for task-safety
challenger71498 Dec 26, 2025
ba9b9d8
chore: update desc
challenger71498 Dec 26, 2025
db03a2e
test: execute create_task each
challenger71498 Dec 26, 2025
8b09154
chore: add missing s
challenger71498 Dec 26, 2025
0081acb
feat: add timeout on session close
challenger71498 Dec 26, 2025
9f23396
feat: set MCP client timeout to SSE read timeout
challenger71498 Dec 26, 2025
b0f69d7
style: run autoformat.sh
challenger71498 Dec 26, 2025
ecdda62
chore: rm unused import
challenger71498 Dec 26, 2025
64672cd
refactor: rm local var
challenger71498 Dec 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions src/google/adk/tools/mcp_tool/mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from mcp.client.streamable_http import streamablehttp_client
from pydantic import BaseModel

from .session_context import SessionContext

logger = logging.getLogger('google_adk.' + __name__)


Expand Down Expand Up @@ -334,29 +336,27 @@ async def create_session(
if hasattr(self._connection_params, 'timeout')
else None
)
sse_read_timeout_in_seconds = (
self._connection_params.sse_read_timeout
if hasattr(self._connection_params, 'sse_read_timeout')
else None
)

try:
client = self._create_client(merged_headers)

transports = await asyncio.wait_for(
exit_stack.enter_async_context(client),
is_stdio = isinstance(self._connection_params, StdioConnectionParams)

session = await asyncio.wait_for(
exit_stack.enter_async_context(
SessionContext(
client=client,
timeout=timeout_in_seconds,
sse_read_timeout=sse_read_timeout_in_seconds,
is_stdio=is_stdio,
)
),
timeout=timeout_in_seconds,
)
# The streamable http client returns a GetSessionCallback in addition to the
# read/write MemoryObjectStreams needed to build the ClientSession, we limit
# then to the two first values to be compatible with all clients.
if isinstance(self._connection_params, StdioConnectionParams):
session = await exit_stack.enter_async_context(
ClientSession(
*transports[:2],
read_timeout_seconds=timedelta(seconds=timeout_in_seconds),
)
)
else:
session = await exit_stack.enter_async_context(
ClientSession(*transports[:2])
)
await asyncio.wait_for(session.initialize(), timeout=timeout_in_seconds)

# Store session and exit stack in the pool
self._sessions[session_key] = (session, exit_stack)
Expand Down
168 changes: 168 additions & 0 deletions src/google/adk/tools/mcp_tool/session_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
from contextlib import AsyncExitStack
from datetime import timedelta
import logging
from typing import AsyncContextManager
from typing import Optional

from mcp import ClientSession

logger = logging.getLogger('google_adk.' + __name__)


class SessionContext:
"""Represents the context of a single MCP session within a dedicated task.

AnyIO's TaskGroup/CancelScope requires that the start and end of a scope
occur within the same task. Since MCP clients use AnyIO internally, we need
to ensure that the client's entire lifecycle (creation, usage, and cleanup)
happens within a single dedicated task.

This class spawns a background task that:
1. Enters the MCP client's async context and initializes the session
2. Signals readiness via an asyncio.Event
3. Waits for a close signal
4. Cleans up the client within the same task

This ensures CancelScope constraints are satisfied regardless of which
task calls start() or close().

Can be used in two ways:
1. Direct method calls: start() and close()
2. As an async context manager: async with lifecycle as session: ...
"""

def __init__(
self,
client: AsyncContextManager,
timeout: Optional[float],
sse_read_timeout: Optional[float],
is_stdio: bool = False,
):
"""
Args:
client: An MCP client context manager (e.g., from streamablehttp_client,
sse_client, or stdio_client).
timeout: Timeout in seconds for connection and initialization.
sse_read_timeout: Timeout in seconds for reading data from the MCP SSE
server.
is_stdio: Whether this is a stdio connection (affects read timeout).
"""
self._client = client
self._timeout = timeout
self._sse_read_timeout = sse_read_timeout
self._is_stdio = is_stdio
self._session: Optional[ClientSession] = None
self._ready_event = asyncio.Event()
self._close_event = asyncio.Event()
self._task: Optional[asyncio.Task] = None

@property
def session(self) -> Optional[ClientSession]:
"""Get the managed ClientSession, if available."""
return self._session

async def start(self) -> ClientSession:
"""Start the runner and wait for the session to be ready.

Returns:
The initialized ClientSession.

Raises:
ConnectionError: If session creation fails.
"""
self._task = asyncio.create_task(self._run())
await self._ready_event.wait()

if self._task.cancelled():
raise ConnectionError('Failed to create MCP session: task cancelled')

if self._task.done() and self._task.exception():
raise ConnectionError(
f'Failed to create MCP session: {self._task.exception()}'
) from self._task.exception()

return self._session

async def close(self):
"""Signal the context task to close and wait for cleanup."""
self._close_event.set()
if self._task:
try:
await asyncio.wait_for(self._task, timeout=self._timeout)
except asyncio.TimeoutError:
logger.warning('Failed to close MCP session: task timed out')
self._task.cancel()
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning(f'Failed to close MCP session: {e}')

async def __aenter__(self) -> ClientSession:
return await self.start()

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

async def _run(self):
"""Run the complete session context within a single task."""
try:
async with AsyncExitStack() as exit_stack:
transports = await asyncio.wait_for(
exit_stack.enter_async_context(self._client),
timeout=self._timeout,
)
# The streamable http client returns a GetSessionCallback in addition
# to the read/write MemoryObjectStreams needed to build the
# ClientSession. We limit to the first two values to be compatible
# with all clients.
if self._is_stdio:
session = await exit_stack.enter_async_context(
ClientSession(
*transports[:2],
read_timeout_seconds=timedelta(seconds=self._timeout)
if self._timeout is not None
else None,
)
)
else:
# For SSE and Streamable HTTP clients, use the sse_read_timeout
# instead of the connection timeout as the read_timeout for the session.
session = await exit_stack.enter_async_context(
ClientSession(
*transports[:2],
read_timeout_seconds=timedelta(seconds=self._sse_read_timeout)
if self._sse_read_timeout is not None
else None,
)
)
await asyncio.wait_for(session.initialize(), timeout=self._timeout)
logger.debug('Session has been successfully initialized')

self._session = session
self._ready_event.set()

# Wait for close signal - the session remains valid while we wait
await self._close_event.wait()
except BaseException as e:
logger.warning(f'Error on session runner task: {e}')
raise
finally:
self._ready_event.set()
self._close_event.set()
Loading