20 changes: 17 additions & 3 deletions src/routes/v2/modelRouter.py
@@ -24,11 +24,25 @@ async def auth_and_rate_limit(request: Request):
 async def chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)):
     request.state.is_playground = False
     request.state.version = 2
-    if db_config.get('orchestrator_id'):
-        result = await orchestrator_chat(request)
-        return result
     data_to_send = await make_request_data(request)
     response_format = data_to_send.get('body',{}).get('configuration', {}).get('response_format', {})
+
+    if db_config.get('orchestrator_id'):
+        # If orchestrator_id exists and response_format is non-default, use queue
+        if response_format and response_format.get('type') != 'default':
+            try:
+                # Publish the orchestrator message to the queue
+                await queue_obj.publish_message(data_to_send)
+                return {"success": True, "message": "Your response will be sent through configured means."}
+            except Exception as e:
+                # Log the error and return a meaningful error response
+                logger.error(f"Failed to publish orchestrator message: {str(e)}")
+                raise HTTPException(status_code=500, detail="Failed to publish orchestrator message.")
+        else:
+            # Direct orchestrator call for default response format
+            result = await orchestrator_chat(request)
+            return result
+
     if response_format and response_format.get('type') != 'default':
         try:
             # Publish the message to the queue
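Review note: the routing added above reduces to a small rule: when the bridge has an `orchestrator_id`, publish to the queue for any non-default `response_format`, otherwise run `orchestrator_chat` inline. Below is a minimal standalone sketch of that rule for reference only; `publish` and `run_inline` are illustrative stand-ins (not names from this PR) for `queue_obj.publish_message` and `orchestrator_chat`.

```python
from typing import Any, Awaitable, Callable


async def route_orchestrator_request(
    db_config: dict,
    data_to_send: dict,
    publish: Callable[[dict], Awaitable[Any]],      # stand-in for queue_obj.publish_message
    run_inline: Callable[[], Awaitable[Any]],       # stand-in for orchestrator_chat(request)
):
    """Illustrative mirror of the branch added to chat_completion."""
    response_format = (
        data_to_send.get('body', {})
        .get('configuration', {})
        .get('response_format', {})
    )
    if not db_config.get('orchestrator_id'):
        return None  # fall through to the existing non-orchestrator path
    if response_format and response_format.get('type') != 'default':
        # Non-default response_format: hand the work to the queue consumer
        await publish(data_to_send)
        return {"success": True, "message": "Your response will be sent through configured means."}
    # Default response_format: run the orchestrator inline and return its result
    return await run_inline()
```

One behavioural consequence worth confirming in review: for queued orchestrator requests the caller only receives the acknowledgement payload, not the model output.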
6 changes: 5 additions & 1 deletion src/services/commonServices/common.py
@@ -151,7 +151,11 @@ async def chat(request_body):
 @handle_exceptions
 async def orchestrator_chat(request_body):
     try:
-        body = await request_body.json()
+        # Handle both API request (with .json() method) and queue message (dict)
+        if hasattr(request_body, 'json'):
+            body = await request_body.json()
+        else:
+            body = request_body.get('body', {})
         # Extract user query from the request
         user = body.get('user')
         thread_id = body.get('thread_id')
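The `hasattr(request_body, 'json')` check above is what lets the same function serve both the HTTP path (a FastAPI `Request` with an awaitable `.json()`) and the queue path (a plain dict carrying a `body` key). A minimal self-contained sketch of that normalization, with `FakeRequest` and `extract_body` as purely illustrative names, not code from this PR:

```python
import asyncio


class FakeRequest:
    """Illustrative stand-in for a FastAPI Request: exposes an awaitable .json()."""
    def __init__(self, payload: dict):
        self._payload = payload

    async def json(self) -> dict:
        return self._payload


async def extract_body(request_body) -> dict:
    # Same duck-typing rule as orchestrator_chat: an object with .json()
    # is treated as an HTTP request; anything else as a queue message dict.
    if hasattr(request_body, 'json'):
        return await request_body.json()
    return request_body.get('body', {})


async def demo():
    http_like = FakeRequest({"user": "hi", "thread_id": "t-1"})
    queue_like = {"body": {"user": "hi", "thread_id": "t-1"}}
    assert await extract_body(http_like) == await extract_body(queue_like)


asyncio.run(demo())
```

The branch is unambiguous for the two inputs used today, since plain dicts carry no `json` attribute.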
18 changes: 16 additions & 2 deletions src/services/commonServices/queueService/queueService.py
@@ -1,7 +1,7 @@
 import asyncio
 import json
 from config import Config
-from src.services.commonServices.common import chat, image
+from src.services.commonServices.common import chat, image, orchestrator_chat
 from aio_pika.abc import AbstractIncomingMessage
 from src.services.utils.logger import logger
 from src.services.utils.common_utils import process_background_tasks
@@ -24,7 +24,21 @@ def __init__(self):

     async def process_messages(self, messages):
         """Implement your batch processing logic here."""
-        type = messages.get("body",{}).get('configuration',{}).get('type')
+        # Check if this is an orchestrator request
+        body = messages.get("body", {})
+
+        # Check for orchestrator indicators in the request
+        has_orchestrator_id = body.get('master_agent_id') or body.get('orchestrator_id')
+        has_agent_configurations = body.get('agent_configurations') or body.get('master_agent_config')
+
+        # If it looks like an orchestrator request, handle it accordingly
+        if has_orchestrator_id and has_agent_configurations:
+            # Call orchestrator_chat same as chat - both expect messages format
+            await orchestrator_chat(messages)
+            return
+
+        # Handle regular chat/image requests
+        type = body.get('configuration', {}).get('type')
         if type == 'image':
             await image(messages)
             return
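The consumer now dispatches on the shape of the message body rather than only on `configuration.type`. A standalone sketch of that dispatch rule with illustrative payloads; the key names come from this diff, the values are invented, and the fall-through to `'chat'` assumes the pre-existing behaviour in the part of `process_messages` not shown in this hunk:

```python
def classify_message(message: dict) -> str:
    """Return which handler process_messages would select for a queued message."""
    body = message.get("body", {})
    has_orchestrator_id = body.get('master_agent_id') or body.get('orchestrator_id')
    has_agent_configurations = body.get('agent_configurations') or body.get('master_agent_config')
    if has_orchestrator_id and has_agent_configurations:
        return 'orchestrator_chat'
    if body.get('configuration', {}).get('type') == 'image':
        return 'image'
    return 'chat'  # assumed default path, truncated in this hunk


# Illustrative payloads only; in the router path, real messages come from make_request_data.
orchestrator_msg = {"body": {"orchestrator_id": "orc-1", "agent_configurations": [{"agent": "a1"}]}}
image_msg = {"body": {"configuration": {"type": "image"}}}
chat_msg = {"body": {"configuration": {"type": "chat"}}}

assert classify_message(orchestrator_msg) == 'orchestrator_chat'
assert classify_message(image_msg) == 'image'
assert classify_message(chat_msg) == 'chat'
```

Since the consumer relies on this shape heuristic rather than an explicit flag, it is worth verifying that `data_to_send` published from `modelRouter.py` always carries `agent_configurations` or `master_agent_config`; otherwise queued orchestrator messages would fall through to the regular chat path.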