Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions src/db_services/ConfigurationServices.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ async def get_bridges_with_tools_and_apikeys(bridge_id, org_id, version_id=None)
},
{
'$project': { 'service': 1, 'apikey': 1, 'apikey_limit': { '$ifNull': ['$apikey_limit', 0] },
'apikey_usage': { '$ifNull': ['$apikey_usage', 0] } }
'apikey_usage': { '$ifNull': ['$apikey_usage', 0] }, 'status': {'$ifNull': ['$status', None]} }
}
],
'as': 'apikeys_docs'
}
},
# Stage 5: Map each service to its corresponding apikey, handling empty case
# Stage 5: Map each service to its corresponding apikey and status, handling empty case
{
'$addFields': {
'apikeys': {
Expand Down Expand Up @@ -267,9 +267,64 @@ async def get_bridges_with_tools_and_apikeys(bridge_id, org_id, version_id=None)
},
{}
]
},
'apikey_status': {
'$cond': [
{ '$gt' : [ {'$size': '$apikeys_array'}, 0] },
{
'$arrayToObject': {
'$map': {
'input': '$apikeys_array',
'as': 'item',
'in': [
'$$item.k',
{
'$let': {
'vars': {
'matched': {
'$arrayElemAt': [
{
'$filter': {
'input': '$apikeys_docs',
'as': 'doc',
'cond': {
'$eq': [
'$$doc._id',
{
'$convert': {
'input': '$$item.v',
'to': 'objectId',
'onError': None,
'onNull': None
}
}
]
}
}
},
0
]
}
},
'in': {
'$cond': [
{ '$ne' : ['$$matched', None] },
'$$matched.status',
None
]
}
}
}
]
}
}
},
{}
]
}
}
},

# Stage 6: Lookup 'rag_parent_datas' using 'doc_ids'
{
'$lookup': {
Expand Down
19 changes: 19 additions & 0 deletions src/db_services/api_key_status_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from bson import ObjectId
from globals import logger
from models.mongo_connection import db

apikeyCredentialsModel = db["apikeycredentials"]

async def update_apikey_status(apikey_id: str, status: str):
if not apikey_id:
return

try:
result = await apikeyCredentialsModel.update_one(
{"_id": ObjectId(apikey_id)},
{"$set": {"status": status}}
)
if not result.modified_count:
logger.warning(f"No apikey credential updated for id={apikey_id}")
except Exception as exc:
logger.error(f"Failed to update API key status for {apikey_id}: {exc}")
1 change: 1 addition & 0 deletions src/services/commonServices/baseService/baseService.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, params):
self.type = params.get('type')
self.token_calculator = params.get('token_calculator')
self.apikey_object_id = params.get('apikey_object_id')
self.apikey_status = params.get('apikey_status')
self.image_data = params.get('images')
self.tool_call_count = params.get('tool_call_count')
self.text = params.get('text')
Expand Down
13 changes: 8 additions & 5 deletions src/services/commonServices/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
process_background_tasks_for_playground,
process_variable_state,
handle_agent_transfer,
update_cost_and_last_used_in_background,
setup_agent_pre_tools
update_cost_usage_and_apikey_status_in_background,
setup_agent_pre_tools,
)
from src.services.utils.guardrails_validator import guardrails_check
from src.services.utils.rich_text_support import process_chatbot_response
Expand Down Expand Up @@ -122,6 +122,7 @@ async def chat_multiple_agents(request_body):
async def chat(request_body):
result ={}
class_obj= {}
first_execution_error_code = None
try:
# Store bridge_configurations for potential transfer logic
bridge_configurations = request_body.get('body', {}).get('bridge_configurations', {})
Expand Down Expand Up @@ -243,6 +244,7 @@ async def chat(request_body):
# Handle exceptions during execution
execution_failed = True
original_error = str(execution_exception)
first_execution_error_code = original_error.split()[7]
original_exception = execution_exception
logger.error(f"Initial execution failed with {parsed_data['service']}/{parsed_data['model']}: {original_error}")
result = {
Expand Down Expand Up @@ -367,8 +369,8 @@ async def chat(request_body):
result['response']['testcase_result'] = testcase_result
else:
await process_background_tasks_for_playground(result, parsed_data)
await update_cost_and_last_used_in_background(parsed_data)


# Save agent bridge_id to Redis for 3 days (259200 seconds)
thread_id = parsed_data.get('thread_id')
sub_thread_id = parsed_data.get('sub_thread_id')
Expand All @@ -384,6 +386,7 @@ async def chat(request_body):
await store_in_cache(redis_key, bridge_id_to_save, ttl=259200) # 3 days
logger.info(f"Cached agent {bridge_id} for thread {thread_id}_{sub_thread_id} with key based on original primary {original_primary_bridge_id}")

await update_cost_usage_and_apikey_status_in_background(parsed_data, first_execution_error_code, True)
return JSONResponse(status_code=200, content={"success": True, "response": result["response"]})

except (Exception, ValueError, BadRequestException) as error:
Expand Down Expand Up @@ -420,10 +423,10 @@ async def chat(request_body):
}
if parsed_data['is_playground'] and parsed_data['body']['bridge_configurations'].get('playground_response_format'):
await sendResponse(parsed_data['body']['bridge_configurations']['playground_response_format'], error_object, success=False, variables=parsed_data.get('variables',{}))

