Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions src/praisonai/praisonai/_async_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import asyncio
import os
import threading
from concurrent.futures import Future
import concurrent.futures
from concurrent.futures import CancelledError as FutureCancelledError, Future
from typing import Awaitable, TypeVar

T = TypeVar("T")
Expand Down Expand Up @@ -112,9 +113,30 @@ def run_sync(coro: Awaitable[T], *, timeout: float | None = _DEFAULT_TIMEOUT) ->
with _BG._lock:
loop = _BG.get_unlocked()
fut: Future = asyncio.run_coroutine_threadsafe(coro, loop)
return fut.result(timeout=timeout)

try:
return fut.result(timeout=timeout)
except (TimeoutError, concurrent.futures.TimeoutError):
# Propagate cancellation into the background loop so the underlying
# awaitable (DB query, HTTP call, subprocess wait) actually unwinds.
fut.cancel()
try:
# Give cancellation a short grace period to release resources.
fut.exception(timeout=1.0)
except (
TimeoutError,
concurrent.futures.TimeoutError,
asyncio.CancelledError,
FutureCancelledError,
):
pass
raise
Comment on lines +117 to +133
except BaseException:
# Ctrl-C / GeneratorExit / SystemExit must also cancel the bg task.
fut.cancel()
raise


def shutdown() -> None:
"""Public hook for long-running server processes to stop the bridge cleanly."""
_BG.shutdown()
_BG.shutdown()
74 changes: 72 additions & 2 deletions src/praisonai/praisonai/agents_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,73 @@ def replace_var(match):
return re.sub(pattern, replace_var, template)


def _wrap_with_timeout(tool, timeout_seconds: float):
"""Enforce per-call timeout on a tool, sync or async, without
leaking the worker thread/task on timeout.
"""
Comment on lines +95 to +97
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The docstring is slightly misleading. For synchronous tools running in a ThreadPoolExecutor, fut.cancel() is a best-effort operation and cannot terminate a running thread. The thread will continue to run until the tool function completes, consuming resources. It's better to clarify this limitation in the docstring to set correct expectations.

Suggested change
"""Enforce per-call timeout on a tool, sync or async, without
leaking the worker thread/task on timeout.
"""
"""Enforce per-call timeout on a tool, sync or async. For sync tools,
cancellation on timeout is best-effort as the thread cannot be killed.
"""

if timeout_seconds is None or timeout_seconds <= 0 or not callable(tool):
return tool

import asyncio
import functools
import inspect
Comment on lines +101 to +103
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to PEP 8, imports should be at the top of the file. Moving these imports to the top level improves code organization, readability, and avoids the minor performance overhead of re-importing on each function call.

Please add import asyncio, import concurrent.futures, and import functools to the import section at the top of the file and remove these lines. Note that inspect is already imported at the top.

import json

if inspect.iscoroutinefunction(tool):
@functools.wraps(tool)
async def _async_wrapped(*args, **kwargs):
try:
return await asyncio.wait_for(tool(*args, **kwargs), timeout=timeout_seconds)
except asyncio.TimeoutError:
return json.dumps({
"error": "tool_timeout",
"tool": getattr(tool, "__name__", repr(tool)),
"timeout_seconds": timeout_seconds,
})
return _async_wrapped

@functools.wraps(tool)
def _sync_wrapped(*args, **kwargs):
import queue
import threading

result_queue: queue.Queue[tuple[bool, object]] = queue.Queue(maxsize=1)

def _runner():
try:
result_queue.put((True, tool(*args, **kwargs)))
except BaseException as exc:
result_queue.put((False, exc))

# Daemon thread avoids blocking process shutdown if the tool call hangs.
worker = threading.Thread(target=_runner, daemon=True)
worker.start()
worker.join(timeout_seconds)

if worker.is_alive():
return json.dumps({
"error": "tool_timeout",
"tool": getattr(tool, "__name__", repr(tool)),
"timeout_seconds": timeout_seconds,
})

try:
success, payload = result_queue.get_nowait()
except queue.Empty:
# Defensive fallback: if the worker exits without publishing a result,
# avoid blocking indefinitely and surface an execution failure.
return json.dumps({
"error": "tool_execution_error",
"tool": getattr(tool, "__name__", repr(tool)),
"detail": "worker_exited_without_result",
})

if success:
return payload
raise payload
return _sync_wrapped
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
greptile-apps[bot] marked this conversation as resolved.


def noop(*args, **kwargs):
pass

Expand Down Expand Up @@ -1169,11 +1236,10 @@ def _run_praisonai(self, config, topic, tools_dict):
if acp_enabled or lsp_enabled:
runtime_started = False
try:
import asyncio
from praisonai.cli.features.interactive_runtime import InteractiveRuntime, RuntimeConfig
from praisonai.cli.features.agent_tools import create_agent_centric_tools
from ._async_bridge import run_sync

# Use scoped event loop instead of process-global mutations
runtime_config = RuntimeConfig(
workspace=os.getcwd(),
acp_enabled=acp_enabled,
Expand Down Expand Up @@ -1338,6 +1404,10 @@ def _run_praisonai(self, config, topic, tools_dict):
details.get('cli_backend'), self.logger
)

# Apply timeout wrapper at the wrapper boundary for defense-in-depth
if agent_tool_timeout:
agent_tools = [_wrap_with_timeout(t, agent_tool_timeout) for t in agent_tools]

agent = PraisonAgent(
name=role_filled,
role=role_filled,
Expand Down
18 changes: 18 additions & 0 deletions src/praisonai/tests/unit/test_async_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import asyncio
import threading
import unittest

from praisonai._async_bridge import _BG, run_sync
Expand Down Expand Up @@ -68,6 +69,23 @@ async def nested_call() -> bool:

self.assertTrue(run_sync(nested_call()))

def test_timeout_cancels_coroutine_and_runs_finally(self):
cleanup_done = threading.Event()

async def never_finishes() -> None:
try:
await asyncio.Event().wait()
finally:
cleanup_done.set()

with self.assertRaises(TimeoutError):
run_sync(never_finishes(), timeout=0.01)

self.assertTrue(
cleanup_done.wait(timeout=2.0),
"timed out coroutine should be cancelled and run its cleanup",
)


class TestBackgroundThread(unittest.TestCase):
def test_thread_is_daemon_and_alive(self):
Expand Down
Loading