Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The Evo AI platform allows:
- Custom tools management
- **[Google Agent Development Kit (ADK)](https://google.github.io/adk-docs/)**: Base framework for agent development
- **[CrewAI Support](https://github.com/crewAI/crewAI)**: Alternative framework for agent development (in development)
- **[AG2 (formerly AutoGen)](https://github.com/ag2ai/ag2)**: Dynamic GroupChat, context-variable handoffs, and human-in-the-loop (`AI_ENGINE=ag2`)
- JWT authentication with email verification
- **[Agent 2 Agent (A2A) Protocol Support](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/)**: Interoperability between AI agents
- **[Workflow Agent with LangGraph](https://www.langchain.com/langgraph)**: Building complex agent workflows
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies = [
"crewai==0.120.1",
"crewai-tools==0.45.0",
"a2a-sdk==0.2.4",
"ag2[openai]>=0.11.0",
]

[project.optional-dependencies]
Expand Down
40 changes: 30 additions & 10 deletions src/api/chat_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse, FileData
from src.services.adk.agent_runner import run_agent as run_agent_adk, run_agent_stream
from src.services.crewai.agent_runner import run_agent as run_agent_crewai
from src.services.ag2.agent_runner import run_agent as run_agent_ag2, run_agent_stream as run_agent_stream_ag2
from src.core.exceptions import AgentNotFoundError
from src.services.service_providers import (
session_service,
Expand Down Expand Up @@ -221,16 +222,26 @@ async def websocket_chat(
logger.error(f"Error processing files: {str(e)}")
files = None

async for chunk in run_agent_stream(
agent_id=agent_id,
external_id=external_id,
message=message,
session_service=session_service,
artifacts_service=artifacts_service,
memory_service=memory_service,
db=db,
files=files,
):
if settings.AI_ENGINE == "ag2":
stream_gen = run_agent_stream_ag2(
agent_id=agent_id,
external_id=external_id,
message=message,
db=db,
files=files,
)
else:
stream_gen = run_agent_stream(
agent_id=agent_id,
external_id=external_id,
message=message,
session_service=session_service,
artifacts_service=artifacts_service,
memory_service=memory_service,
db=db,
files=files,
)
async for chunk in stream_gen:
await websocket.send_json(
{"message": json.loads(chunk), "turn_complete": False}
)
Expand Down Expand Up @@ -300,6 +311,15 @@ async def chat(
db,
files=request.files,
)
elif settings.AI_ENGINE == "ag2":
final_response = await run_agent_ag2(
agent_id,
external_id,
request.message,
session_service,
db,
files=request.files,
)

return {
"response": final_response["final_response"],
Expand Down
6 changes: 5 additions & 1 deletion src/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
from .adk.agent_runner import run_agent
# google-adk is an optional dependency — guard so unit tests run without the full stack
try:
    from .adk.agent_runner import run_agent
except ImportError:
    # Intentionally swallowed: when google-adk isn't installed, `run_agent`
    # is simply absent from this package's namespace; callers that need it
    # must check availability (or import it lazily) themselves.
    pass
Empty file added src/services/ag2/__init__.py
Empty file.
199 changes: 199 additions & 0 deletions src/services/ag2/agent_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import uuid
from typing import Tuple, Optional
from autogen import ConversableAgent, LLMConfig
from autogen.agentchat import initiate_group_chat
from autogen.agentchat.group.patterns import DefaultPattern, AutoPattern
from autogen.agentchat.group import (
ContextVariables,
RevertToUserTarget,
TerminateTarget,
AgentTarget,
OnCondition,
StringLLMCondition,
OnContextCondition,
ExpressionContextCondition,
ContextExpression,
)
from sqlalchemy.orm import Session
from src.services.agent_service import get_agent
from src.services.apikey_service import get_decrypted_api_key
from src.utils.logger import setup_logger

logger = setup_logger(__name__)


class AG2AgentBuilder:
    """Builds AG2 (AutoGen) agents and group-chat setups from DB agent records.

    Mirrors the ADK/CrewAI builder interfaces: `build_agent` is the entry
    point and returns an (agent_or_setup, exit_stack) tuple.
    """

    def __init__(self, db: Session):
        # SQLAlchemy session used for agent and API-key lookups.
        self.db = db

async def _get_api_key(self, agent) -> str:
    """Resolve the agent's API key, mirroring the ADK and CrewAI builders.

    Resolution order:
      1. ``agent.api_key_id`` — decrypted via the key service; an inactive
         or missing record is an error.
      2. ``agent.config["api_key"]`` — treated as a key-record UUID when it
         parses as one, otherwise returned as the literal key string.

    Raises:
        ValueError: if neither source yields a key.
    """
    key_id = getattr(agent, "api_key_id", None)
    if key_id:
        decrypted = get_decrypted_api_key(self.db, key_id)
        if not decrypted:
            raise ValueError(f"API key {agent.api_key_id} not found or inactive")
        return decrypted

    raw_value = (agent.config or {}).get("api_key")
    if raw_value:
        try:
            resolved = get_decrypted_api_key(self.db, uuid.UUID(raw_value))
            return resolved or raw_value
        except (ValueError, TypeError):
            # Not a UUID — the config value is the API key itself.
            return raw_value

    raise ValueError(f"No API key configured for agent {agent.name}")

def _build_llm_config(self, agent, api_key: str) -> LLMConfig:
    """Assemble the minimal AG2 LLM configuration for this agent's model."""
    config_entry = {"model": agent.model, "api_key": api_key}
    return LLMConfig(config_entry)

def _build_system_message(self, agent) -> str:
parts = []
if agent.role:
parts.append(f"Role: {agent.role}")
if agent.goal:
parts.append(f"Goal: {agent.goal}")
if agent.instruction:
parts.append(agent.instruction)
return "\n\n".join(parts)

async def build_conversable_agent(self, agent) -> ConversableAgent:
    """Build a single AG2 ConversableAgent from a DB agent record.

    Raises:
        ValueError: when no API key can be resolved for the agent.
    """
    api_key = await self._get_api_key(agent)
    # AG2 0.11+ rejects names containing whitespace for OpenAI models.
    # split()/join collapses ALL whitespace runs (spaces, tabs, newlines)
    # into single underscores — replacing only " " left other whitespace
    # characters through.
    safe_name = "_".join(agent.name.split())
    return ConversableAgent(
        name=safe_name,
        system_message=self._build_system_message(agent),
        description=agent.description or "",
        llm_config=self._build_llm_config(agent, api_key),
    )

def _apply_handoffs(self, ca: ConversableAgent, config: dict, all_agents: dict):
    """
    Apply AG2 handoff conditions from the agent config's optional 'handoffs' field.

    Config format:
        {
            "handoffs": [
                {
                    "type": "llm",
                    "target_agent_id": "<uuid>",
                    "condition": "Route when the user asks about billing"
                },
                {
                    "type": "context",
                    "target_agent_id": "<uuid>",
                    "expression": "${is_vip} == True"
                }
            ],
            "after_work": "revert_to_user"  # or "terminate"
        }

    Malformed entries (unknown/missing type, missing condition/expression,
    unresolvable target) are logged and skipped rather than raising, so one
    bad entry in a user config cannot break the whole group-chat setup.
    """
    handoffs_config = config.get("handoffs", [])
    llm_conditions = []
    context_conditions = []

    for h in handoffs_config:
        target_id = h.get("target_agent_id")
        target_agent = all_agents.get(str(target_id))
        if not target_agent:
            logger.warning(f"Handoff target {target_id} not found, skipping")
            continue

        h_type = h.get("type")
        if h_type == "llm":
            condition = h.get("condition")
            if not condition:
                logger.warning(
                    f"LLM handoff to {target_id} missing 'condition', skipping"
                )
                continue
            llm_conditions.append(
                OnCondition(
                    target=AgentTarget(target_agent),
                    condition=StringLLMCondition(prompt=condition),
                )
            )
        elif h_type == "context":
            expression = h.get("expression")
            if not expression:
                logger.warning(
                    f"Context handoff to {target_id} missing 'expression', skipping"
                )
                continue
            context_conditions.append(
                OnContextCondition(
                    target=AgentTarget(target_agent),
                    condition=ExpressionContextCondition(
                        expression=ContextExpression(expression)
                    ),
                )
            )
        else:
            logger.warning(
                f"Invalid or missing handoff type {h_type!r} for target {target_id}, skipping"
            )

    if llm_conditions:
        ca.handoffs.add_llm_conditions(llm_conditions)
    if context_conditions:
        ca.handoffs.add_context_conditions(context_conditions)

    # Default: hand control back to the user when this agent's work is done.
    after_work = config.get("after_work", "revert_to_user")
    if after_work == "terminate":
        ca.handoffs.set_after_work(TerminateTarget())
    else:
        ca.handoffs.set_after_work(RevertToUserTarget())

async def build_group_chat_setup(self, root_agent) -> dict:
    """
    Build a GroupChat pattern from an agent record with sub_agents.

    Returns a dict consumed by the runner's initiate_group_chat call:
    'pattern', 'agents', 'max_rounds', and 'context_variables'.

    Raises:
        ValueError: if no sub_agents are configured or a sub-agent id
            cannot be resolved in the database.
    """
    config = root_agent.config or {}
    sub_agent_ids = config.get("sub_agents", [])
    if not sub_agent_ids:
        raise ValueError("group_chat agent requires at least one sub_agent")

    # Fetch each sub-agent record ONCE and cache it so the handoff pass
    # below does not trigger a second round of database queries.
    all_agents = {}
    agents = []
    db_sub_agents = {}
    for aid in sub_agent_ids:
        db_agent = get_agent(self.db, str(aid))
        if db_agent is None:
            raise ValueError(f"Sub-agent {aid} not found")
        db_sub_agents[str(aid)] = db_agent
        ca = await self.build_conversable_agent(db_agent)
        all_agents[str(aid)] = ca
        agents.append(ca)

    root_ca = await self.build_conversable_agent(root_agent)
    all_agents[str(root_agent.id)] = root_ca

    # Apply handoffs to each sub-agent if configured, reusing cached records.
    for aid, db_agent in db_sub_agents.items():
        if db_agent.config:
            self._apply_handoffs(all_agents[aid], db_agent.config, all_agents)

    api_key = await self._get_api_key(root_agent)
    manager_llm = self._build_llm_config(root_agent, api_key)

    # "auto" lets a group-manager LLM pick the next speaker; any other value
    # falls back to AG2's DefaultPattern driven by explicit handoffs.
    pattern_type = config.get("pattern", "auto")
    if pattern_type == "auto":
        pattern = AutoPattern(
            initial_agent=root_ca,
            agents=[root_ca] + agents,
            group_manager_args={"llm_config": manager_llm},
        )
    else:
        pattern = DefaultPattern(
            initial_agent=root_ca,
            agents=[root_ca] + agents,
            group_after_work=RevertToUserTarget(),
        )

    return {
        "pattern": pattern,
        "agents": [root_ca] + agents,
        "max_rounds": config.get("max_rounds", 10),
        "context_variables": ContextVariables(
            data=config.get("context_variables", {})
        ),
    }

async def build_agent(self, root_agent) -> Tuple[object, None]:
    """
    Entry point matching the ADK/CrewAI AgentBuilder interface.
    Returns (agent_or_setup_dict, exit_stack).

    Orchestration mode is read from config["ag2_mode"]:
        "group_chat" → GroupChat with sub-agents from config["sub_agents"]
        "single" / absent → single ConversableAgent (default)
    No new agent type is required in the DB; all AG2 agents use type="llm".
    """
    config = root_agent.config or {}
    mode = config.get("ag2_mode", "single")
    if mode == "group_chat":
        setup = await self.build_group_chat_setup(root_agent)
        return setup, None
    agent = await self.build_conversable_agent(root_agent)
    return agent, None
Loading