Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 129 additions & 12 deletions src/rotator_library/providers/chutes_provider.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,145 @@
import asyncio
import httpx
import logging
from typing import List
import os
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from .provider_interface import ProviderInterface
from .utilities.chutes_quota_tracker import ChutesQuotaTracker

if TYPE_CHECKING:
from ..usage_manager import UsageManager

# Create a local logger for this module
import logging

lib_logger = logging.getLogger(__name__)

lib_logger = logging.getLogger('rotator_library')
lib_logger.propagate = False # Ensure this logger doesn't propagate to root
if not lib_logger.handlers:
lib_logger.addHandler(logging.NullHandler())
# Concurrency limit for parallel quota fetches
QUOTA_FETCH_CONCURRENCY = 5

class ChutesProvider(ProviderInterface):

class ChutesProvider(ChutesQuotaTracker, ProviderInterface):
"""
Provider implementation for the chutes.ai API.
Provider implementation for the chutes.ai API with quota tracking.
"""

def __init__(self, *args, **kwargs):
"""Initialize ChutesProvider with quota tracking."""
super().__init__(*args, **kwargs)

# Quota tracking cache and refresh interval
self._quota_cache: Dict[str, Dict[str, Any]] = {}
self._quota_refresh_interval: int = int(
os.environ.get("CHUTES_QUOTA_REFRESH_INTERVAL", "300")
)

def get_model_quota_group(self, model: str) -> Optional[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix verified: Implementing get_model_quota_group correctly ensures that all Chutes models share the same credential-level quota pool in the UsageManager. This addresses the grouping issue raised in the previous review.

"""
Get the quota group for a model.

All Chutes models share the same credential-level quota pool,
so they all belong to the same quota group.

Args:
model: Model name (ignored - all models share quota)

Returns:
Quota group identifier for shared credential-level tracking
"""
return "chutes_global"

async def get_models(self, api_key: str, client: httpx.AsyncClient) -> List[str]:
"""
Fetches the list of available models from the chutes.ai API.
Fetch available models from the Chutes API.

Args:
api_key: Chutes API key
client: HTTP client

Returns:
List of model names prefixed with 'chutes/'
"""
try:
response = await client.get(
"https://llm.chutes.ai/v1/models",
headers={"Authorization": f"Bearer {api_key}"}
headers={"Authorization": f"Bearer {api_key}"},
)
response.raise_for_status()
return [f"chutes/{model['id']}" for model in response.json().get("data", [])]
return [
f"chutes/{model['id']}" for model in response.json().get("data", [])
]
except httpx.RequestError as e:
lib_logger.error(f"Failed to fetch chutes.ai models: {e}")
return []
return []

# =========================================================================
# BACKGROUND JOB CONFIGURATION
# =========================================================================

def get_background_job_config(self) -> Optional[Dict[str, Any]]:
"""
Configure periodic quota usage refresh.

Returns:
Background job configuration for quota refresh
"""
return {
"interval": self._quota_refresh_interval,
"name": "chutes_quota_refresh",
"run_on_start": True,
}

async def run_background_job(
self,
usage_manager: "UsageManager",
credentials: List[str],
) -> None:
"""
Refresh quota usage for all credentials in parallel.

Args:
usage_manager: UsageManager instance
credentials: List of API keys
"""
semaphore = asyncio.Semaphore(QUOTA_FETCH_CONCURRENCY)

async def refresh_single_credential(
api_key: str, client: httpx.AsyncClient
) -> None:
async with semaphore:
try:
usage_data = await self.fetch_quota_usage(api_key, client)

if usage_data.get("status") == "success":
# Update quota cache
self._quota_cache[api_key] = usage_data

# Calculate values for usage manager
remaining_fraction = usage_data.get("remaining_fraction", 0.0)
quota = usage_data.get("quota", 0)
reset_ts = usage_data.get("reset_at")

# Store baseline in usage manager
# Since Chutes uses credential-level quota, we use a virtual model name
await usage_manager.update_quota_baseline(
api_key,
"chutes/_quota", # Virtual model for credential-level tracking
remaining_fraction,
max_requests=quota, # Max requests = quota (1 request = 1 credit)
reset_timestamp=reset_ts,
)

lib_logger.debug(
f"Updated Chutes quota baseline for credential: "
f"{usage_data['remaining']:.0f}/{quota} remaining "
f"({remaining_fraction * 100:.0f}%)"
)

except Exception as e:
lib_logger.warning(f"Failed to refresh Chutes quota usage: {e}")

# Fetch all credentials in parallel with shared HTTP client
async with httpx.AsyncClient() as client:
tasks = [
refresh_single_credential(api_key, client) for api_key in credentials
]
await asyncio.gather(*tasks, return_exceptions=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix verified: The background job now correctly parallelizes quota fetches using asyncio.gather and a shared httpx.AsyncClient. This significantly improves efficiency when managing multiple credentials, as suggested.

Loading