Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions backend/app/ai/agents/manager_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ class ManagerAgent:
Does NOT execute anything itself — passes decisions to the ToolExecutor.
"""

def __init__(self, db: Session, user_role: str = "ai_agent"):
def __init__(self, db: Session, user_role: str = "ai_agent", channel: str = "chat"):
self.db = db
self.user_role = user_role
self.channel = channel
self.client = anthropic.Anthropic(api_key=settings.anthropic_api_key)
self.model = settings.ai_model
self.vector_memory = VectorMemory()
Expand All @@ -49,8 +50,8 @@ def chat(self, session_id: str, user_message: str, on_tool_call=None) -> str:
memory.add_user_message(user_message)
history = memory.get_context_window(max_messages=20)

# Create executor scoped to this session + user role
executor = ToolExecutor(self.db, session_id=session_id, user_role=self.user_role)
# Create executor scoped to this session + user role + channel
executor = ToolExecutor(self.db, session_id=session_id, user_role=self.user_role, channel=self.channel)

# Retrieve long-term memory context
long_term_context = self._get_long_term_context(user_message)
Expand Down
4 changes: 2 additions & 2 deletions backend/app/ai/claude_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ClaudeClient:
def __init__(self, db: Session, user_role: str = "ai_agent"):
self.db = db
self.user_role = user_role
self.manager = ManagerAgent(db, user_role=user_role)
self.manager = ManagerAgent(db, user_role=user_role, channel="chat")

def chat(self, session_id: str, user_message: str) -> str:
return self.manager.chat(session_id, user_message)
Expand All @@ -40,7 +40,7 @@ async def chat_stream(self, session_id: str, user_message: str) -> AsyncGenerato
messages.append({"role": msg["role"], "content": msg["content"]})

client = anthropic.Anthropic(api_key=settings.anthropic_api_key)
executor = ToolExecutor(self.db, session_id=session_id, user_role=self.user_role)
executor = ToolExecutor(self.db, session_id=session_id, user_role=self.user_role, channel="chat_stream")

response = client.messages.create(
model=settings.ai_model,
Expand Down
5 changes: 3 additions & 2 deletions backend/app/ai/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@
class ToolExecutor:
"""Pure execution layer. No LLM. No reasoning."""

def __init__(self, db: Session, session_id: str = "", user_role: str = "ai_agent"):
def __init__(self, db: Session, session_id: str = "", user_role: str = "ai_agent", channel: str = "chat"):
self.db = db
self.session_id = session_id
self.user_role = user_role
self.channel = channel
self._tools = None
self.guard = TransactionGuard()
self.idempotency = IdempotencyGuard(session_id)
self.permissions = AIPermissionChecker(user_role)
self.observer = AIObserver(session_id, user_role)
self.observer = AIObserver(session_id, user_role, channel=channel)
self.vector_memory = VectorMemory()

@property
Expand Down
20 changes: 14 additions & 6 deletions backend/app/ai/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Execution time
- Session context
- User role
- Channel (voice_ws, chat, chat_stream)

Stores in both structured logs (for debugging) and Redis (for audit dashboard).
"""
Expand All @@ -25,6 +26,8 @@
AUDIT_SESSION_PREFIX = "ai:audit:session:"
MAX_AUDIT_ENTRIES = 1000

VALID_CHANNELS = ("voice_ws", "chat", "chat_stream")


