Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions python/semantic_kernel/connectors/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from functools import partial
from typing import TYPE_CHECKING, Any

import httpx
from mcp import types
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
Expand Down Expand Up @@ -726,6 +727,27 @@ def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
args.update(self._client_kwargs)
return streamablehttp_client(**args)

async def __aenter__(self) -> Self:
"""Fail fast on authentication/authorization errors before connecting."""
timeout = self.timeout if self.timeout is not None else 30
try:
async with httpx.AsyncClient(timeout=timeout, headers=self.headers) as client:
response = await client.get(self.url)
if response.status_code in (httpx.codes.UNAUTHORIZED, httpx.codes.FORBIDDEN):
raise KernelPluginInvalidConfigurationError(
f"Failed to connect to the MCP server: received HTTP {response.status_code} (unauthorized/forbidden)."
)
# Raise for other HTTP errors to surface configuration/network issues early.
response.raise_for_status()
except KernelPluginInvalidConfigurationError:
raise
except Exception as ex: # pragma: no cover - guarded for unexpected failures
raise KernelPluginInvalidConfigurationError(
"Failed to connect to the MCP server. Please check your configuration."
) from ex

return await super().__aenter__()


class MCPWebsocketPlugin(MCPPluginBase):
"""MCP websocket server configuration."""
Expand Down
57 changes: 57 additions & 0 deletions python/tests/unit/connectors/mcp/test_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,63 @@ async def test_with_kwargs_streamablehttp(mock_session, mock_client, list_tool_c
assert len(loaded_plugin.functions["func2"].parameters) == 0


class _DummyHttpxClient:
def __init__(self, status_code: int):
self.status_code = status_code

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
return False

async def get(self, url: str): # pragma: no cover - trivial
import httpx

return httpx.Response(self.status_code, request=httpx.Request("GET", url))


@pytest.mark.parametrize("status_code", [401, 403])
@patch("semantic_kernel.connectors.mcp.ClientSession")
@patch("semantic_kernel.connectors.mcp.streamablehttp_client")
async def test_streamable_http_raises_on_401_403(mock_streamable_client, mock_session, monkeypatch, status_code):
# Avoid real network: preflight should catch unauthorized/forbidden before streamable client is used.
monkeypatch.setattr("semantic_kernel.connectors.mcp.httpx.AsyncClient", lambda **_: _DummyHttpxClient(status_code))

with pytest.raises(KernelPluginInvalidConfigurationError):
async with MCPStreamableHttpPlugin(
name="TestMCPPlugin",
description="Test MCP Plugin",
url="http://localhost:8080/mcp",
):
pass


@patch("semantic_kernel.connectors.mcp.ClientSession")
@patch("semantic_kernel.connectors.mcp.streamablehttp_client")
async def test_streamable_http_allows_success(mock_streamable_client, mock_session, monkeypatch):
# Simulate 200 OK preflight and a no-op streamable client context manager.
monkeypatch.setattr("semantic_kernel.connectors.mcp.httpx.AsyncClient", lambda **_: _DummyHttpxClient(200))

mock_read = MagicMock()
mock_write = MagicMock()
mock_callback = MagicMock()

mock_generator = MagicMock()
mock_generator.__aenter__.return_value = (mock_read, mock_write, mock_callback)
mock_generator.__aexit__.return_value = (mock_read, mock_write, mock_callback)
mock_streamable_client.return_value = mock_generator
mock_session.return_value.__aenter__.return_value.list_tools.return_value = []
mock_session.return_value.initialize = AsyncMock()

async with MCPStreamableHttpPlugin(
name="TestMCPPlugin",
description="Test MCP Plugin",
url="http://localhost:8080/mcp",
) as plugin:
assert plugin is not None


async def test_kernel_as_mcp_server(kernel: "Kernel", decorated_native_function, custom_plugin_class):
kernel.add_plugin(custom_plugin_class, "test")
kernel.add_functions("test", [decorated_native_function])
Expand Down