await update_cost_usage_and_apikey_status_in_background(parsed_data, first_execution_error_code, False)
raise ValueError(error_object)



@handle_exceptions
async def orchestrator_chat(request_body):
try:
Expand Down
28 changes: 28 additions & 0 deletions src/services/utils/api_key_status_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from src.db_services.api_key_status_service import update_apikey_status

STATUS_BY_CODE = {
"401": "invalid",
"429": "exhuasted",
}

def classify_status_from_error(code) -> str:
if code in STATUS_BY_CODE:
return STATUS_BY_CODE[code]

return code

async def mark_apikey_status_from_response(parsed_data, code=None):
apikey_map = parsed_data.get("apikey_object_id") or {}
status_map = parsed_data.get("apikey_status") or {}
service = parsed_data.get("service")
apikey_id = apikey_map.get(service)
if not apikey_id:
return

new_status = "working" if not code else classify_status_from_error(
code
)
if status_map.get(service) == new_status:
return # already up to date; skip DB write

await update_apikey_status(apikey_id, new_status)
14 changes: 6 additions & 8 deletions src/services/utils/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ..commonServices.baseService.utils import sendResponse
from src.services.utils.rich_text_support import process_chatbot_response
from src.db_services.orchestrator_history_service import orchestrator_collector
from src.services.utils.api_key_status_helper import mark_apikey_status_from_response

def setup_agent_pre_tools(parsed_data, bridge_configurations):
"""
Expand Down Expand Up @@ -164,6 +165,7 @@ def parse_request_body(request_body):
"usage" : {},
"type" : body.get('configuration',{}).get('type'),
"apikey_object_id" : body.get('apikey_object_id'),
"apikey_status": body.get('apikey_status'),
"images" : body.get('images'),
"tool_call_count": body.get('tool_call_count'),
"tokens" : {},
Expand Down Expand Up @@ -1294,12 +1296,8 @@ async def update_cost_and_last_used(parsed_data):
except Exception as e:
logger.error(f"Error updating cost and last used: {str(e)}")

async def update_cost_and_last_used_in_background(parsed_data):
"""Kick off the async cost cache update using the data available on parsed_data."""
if not isinstance(parsed_data, dict):
logger.warning("Skipping background cost update due to invalid parsed data.")
return

asyncio.create_task(update_cost_and_last_used(parsed_data))


async def update_cost_usage_and_apikey_status_in_background(parsed_data, code, success):
if success:
asyncio.create_task(update_cost_and_last_used(parsed_data))
asyncio.create_task(mark_apikey_status_from_response(parsed_data, code))
2 changes: 2 additions & 0 deletions src/services/utils/getConfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ async def _prepare_configuration_response(configuration, service, bridge_id, api

apikey = setup_api_key(service, result, apikey, chatbot)
apikey_object_id = result.get('bridges', {}).get('apikey_object_id')
apikey_status = result.get('bridges', {}).get('apikey_status')

# Handle image type early return
if configuration['type'] == 'image':
Expand Down Expand Up @@ -148,6 +149,7 @@ async def _prepare_configuration_response(configuration, service, bridge_id, api
'service': service,
'apikey': apikey,
'apikey_object_id': apikey_object_id,
'apikey_status': apikey_status,
'RTLayer': RTLayer,
'template': template_content.get('template') if template_content else None,
'user_reference': result.get('bridges', {}).get('user_reference', ''),
Expand Down