class AIAuditEntry:
"""Single auditable AI action."""
Expand All @@ -36,13 +39,15 @@ def __init__(
tool_name: str,
tool_input: dict,
decision_reason: Optional[str] = None,
channel: str = "chat",
):
self.entry_id = str(uuid.uuid4())[:12]
self.session_id = session_id
self.user_role = user_role
self.tool_name = tool_name
self.tool_input = tool_input
self.decision_reason = decision_reason
self.channel = channel if channel in VALID_CHANNELS else "chat"
self.started_at = time.time()
self.finished_at: Optional[float] = None
self.result: Optional[dict] = None
Expand Down Expand Up @@ -74,6 +79,7 @@ def to_dict(self) -> dict:
"entry_id": self.entry_id,
"session_id": self.session_id,
"user_role": self.user_role,
"channel": self.channel,
"tool_name": self.tool_name,
"tool_input": self.tool_input,
"decision_reason": self.decision_reason,
Expand All @@ -98,7 +104,7 @@ class AIObserver:
"""Observability service for AI tool execution.

Usage:
observer = AIObserver(session_id, user_role)
observer = AIObserver(session_id, user_role, channel="chat_stream")
entry = observer.start(tool_name, tool_input, reason)
try:
result = execute_tool(...)
Expand All @@ -107,9 +113,10 @@ class AIObserver:
observer.fail(entry, str(e))
"""

def __init__(self, session_id: str, user_role: str = "ai_agent"):
def __init__(self, session_id: str, user_role: str = "ai_agent", channel: str = "chat"):
self.session_id = session_id
self.user_role = user_role
self.channel = channel if channel in VALID_CHANNELS else "chat"
self.redis = get_redis()

def start(self, tool_name: str, tool_input: dict, reason: Optional[str] = None) -> AIAuditEntry:
Expand All @@ -120,10 +127,11 @@ def start(self, tool_name: str, tool_input: dict, reason: Optional[str] = None)
tool_name=tool_name,
tool_input=tool_input,
decision_reason=reason,
channel=self.channel,
)
logger.info(
f"[AI_DECISION] session={self.session_id} role={self.user_role} "
f"tool={tool_name} input={json.dumps(tool_input, default=str)[:200]}"
f"channel={self.channel} tool={tool_name} input={json.dumps(tool_input, default=str)[:200]}"
)
return entry

Expand All @@ -133,7 +141,7 @@ def complete(self, entry: AIAuditEntry, result: dict):
self._persist(entry)
logger.info(
f"[AI_COMPLETE] session={self.session_id} tool={entry.tool_name} "
f"time={entry.execution_ms}ms success=true"
f"channel={self.channel} time={entry.execution_ms}ms success=true"
)

def fail(self, entry: AIAuditEntry, error: str):
Expand All @@ -142,7 +150,7 @@ def fail(self, entry: AIAuditEntry, error: str):
self._persist(entry)
logger.warning(
f"[AI_FAIL] session={self.session_id} tool={entry.tool_name} "
f"time={entry.execution_ms}ms error={error[:100]}"
f"channel={self.channel} time={entry.execution_ms}ms error={error[:100]}"
)

def block(self, entry: AIAuditEntry, reason: str):
Expand All @@ -151,7 +159,7 @@ def block(self, entry: AIAuditEntry, reason: str):
self._persist(entry)
logger.warning(
f"[AI_BLOCKED] session={self.session_id} tool={entry.tool_name} "
f"role={self.user_role} reason={reason}"
f"channel={self.channel} role={self.user_role} reason={reason}"
)

def get_session_audit(self, limit: int = 50) -> list[dict]:
Expand Down
2 changes: 1 addition & 1 deletion backend/app/ai/voice_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class VoiceOrchestrator:
def __init__(self, db: Session, user_role: str = "ai_agent"):
self.db = db
self.user_role = user_role
self.manager = ManagerAgent(db, user_role=user_role)
self.manager = ManagerAgent(db, user_role=user_role, channel="voice_ws")

def process_voice_message(self, session_id: str, text: str, priority: str = "normal", on_tool_call=None) -> dict:
"""Process a voice message through the Manager Agent.
Expand Down
8 changes: 7 additions & 1 deletion backend/app/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,10 @@
},
}

celery_app.autodiscover_tasks(["app.tasks"])
celery_app.conf.include = [
"app.tasks.notifications",
"app.tasks.accounting",
"app.tasks.inventory",
"app.tasks.reports",
"app.tasks.ai",
]
Loading