Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ serve = [
"uvloop; sys_platform != 'win32'",
]
pydantic-ai = [
"pydantic-ai-slim[ag-ui]>=1.79,<2",
"pydantic-ai-slim[ag-ui]>=1.87,<2",
]
agno = [
# FIXME: find the lowest version for AG-UI
Expand Down Expand Up @@ -225,6 +225,7 @@ ignore_missing_imports = true
# It is a fundamental feature of ravnar to allow users to define methods on the pluggable
# components as sync or async. Unfortunately, this cannot be expressed through static types.
module = [
"_ravnar.agents",
"_ravnar.authenticators",
]
disable_error_code = [
Expand Down
200 changes: 189 additions & 11 deletions src/_ravnar/agents.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import abc
import dataclasses
import textwrap
import uuid
from collections.abc import AsyncIterator
Expand All @@ -13,6 +14,7 @@

if TYPE_CHECKING:
import agno.agent
import agno.tools
import pydantic_ai

from _ravnar.schema import QuickPrompt
Expand Down Expand Up @@ -115,7 +117,16 @@ async def run(self, input: ag_ui.core.RunAgentInput) -> AsyncIterator[ag_ui.core
yield ta.validate_json(sse.data)


class PydanticAiAgentWrapper(_AgentBase):
@dataclasses.dataclass
class _PydanticAiDynamicCapabilities:
tools: list[ag_ui.core.Tool]
reasoning_supported: bool | None
approvals: bool | None
image_output: bool | None
structured_output: bool | None


class PydanticAiAgentWrapper(Agent):
"""Pydantic AI agent wrapper"""

def __init__(
Expand All @@ -127,19 +138,125 @@ def __init__(
) -> None:
self._agent = agent

if capabilities is None:
capabilities = ag_ui.core.AgentCapabilities(
identity=ag_ui.core.IdentityCapabilities(name=agent.name),
transport=ag_ui.core.TransportCapabilities(streaming=True),
)
self._capabilities: ag_ui.core.AgentCapabilities
if capabilities is not None:
self._capabilities = capabilities

super().__init__(capabilities=capabilities, quick_prompts=quick_prompts)
if quick_prompts is None:
quick_prompts = []
self._quick_prompts = quick_prompts

async def setup(self) -> None:
if hasattr(self, "_capabilities"):
return

self._capabilities = await self.extract_capabilities(self._agent)

def run(self, input: ag_ui.core.RunAgentInput) -> AsyncIterator[ag_ui.core.Event]:
from pydantic_ai.ui.ag_ui import AGUIAdapter

return AGUIAdapter(agent=self._agent, run_input=input, accept="text/event-stream").run_stream() # type: ignore[return-value]

def get_capabilities(self) -> ag_ui.core.AgentCapabilities:
"""The capabilities of the agent."""
return self._capabilities

def get_quick_prompts(self) -> list[QuickPrompt]:
"""The quick prompts of the agent."""
return self._quick_prompts

@staticmethod
async def extract_capabilities(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened pydantic/pydantic-ai#5686 to hopefully push this upstream.

agent: pydantic_ai.Agent, *, ctx: pydantic_ai.RunContext | None = None
) -> ag_ui.core.AgentCapabilities:
import pydantic_ai.models
from pydantic_ai.usage import RunUsage

capabilities = ag_ui.core.AgentCapabilities(
identity=ag_ui.core.IdentityCapabilities(
name=agent.name,
description=agent.description,
type="pydantic-ai",
),
transport=ag_ui.core.TransportCapabilities(streaming=True),
tools=ag_ui.core.ToolsCapabilities(
supported=True,
client_provided=True,
),
)

if ctx is None and isinstance(agent.model, pydantic_ai.models.Model):
ctx = pydantic_ai.RunContext(deps=None, model=agent.model, usage=RunUsage())
if ctx is not None:
dynamic_capabilities = await PydanticAiAgentWrapper._extract_dynamic_capabilities(agent, ctx=ctx)

assert capabilities.tools is not None
capabilities.tools.items = dynamic_capabilities.tools
if dynamic_capabilities.reasoning_supported is not None:
capabilities.reasoning = ag_ui.core.ReasoningCapabilities(
supported=dynamic_capabilities.reasoning_supported
)
if dynamic_capabilities.approvals is not None:
capabilities.human_in_the_loop = ag_ui.core.HumanInTheLoopCapabilities(
approvals=dynamic_capabilities.approvals
)
if dynamic_capabilities.image_output is not None:
capabilities.multimodal = ag_ui.core.MultimodalCapabilities(
output=ag_ui.core.MultimodalOutputCapabilities(image=dynamic_capabilities.image_output)
)
if dynamic_capabilities.structured_output is not None:
capabilities.output = ag_ui.core.OutputCapabilities(
structured_output=dynamic_capabilities.structured_output
)

return capabilities

@staticmethod
async def _extract_dynamic_capabilities(
agent: pydantic_ai.Agent, *, ctx: pydantic_ai.RunContext
) -> _PydanticAiDynamicCapabilities:
tools = [
ag_ui.core.Tool(
name=(td := tool.tool_def).name,
description=td.description or "",
parameters=td.parameters_json_schema,
)
for toolset in agent.toolsets
for tool in (await toolset.get_tools(ctx)).values()
]

reasoning_supported: bool | None = None
approvals: bool | None = None
image_output: bool | None = None

if agent.root_capability is not None:
from pydantic_ai.capabilities import HandleDeferredToolCalls, ImageGeneration, Thinking

for capability in agent.root_capability.capabilities:
if isinstance(capability, Thinking):
reasoning_supported = True
elif isinstance(capability, HandleDeferredToolCalls):
approvals = True
elif isinstance(capability, ImageGeneration):
image_output = True

if not isinstance(agent.output_type, type):
structured_output = None
elif issubclass(agent.output_type, str):
structured_output = False
elif issubclass(agent.output_type, pydantic.BaseModel):
structured_output = True
else:
structured_output = None

return _PydanticAiDynamicCapabilities(
tools=tools,
reasoning_supported=reasoning_supported,
approvals=approvals,
image_output=image_output,
structured_output=structured_output,
)


class AgnoAgentWrapper(_AgentBase):
"""Agno agent wrapper"""
Expand All @@ -154,14 +271,75 @@ def __init__(
self._agent = agent

if capabilities is None:
capabilities = ag_ui.core.AgentCapabilities(
identity=ag_ui.core.IdentityCapabilities(name=agent.name),
transport=ag_ui.core.TransportCapabilities(streaming=True),
)
capabilities = self.extract_capabilities(agent)

super().__init__(capabilities=capabilities, quick_prompts=quick_prompts)

def run(self, input: ag_ui.core.RunAgentInput) -> AsyncIterator[ag_ui.core.Event]:
from agno.os.interfaces.agui.router import run_agent

return run_agent(self._agent, input) # type: ignore[return-value]

@staticmethod
def extract_capabilities(agent: agno.agent.Agent) -> ag_ui.core.AgentCapabilities:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened agno-agi/agno#8127 to hopefully push this upstream.

from agno.tools import Function, Toolkit

tools: list[ag_ui.core.Tool] | None = None
approvals: bool | None = None
if isinstance(agent.tools, list):
tools = []
approvals = False
for tool in agent.tools:
functions: list[Function]
if isinstance(tool, Toolkit):
functions = [fn for fn in tool.functions.values() if isinstance(fn, Function)]
elif isinstance(tool, Function):
functions = [tool]
elif callable(tool):
functions = [Function.from_callable(tool)]
else:
continue

for fn in functions:
tools.append(
ag_ui.core.Tool(
name=fn.name,
description=fn.description or "",
parameters=fn.parameters,
)
)
if fn.requires_confirmation or fn.requires_user_input:
approvals = True

return ag_ui.core.AgentCapabilities(
identity=ag_ui.core.IdentityCapabilities(
name=agent.name,
description=agent.description,
metadata=agent.metadata,
type="agno",
),
transport=ag_ui.core.TransportCapabilities(streaming=True),
tools=ag_ui.core.ToolsCapabilities(
supported=True,
items=tools,
client_provided=False,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

),
human_in_the_loop=ag_ui.core.HumanInTheLoopCapabilities(approvals=approvals)
if approvals is not None
else None,
output=ag_ui.core.OutputCapabilities(structured_output=structured_output)
if (structured_output := agent.structured_outputs is True or agent.output_schema is not None)
else None,
reasoning=ag_ui.core.ReasoningCapabilities(supported=agent.reasoning),
multi_agent=ag_ui.core.MultiAgentCapabilities(
supported=True,
sub_agents=[
ag_ui.core.SubAgentInfo(
name=agent.reasoning_agent.name,
description=agent.reasoning_agent.description,
)
],
)
if agent.reasoning_agent is not None and agent.reasoning_agent.name is not None
else None,
)
12 changes: 4 additions & 8 deletions src/_ravnar/api/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,15 @@ def _make_dynamic_agents_router(
)

@router.post("", description=description)
async def register_agent(
data: schema.RegisterAgentData,
) -> schema.AgentInfo:
async def register_agent(data: schema.RegisterAgentData) -> schema.AgentInfo:
agent = data.agent()
agent_handler.add_agent(data.id, agent)
await agent_handler.add_agent(data.id, agent)
return schema.AgentInfo(
id=data.id,
capabilities=agent.get_capabilities(),
quick_prompts=agent.get_quick_prompts(),
)

@router.delete("/{agentId}", description=description)
async def unregister_agent(
agent_id: Annotated[str, Path(alias="agentId")],
) -> None:
agent_handler.remove_agent(agent_id)
async def unregister_agent(agent_id: Annotated[str, Path(alias="agentId")]) -> None:
await agent_handler.remove_agent(agent_id)
35 changes: 28 additions & 7 deletions src/_ravnar/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import TYPE_CHECKING, cast

Expand All @@ -14,8 +15,9 @@

from _ravnar import schema
from _ravnar.events import EventProcessor
from _ravnar.mixin import SetupTeardownMixin
from _ravnar.observability import configure_logging, configure_tracing, traced
from _ravnar.utils import resolve_forward_references
from _ravnar.utils import as_awaitable, resolve_forward_references

from .api import make_router as make_api_router
from .config import AgentConfig, BaseConfig, Config
Expand All @@ -39,9 +41,12 @@ def __init__(self, config: BaseConfig | None = None) -> None:
self.app = self._make_app(config)

def _make_app(self, config: BaseConfig) -> FastAPI:
agent_handler = AgentHandler(config.agents)

app = FastAPI(
title="ravnar",
version=__version__,
lifespan=SetupTeardownMixin.lifespan_factory(agent_handler),
root_path=config.server.root_path,
)

Expand Down Expand Up @@ -74,8 +79,6 @@ async def health() -> Response:
async def version() -> str:
return __version__

agent_handler = AgentHandler(config.agents)

api_router = make_api_router(
storage_config=config.storage,
agent_handler=agent_handler,
Expand Down Expand Up @@ -105,13 +108,29 @@ def serve(self) -> None:
)


class AgentHandler:
class AgentHandler(SetupTeardownMixin):
def __init__(self, agent_config: AgentConfig) -> None:
self._static_agents: dict[str, Agent] = {id: factory() for id, factory in agent_config.static.items()}
self._dynamic_agents: dict[str, Agent] = {}
self._event_encoder = ag_ui.encoder.EventEncoder()
self._dynamic_enabled = agent_config.dynamic.enabled

@staticmethod
async def _setup_agent(agent: Agent) -> None:
await as_awaitable(agent.setup)

@staticmethod
async def _teardown_agent(agent: Agent) -> None:
await as_awaitable(agent.teardown)

async def setup(self) -> None: # type: ignore[override]
await asyncio.gather(*[self._setup_agent(agent) for agent in self._static_agents.values()])

async def teardown(self) -> None: # type: ignore[override]
await asyncio.gather(
*[self._teardown_agent(agent) for agent in (*self._static_agents.values(), *self._dynamic_agents.values())]
)

def infos(self) -> list[schema.AgentInfo]:
agents = dict(self._static_agents)
if self._dynamic_enabled:
Expand Down Expand Up @@ -139,17 +158,19 @@ def _get_agent(self, agent_id: str) -> Agent:
def assert_available(self, agent_id: str) -> None:
self._get_agent(agent_id)

def add_agent(self, agent_id: str, agent: Agent) -> None:
async def add_agent(self, agent_id: str, agent: Agent) -> None:
if agent_id in self._static_agents or agent_id in self._dynamic_agents:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Agent ID already exists")
self._dynamic_agents[agent_id] = agent
await self._setup_agent(agent)

def remove_agent(self, agent_id: str) -> None:
async def remove_agent(self, agent_id: str) -> None:
if agent_id in self._static_agents:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Static agents cannot be deleted")
if agent_id not in self._dynamic_agents:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found")
del self._dynamic_agents[agent_id]
agent = self._dynamic_agents.pop(agent_id)
await self._teardown_agent(agent)

def _sse_encoder(self, data: fastsse.Data) -> bytes:
return self._event_encoder.encode(cast(ag_ui.core.Event, data)).encode()
Expand Down
Loading
Loading