Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions src/praisonai-agents/praisonaiagents/mcp/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MCP:
```
"""

def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, **kwargs):
def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, transport="auto", **kwargs):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring the constructor to reduce complexity.

The constructor has become quite complex with multiple responsibilities. Consider extracting transport detection and client initialization into separate methods:

def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, transport="auto", **kwargs):
    # Handle backward compatibility and basic setup
    if command_or_string is None and command is not None:
        command_or_string = command
    
    self.timeout = timeout
    self.debug = debug
    self._setup_logging(debug)
    
    # Initialize based on input type
    if self._is_http_url(command_or_string):
        self._initialize_http_client(command_or_string, transport, debug, timeout)
    else:
        self._initialize_stdio_client(command_or_string, args, timeout, **kwargs)

def _is_http_url(self, command_or_string):
    return isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string)

def _initialize_http_client(self, url, transport, debug, timeout):
    # HTTP client initialization logic
    pass

def _initialize_stdio_client(self, command_or_string, args, timeout, **kwargs):
    # Stdio client initialization logic  
    pass
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 143-143: Too many arguments (7/5)

(R0913)


[refactor] 143-143: Too many local variables (16/15)

(R0914)


[refactor] 143-143: Too many branches (22/12)

(R0912)


[refactor] 143-143: Too many statements (83/50)

(R0915)

🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/mcp/mcp.py at line 143, the constructor
is too complex with multiple responsibilities. Refactor by extracting transport
detection and client initialization into separate methods: create a method
_is_http_url to check if the input is an HTTP URL, then split the initialization
logic into _initialize_http_client and _initialize_stdio_client methods. Update
the constructor to handle backward compatibility, set basic attributes, and call
these new methods accordingly to simplify and clarify the constructor's flow.

"""
Initialize the MCP connection and get tools.

Expand All @@ -150,10 +150,13 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6
- A complete command string (e.g., "/path/to/python /path/to/app.py")
- For NPX: 'npx' command with args for smithery tools
- An SSE URL (e.g., "http://localhost:8080/sse")
- An HTTP URL (e.g., "http://localhost:8080/stream")
args: Arguments to pass to the command (when command_or_string is the command)
command: Alternative parameter name for backward compatibility
timeout: Timeout in seconds for MCP server initialization and tool calls (default: 60)
debug: Enable debug logging for MCP operations (default: False)
transport: Transport type - "auto", "sse", "http-streaming", or "stdio"
"auto" will detect based on URL format (default: "auto")
**kwargs: Additional parameters for StdioServerParameters
"""
# Handle backward compatibility with named parameter 'command'
Expand Down Expand Up @@ -187,15 +190,35 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6
self.timeout = timeout
self.debug = debug

# Check if this is an SSE URL
# Check if this is an HTTP URL
if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string):
# Import the SSE client implementation
from .mcp_sse import SSEMCPClient
self.sse_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.sse_client.tools)
self.is_sse = True
self.is_npx = False
return
# Determine transport type
if transport == "auto":
# Default to SSE for /sse endpoints, HTTP-streaming otherwise
if command_or_string.endswith('/sse'):
transport = "sse"
else:
transport = "http-streaming"

if transport == "sse":
# Import the SSE client implementation
from .mcp_sse import SSEMCPClient
self.http_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.http_client.tools)
self.is_http = True
self.is_sse = True # Keep for backward compatibility
self.is_npx = False
return
if transport == "http-streaming":
# Import the HTTP-Streaming client implementation
from .mcp_http_streaming import HTTPStreamingMCPClient
self.http_client = HTTPStreamingMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.http_client.tools)
self.is_http = True
self.is_sse = False
self.is_npx = False
return
raise ValueError(f"Unknown transport type: {transport}")

# Handle the single string format for stdio client
if isinstance(command_or_string, str) and args is None:
Expand Down Expand Up @@ -273,8 +296,8 @@ def _generate_tool_functions(self) -> List[Callable]:
Returns:
List[Callable]: Functions that can be used as tools
"""
if self.is_sse:
return list(self.sse_client.tools)
if self.is_http:
return list(self.http_client.tools)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing is_http attribute causes AttributeError for stdio clients

High Severity

The is_http attribute is only set when using HTTP/SSE transport (lines 208, 217) but is accessed in _generate_tool_functions() and to_openai_tool() for all client types. For stdio-based MCP clients (command-line servers), is_http is never initialized, causing an AttributeError when these methods try to check self.is_http. This breaks backward compatibility for existing stdio MCP usage.

Additional Locations (1)

Fix in Cursor Fix in Web


tool_functions = []

Expand Down Expand Up @@ -445,9 +468,9 @@ def to_openai_tool(self):
Returns:
dict or list: OpenAI-compatible tool definition(s)
"""
if self.is_sse and hasattr(self, 'sse_client') and self.sse_client.tools:
# Return all tools from SSE client
return self.sse_client.to_openai_tools()
if self.is_http and hasattr(self, 'http_client') and self.http_client.tools:
# Return all tools from HTTP client (SSE or HTTP-Streaming)
return self.http_client.to_openai_tools()

# For simplicity, we'll convert the first tool only if multiple exist
# More complex implementations could handle multiple tools
Expand Down Expand Up @@ -485,4 +508,6 @@ def to_openai_tool(self):
def __del__(self):
"""Clean up resources when the object is garbage collected."""
if hasattr(self, 'runner'):
self.runner.shutdown()
self.runner.shutdown()
if hasattr(self, 'http_client') and hasattr(self.http_client, 'shutdown'):
self.http_client.shutdown()
251 changes: 251 additions & 0 deletions src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
"""
HTTP-Streaming client implementation for MCP (Model Context Protocol).
Provides HTTP chunked streaming transport as an alternative to SSE.
"""

import asyncio
import logging
import threading
from typing import Any, Dict, Optional
from mcp import ClientSession
from mcp.client.session import Transport

logger = logging.getLogger(__name__)


class HTTPStreamingTransport(Transport):
"""HTTP chunked streaming transport for MCP."""

def __init__(self, url: str, headers: Optional[Dict[str, str]] = None):
self.url = url
self.headers = headers or {}
self._closed = False
self._message_queue = asyncio.Queue()
self._initialized = False

async def start(self) -> None:
"""Initialize the transport."""
# Minimal implementation: mark as initialized
self._initialized = True

async def close(self) -> None:
"""Close the transport."""
self._closed = True

async def send(self, message: Dict[str, Any]) -> None:
"""Send a message through the transport."""
if self._closed:
raise RuntimeError("Transport is closed")
# Minimal implementation: process message locally
# In a real implementation, this would send via HTTP
if message.get("method") == "initialize":
response = {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {
"protocolVersion": "0.1.0",
"capabilities": {}
}
}
await self._message_queue.put(response)
elif message.get("method") == "tools/list":
response = {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {
"tools": []
}
}
await self._message_queue.put(response)

async def receive(self) -> Dict[str, Any]:
"""Receive a message from the transport."""
if self._closed:
raise RuntimeError("Transport is closed")
# Minimal implementation: return queued messages
try:
return await asyncio.wait_for(self._message_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
# Return empty response if no messages
return {"jsonrpc": "2.0", "id": None, "result": {}}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP streaming transport silently drops all tool call messages

High Severity

The HTTPStreamingTransport.send() method only handles initialize and tools/list messages, silently ignoring all other methods including tools/call. When a tool is called, no response is queued, causing receive() to timeout and return a response with id: null which won't match the original request. This makes the HTTP streaming transport completely non-functional for actual tool execution - it initializes successfully but all tool calls will fail or hang.

Additional Locations (1)

Fix in Cursor Fix in Web



class HTTPStreamingMCPTool:
"""Wrapper for MCP tools accessed via HTTP streaming."""

def __init__(self, tool_def: Dict[str, Any], call_func):
self.name = tool_def["name"]
self.description = tool_def.get("description", "")
self.inputSchema = tool_def.get("inputSchema", {})
self._call_func = call_func
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTPStreamingMCPTool missing required function attributes for Agent

High Severity

HTTPStreamingMCPTool does not set __name__, __qualname__, __doc__, or __signature__ attributes, which SSEMCPTool explicitly marks as "Required for Agent to recognize it as a tool". The Agent class uses getattr(tool, '__name__', '') to match tools by function name when executing tool calls. Without __name__ set, the Agent cannot find the tool to execute, causing tool calls to fail silently or return None.

Fix in Cursor Fix in Web


def __call__(self, **kwargs):
"""Synchronous wrapper for calling the tool."""
try:
# Check if there's already a running loop
asyncio.get_running_loop()
# If we're in an async context, we can't use asyncio.run()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, self._call_func(self.name, kwargs))
return future.result()
except RuntimeError:
# No running loop, we can use asyncio.run()
return asyncio.run(self._call_func(self.name, kwargs))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP streaming tool calls run on wrong event loop

High Severity

The HTTPStreamingMCPTool.__call__ method uses asyncio.run() to execute tool calls, which creates a new event loop. However, the ClientSession and HTTPStreamingTransport (including its asyncio.Queue) were initialized in a different event loop running in the background thread. Calling self._session.call_tool() from a different event loop will fail because asyncio primitives like Queue are bound to the loop they were created in. The SSE client correctly uses asyncio.run_coroutine_threadsafe() to schedule work on the proper loop.

Fix in Cursor Fix in Web


async def _async_call(self, **kwargs):
"""Async version of tool call."""
return await self._call_func(self.name, kwargs)

def to_openai_tool(self):
"""Convert to OpenAI tool format."""
schema = self.inputSchema.copy()
self._fix_array_schemas(schema)

return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": schema
}
}

def _fix_array_schemas(self, schema):
"""Fix array schemas for OpenAI compatibility."""
if isinstance(schema, dict):
if schema.get("type") == "array" and "items" not in schema:
schema["items"] = {"type": "string"}
for value in schema.values():
if isinstance(value, dict):
self._fix_array_schemas(value)


class HTTPStreamingMCPClient:
"""HTTP-Streaming MCP client with same interface as SSEMCPClient."""

def __init__(self, server_url: str, debug: bool = False, timeout: int = 60):
self.server_url = server_url
self.debug = debug
self.timeout = timeout
self.tools = []
self._client = None
self._session = None
self._transport = None
self._thread = None
self._loop = None

# Initialize in background thread
self._initialize()

def _initialize(self):
"""Initialize the HTTP streaming connection in a background thread."""
init_done = threading.Event()
init_error = None

def _thread_init():
nonlocal init_error
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

async def _async_init():
try:
# Create transport
self._transport = HTTPStreamingTransport(self.server_url)
await self._transport.start()

# Create MCP session with transport's read/write
self._session = ClientSession(
read=self._transport.receive,
write=self._transport.send
)

# Initialize session
await self._session.initialize()

# Store client reference
self._client = self._session

# List available tools using proper method
try:
tools_result = await self._session.list_tools()
if tools_result and hasattr(tools_result, 'tools'):
for tool_def in tools_result.tools:
tool_dict = tool_def.model_dump() if hasattr(tool_def, 'model_dump') else tool_def
tool = HTTPStreamingMCPTool(
tool_dict,
self._call_tool_async
)
self.tools.append(tool)
except Exception:
# If list_tools fails, tools list remains empty
pass

if self.debug:
logger.info(f"HTTP Streaming MCP client initialized with {len(self.tools)} tools")

except Exception as e:
init_error = e
logger.error(f"Failed to initialize HTTP Streaming MCP client: {e}")

try:
self._loop.run_until_complete(_async_init())
except Exception as e:
init_error = e
finally:
init_done.set()

# Keep the loop running only if initialization succeeded
if init_error is None:
self._loop.run_forever()

self._thread = threading.Thread(target=_thread_init, daemon=True)
self._thread.start()

# Wait for initialization
if not init_done.wait(timeout=self.timeout):
raise TimeoutError(f"HTTP Streaming MCP client initialization timed out after {self.timeout} seconds")

# Propagate initialization error if any
if init_error:
raise init_error

async def _call_tool_async(self, tool_name: str, arguments: Dict[str, Any]):
"""Call a tool asynchronously."""
if not self._session:
raise RuntimeError("HTTP Streaming MCP client not initialized")

result = await self._session.call_tool(tool_name, arguments)

# Extract content from result
if hasattr(result, 'content'):
content = result.content
if len(content) == 1 and hasattr(content[0], 'text'):
return content[0].text
return [c.text if hasattr(c, 'text') else str(c) for c in content]
return result

def __iter__(self):
"""Make client iterable to return tools."""
return iter(self.tools)

def to_openai_tools(self):
"""Convert all tools to OpenAI format."""
return [tool.to_openai_tool() for tool in self.tools]

def shutdown(self):
"""Shutdown the client."""
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)

if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
if self._thread.is_alive():
logger.warning("HTTP Streaming MCP client thread did not shut down gracefully")

if self._transport and not self._transport._closed:
# Create a new event loop for cleanup if needed
try:
asyncio.run(self._transport.close())
except Exception as e:
logger.error(f"Error closing transport: {e}")
Loading
Loading