Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ vault/agent-out
vault/keys/rsa_private_key.pem
vault/keys/*.pem.old

# RSA Private Keys - DO NOT COMMIT
vault/keys/rsa_private_key.pem
vault/keys/*.pem.old

# Snyk Security Extension - AI Rules (auto-generated)
.github/instructions/snyk_rules.instructions.md
# Dynamically created Ruuter health endpoint for tests
DSL/Ruuter.private/rag-search/GET/health.yml
DSL/Ruuter.private/rag-search/GET/health.yml
2 changes: 1 addition & 1 deletion GUI/src/components/MessageContent/MessageContent.scss
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,4 @@
color: rgba(255, 255, 255, 0.7);
}
}
}
}
2 changes: 1 addition & 1 deletion GUI/src/components/MessageContent/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ const MessageContent: FC<MessageContentProps> = ({ content }) => {
);
};

export default MessageContent;
export default MessageContent;
3 changes: 1 addition & 2 deletions GUI/src/hooks/useStreamingResponse.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,4 @@ export const useStreamingResponse = (channelId: string): UseStreamingResponseRet
stopStreaming,
isStreaming,
};
};

};
23 changes: 12 additions & 11 deletions generate_presigned_url.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import boto3
from botocore.client import Config
from typing import List, Dict
from loguru import logger

# Create S3 client for MinIO
s3_client = boto3.client(
Expand All @@ -20,7 +21,7 @@
# Generate presigned URLs
presigned_urls: List[str] = []

print("Generating presigned URLs...")
logger.info("Generating presigned URLs...")
for file_info in files_to_process:
try:
url = s3_client.generate_presigned_url(
Expand All @@ -29,11 +30,11 @@
ExpiresIn=24 * 3600, # 4 hours in seconds
)
presigned_urls.append(url)
print(f":white_check_mark: Generated URL for: {file_info['key']}")
print(f" URL: {url}")
logger.success(f"Generated URL for: {file_info['key']}")
logger.info(f" URL: {url}")
except Exception as e:
print(f":x: Failed to generate URL for: {file_info['key']}")
print(f" Error: {str(e)}")
logger.error(f"Failed to generate URL for: {file_info['key']}")
logger.error(f" Error: {str(e)}")

output_file: str = "minio_presigned_urls.txt"

Expand All @@ -50,14 +51,14 @@
for i, url in enumerate(presigned_urls, 1):
f.write(f"URL {i}:\n{url}\n\n")

print(f"\n:white_check_mark: Presigned URLs saved to: {output_file}")
print(f"Total URLs generated: {len(presigned_urls)}")
logger.success(f"Presigned URLs saved to: {output_file}")
logger.info(f"Total URLs generated: {len(presigned_urls)}")

# Display the combined URL string for easy copying
if presigned_urls:
print("\nCombined URL string (for signedUrls environment variable):")
print("=" * 60)
print("|||".join(presigned_urls))
logger.info("Combined URL string (for signedUrls environment variable):")
logger.info("=" * 60)
logger.info("|||".join(presigned_urls))

except Exception as e:
print(f":x: Failed to save URLs to file: {str(e)}")
logger.error(f"Failed to save URLs to file: {str(e)}")
14 changes: 7 additions & 7 deletions grafana-configs/loki_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class LokiLogger:

def __init__(
self, loki_url: str = "http://loki:3100", service_name: str = "default"
):
) -> None:
"""
Initialize LokiLogger

Expand All @@ -32,7 +32,7 @@ def __init__(
# Set default timeout for all requests
self.timeout = 5

def _send_to_loki(self, level: str, message: str):
def _send_to_loki(self, level: str, message: str) -> None:
"""Send log entry directly to Loki API"""
try:
# Create timestamp in nanoseconds (Loki requirement)
Expand Down Expand Up @@ -78,16 +78,16 @@ def _send_to_loki(self, level: str, message: str):

# Also print to console for immediate feedback
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {level: <8} | {message}")
print(f"[{timestamp}] {level: <8} | {message}") # noqa: T201

def info(self, message: str):
def info(self, message: str) -> None:
self._send_to_loki("INFO", message)

def error(self, message: str):
def error(self, message: str) -> None:
self._send_to_loki("ERROR", message)

def warning(self, message: str):
def warning(self, message: str) -> None:
self._send_to_loki("WARNING", message)

def debug(self, message: str):
def debug(self, message: str) -> None:
self._send_to_loki("DEBUG", message)
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,16 @@ unfixable = []
"src/contextual_retrieval/contextual_retrieval_api_client.py" = ["ANN401"] # httpx **kwargs pass-through
"src/tool_classifier/workflows/service_workflow.py" = ["ANN401"] # LLMManager passed as Any - dynamic multi-provider LLM interface
"src/guardrails/dspy_nemo_adapter.py" = ["ANN401"] # LangChain LLM interface + DSPy dynamic types
"src/tool_classifier/param_extractor.py" = ["ANN401"] # DSPy streamify Any type
"src/tool_classifier/api_response_formatter.py" = ["ANN401"] # DSPy streamify Any type
"src/llm_orchestrator_config/context_manager.py" = ["ANN401"] # MockResponse with dynamic attributes
"src/optimization/metrics/*.py" = ["ANN401"] # DSPy optimizer trace parameter (internal type)
"byk-stack-setup/script.py" = ["T201"] # CLI script uses print

"byk-stack-setup/script.py" = ["T201"] # Allow print statements in setup script


"src/utils/api_tool_session_store.py" = ["ANN401"] # Dynamic Pydantic model field updates via **kwargs
"src/tool_classifier/workflows/api_tool_workflow.py" = ["ANN401", "N815"] # Dynamic guardrails adapter + orchestration service Any types; camelCase _MinimalRequest field for API contract

[tool.ruff.format]
# Like Black, use double quotes for strings.
Expand Down Expand Up @@ -145,4 +150,4 @@ exclude = [
]

# --- Global strictness ---
typeCheckingMode = "standard" # Standard typechecking mode
typeCheckingMode = "standard" # Standard typechecking mode
28 changes: 19 additions & 9 deletions src/llm_orchestration_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""LLM Orchestration Service - Business logic for LLM orchestration."""

from typing import Optional, List, Dict, Union, Any, AsyncIterator
from typing import Optional, List, Dict, Union, Any, AsyncIterator, TYPE_CHECKING
import os
import time
import asyncio
Expand Down Expand Up @@ -45,8 +45,15 @@
from src.llm_orchestrator_config.stream_config import StreamConfig
from src.vector_indexer.constants import ResponseGenerationConstants
from src.utils.error_utils import generate_error_id, log_error_with_context
from src.utils.stream_manager import stream_manager
from src.utils.stream_manager import stream_manager, StreamContext
from src.utils.cost_utils import calculate_total_costs, get_lm_usage_since

if TYPE_CHECKING:
from src.llm_orchestrator_config.embedding_manager import EmbeddingManager
from src.llm_orchestrator_config.context_manager import (
ContextGenerationManager,
)
from src.llm_orchestrator_config.config.loader import ConfigurationLoader
from src.utils.time_tracker import log_step_timings
from src.utils.budget_tracker import get_budget_tracker
from src.utils.production_store import get_production_store
Expand Down Expand Up @@ -325,7 +332,7 @@ async def process_orchestration_request(

# Store detected language in request for use throughout pipeline
# Using setattr for type safety - adds dynamic attribute to Pydantic model instance
setattr(request, "_detected_language", detected_language)
setattr(request, "_detected_language", detected_language) # noqa: B010

# STEP 0.1: Multi-step service prefix check (bypass NLU pipeline)
if request.message.startswith(SERVICE_STEP_PREFIXES):
Expand Down Expand Up @@ -587,7 +594,7 @@ async def stream_orchestration_response(

# Store detected language in request for use throughout pipeline
# Using setattr for type safety - adds dynamic attribute to Pydantic model instance
setattr(request, "_detected_language", detected_language)
setattr(request, "_detected_language", detected_language) # noqa: B010

# STEP 0.1: Multi-step service prefix check (bypass NLU pipeline)
if request.message.startswith(SERVICE_STEP_PREFIXES):
Expand Down Expand Up @@ -716,6 +723,9 @@ async def stream_orchestration_response(

# Route to appropriate workflow (streaming)
# route_to_workflow returns AsyncIterator[str] when is_streaming=True
# Inject costs_metric into the classification context so the
# API Tool workflow can append its output guardrail costs.
classification.metadata["costs_metric"] = costs_metric
start_time = time.time()
stream_result = await self.tool_classifier.route_to_workflow(
classification=classification,
Expand Down Expand Up @@ -808,7 +818,7 @@ async def _stream_rag_pipeline(
self,
request: OrchestrationRequest,
components: Dict[str, Any],
stream_ctx: Any,
stream_ctx: StreamContext,
costs_metric: Dict[str, Dict[str, Any]],
time_metric: Dict[str, float],
) -> AsyncIterator[str]:
Expand Down Expand Up @@ -1787,7 +1797,7 @@ def _store_production_inference_data(

# Store the inference result asynchronously without blocking

def store_async():
def store_async() -> None:
"""Run async storage in a new event loop in a separate thread."""
try:
loop = asyncio.new_event_loop()
Expand Down Expand Up @@ -2925,7 +2935,7 @@ def get_available_embedding_models_for_indexer(
# Lazy Initialization Helpers for Vector Indexer (Private Methods)
# ========================================================================

def _get_embedding_manager(self):
def _get_embedding_manager(self) -> "EmbeddingManager":
"""Lazy initialization of EmbeddingManager for vector indexer."""
if not hasattr(self, "_embedding_manager"):
from src.llm_orchestrator_config.embedding_manager import EmbeddingManager
Expand All @@ -2939,7 +2949,7 @@ def _get_embedding_manager(self):

return self._embedding_manager

def _get_context_manager(self):
def _get_context_manager(self) -> "ContextGenerationManager":
"""Lazy initialization of ContextGenerationManager for vector indexer."""
if not hasattr(self, "_context_manager"):
from src.llm_orchestrator_config.context_manager import (
Expand All @@ -2953,7 +2963,7 @@ def _get_context_manager(self):

return self._context_manager

def _get_config_loader(self):
def _get_config_loader(self) -> "ConfigurationLoader":
"""Lazy initialization of ConfigurationLoader for vector indexer."""
if not hasattr(self, "_config_loader"):
from src.llm_orchestrator_config.config.loader import ConfigurationLoader
Expand Down
30 changes: 16 additions & 14 deletions src/llm_orchestration_service_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
VALIDATION_GENERIC_ERROR,
)
from src.llm_orchestrator_config.stream_config import StreamConfig
from src.llm_orchestrator_config.exceptions import StreamTimeoutException
from src.llm_orchestrator_config.exceptions import StreamTimeoutError
from src.utils.stream_timeout import stream_timeout
from src.utils.error_utils import generate_error_id, log_error_with_context
from src.utils.rate_limiter import RateLimiter
Expand Down Expand Up @@ -132,7 +132,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:

# Custom exception handlers for user-friendly error messages
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
async def validation_exception_handler(
request: Request, exc: RequestValidationError
) -> StreamingResponse | JSONResponse:
"""
Handle Pydantic validation errors with user-friendly messages.

Expand Down Expand Up @@ -197,7 +199,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
pass

# Return SSE format for streaming endpoint
async def validation_error_stream():
async def validation_error_stream() -> AsyncGenerator[str, None]:
error_payload: Dict[str, Any] = {
"chatId": chat_id,
"payload": {"content": user_message},
Expand Down Expand Up @@ -440,7 +442,7 @@ async def test_orchestrate_llm_request(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error occurred",
)
) from e


@app.post(
Expand All @@ -452,7 +454,7 @@ async def test_orchestrate_llm_request(
async def stream_orchestrated_response(
http_request: Request,
request: OrchestrationRequest,
):
) -> StreamingResponse:
"""
Stream LLM orchestration response with validation-first guardrails.

Expand Down Expand Up @@ -492,7 +494,7 @@ async def stream_orchestrated_response(
import json as json_module
from datetime import datetime

def create_sse_error_stream(chat_id: str, error_message: str):
def create_sse_error_stream(chat_id: str, error_message: str) -> str:
"""Create SSE format error response."""
from typing import Dict, Any

Expand All @@ -517,7 +519,7 @@ def create_sse_error_stream(chat_id: str, error_message: str):
error_msg = f"Streaming is only available for production environment. Current environment: {request.environment}. Please use /orchestrate endpoint for non-streaming environments."
logger.warning(error_msg)

async def env_error_stream():
async def env_error_stream() -> AsyncGenerator[str, None]:
yield create_sse_error_stream(request.chatId, error_msg)

return StreamingResponse(
Expand All @@ -535,7 +537,7 @@ async def env_error_stream():
error_msg = "I apologize, but the service is not available at the moment. Please try again later."
logger.error("Orchestration service not found in app state")

async def service_error_stream():
async def service_error_stream() -> AsyncGenerator[str, None]:
yield create_sse_error_stream(request.chatId, error_msg)

return StreamingResponse(
Expand All @@ -553,7 +555,7 @@ async def service_error_stream():
error_msg = "I apologize, but the service is not available at the moment. Please try again later."
logger.error("Orchestration service is None")

async def service_none_stream():
async def service_none_stream() -> AsyncGenerator[str, None]:
yield create_sse_error_stream(request.chatId, error_msg)

return StreamingResponse(
Expand Down Expand Up @@ -598,7 +600,7 @@ async def service_none_stream():
)

# Return SSE format with rate limit error
async def rate_limit_error_stream():
async def rate_limit_error_stream() -> AsyncGenerator[str, None]:
yield create_sse_error_stream(request.chatId, error_msg)

return StreamingResponse(
Expand All @@ -614,16 +616,16 @@ async def rate_limit_error_stream():
)

# Wrap streaming response with timeout
async def timeout_wrapped_stream():
async def timeout_wrapped_stream() -> AsyncGenerator[str, None]:
"""Generator wrapper with timeout enforcement."""
try:
async with stream_timeout(StreamConfig.MAX_STREAM_DURATION_SECONDS):
async for (
chunk
) in orchestration_service.stream_orchestration_response(request):
yield chunk
except StreamTimeoutException as timeout_exc:
# StreamTimeoutException already has error_id
except StreamTimeoutError as timeout_exc:
# StreamTimeoutError already has error_id
log_error_with_context(
logger,
timeout_exc.error_id,
Expand Down Expand Up @@ -660,7 +662,7 @@ async def timeout_wrapped_stream():
error_id = generate_error_id()
logger.error(f"[{error_id}] Unexpected error in streaming endpoint: {str(e)}")

async def unexpected_error_stream():
async def unexpected_error_stream() -> AsyncGenerator[str, None]:
yield create_sse_error_stream(
request.chatId if hasattr(request, "chatId") else "unknown",
"I apologize, but I encountered an unexpected issue. Please try again.",
Expand Down
Loading
Loading