A robust, asynchronous, and thread-safe Python library for managing a pool of API keys. It is designed to be integrated into applications (such as the Universal LLM API Proxy included in this project) to provide a powerful layer of resilience and high availability when interacting with multiple LLM providers.
- Asynchronous by Design: Built with `asyncio` and `httpx` for high-performance, non-blocking I/O.
- Anthropic API Compatibility: Built-in translation layer (`anthropic_compat`) enables Anthropic API clients (like Claude Code) to use any supported provider.
- Advanced Concurrency Control: A single API key can be used for multiple concurrent requests. By default, it supports concurrent requests to different models. With configuration (`MAX_CONCURRENT_REQUESTS_PER_KEY_<PROVIDER>`), it can also support multiple concurrent requests to the same model using the same key.
- Smart Key Management: Selects the optimal key for each request using a tiered, model-aware locking strategy to distribute load evenly and maximize availability.
- Configurable Rotation Strategy: Choose between deterministic least-used selection (perfect balance) and the default weighted random selection (unpredictable, harder to fingerprint).
- Deadline-Driven Requests: A global timeout ensures that no request, including all retries and key selections, exceeds a specified time limit.
- OAuth & API Key Support: Built-in support for standard API keys and complex OAuth flows.
- Gemini CLI: Full OAuth 2.0 web flow with automatic project discovery, free-tier onboarding, and credential prioritization (paid vs free tier).
- Antigravity: Full OAuth 2.0 support for Gemini 3, Gemini 2.5, and Claude Sonnet 4.5 models via Google's Antigravity API, with advanced features such as thought signature caching and tool hallucination prevention.
- Qwen Code: Device Code flow support.
- iFlow: Authorization Code flow with local callback handling.
- Stateless Deployment Ready: Can load complex OAuth credentials from environment variables, eliminating the need for physical credential files in containerized environments.
- Intelligent Error Handling:
- Escalating Per-Model Cooldowns: Failed keys are placed on a temporary, escalating cooldown for specific models.
- Key-Level Lockouts: Keys failing across multiple models are temporarily removed from rotation.
- Stream Recovery: The client detects mid-stream errors (like quota limits) and gracefully handles them.
- Credential Prioritization: Automatic tier detection and priority-based credential selection (e.g., paid tier credentials used first for models that require them).
- Advanced Model Requirements: Support for model-tier restrictions (e.g., Gemini 3 requires paid-tier credentials).
- Robust Streaming Support: Includes a wrapper for streaming responses that reassembles fragmented JSON chunks.
- Detailed Usage Tracking: Tracks daily and global usage for each key, persisted to a JSON file.
- Automatic Daily Resets: Automatically resets cooldowns and archives stats daily.
- Provider Agnostic: Works with any provider supported by `litellm`.
- Extensible: Easily add support for new providers through a simple plugin-based architecture.
- Temperature Override: Global `temperature=0` override to prevent tool hallucination with low-temperature settings.
- Shared OAuth Base: Refactored OAuth implementation with a reusable `GoogleOAuthBase` for multiple providers.
- Fair Cycle Rotation: Ensures each credential exhausts at least once before any can be reused within a tier, preventing a single credential from being repeatedly used while others sit idle. Configurable per provider with tracking modes and cross-tier support.
- Custom Usage Caps: Set custom limits per tier, per model/group that are more restrictive than the actual API limits. Supports percentages (e.g., "80%") and multiple cooldown modes (`quota_reset`, `offset`, `fixed`). Credentials go on cooldown before hitting actual API limits.
- Centralized Defaults: All tunable defaults are defined in `config/defaults.py` for easy customization and visibility.
- Static Model Fallback: Providers can define `get_static_models()` to return a hardcoded model list when dynamic discovery fails (network errors, auth failures, server errors). Ensures the model list is always available even when the provider API is unreachable.
- Performance Optimizations: Hot paths are optimized with `__slots__` on critical classes (reducing memory and attribute lookup overhead), pre-compiled regex patterns, cached model/provider lookups, and lazy logging initialization. These optimizations reduce per-request overhead without changing behavior.
- Batched Persistence: Usage tracking data is persisted to disk via a background batched writer (`BatchedPersistence`) instead of synchronous writes on every update. State changes are kept in memory and flushed periodically, dramatically reducing disk I/O in high-throughput scenarios while ensuring data durability on shutdown.
- Async Read-Write Locks: `ReadWriteLock` allows multiple concurrent readers with exclusive writer access, replacing the standard `asyncio.Lock` in read-heavy patterns (model lookups, usage queries) for better throughput.
- Transaction Logger: Correlated request/response logging (`TransactionLogger`) creates a unique directory per API transaction with both client-level I/O and provider-specific details. Supports streaming chunk logging, metadata capture (timing, usage, reasoning), and provider subdirectory logging via `ProviderLogger`.
- Enhanced Error Context: `InternalServerError` now includes `llm_provider` and `model` parameters for precise error attribution. Streaming retry handling robustly processes non-dict error details (e.g., string error payloads) by wrapping them before field access.
- Moonshot Provider Support: Moonshot AI (`moonshot/`) is supported as a known litellm provider with streaming and function calling. `reasoning_content` from Moonshot responses is preserved through the streaming pipeline and correctly translated to Anthropic thinking blocks via `anthropic_compat`.
- ZAI Provider Support: Full support for the ZAI (z.ai) API with hourly request quota tracking (lite/pro/max tiers), sequential rotation mode, static model fallback, and additional native endpoints (video generation, image generation, agent chat, tokenizer, layout parsing, web reader).
You can install the library directly from a local path. The `-e` flag installs it in "editable" mode, which is recommended for development.

```bash
pip install -e .
```

The `RotatingClient` class is the main entry point for interacting with the library. It is designed to be a long-lived object that manages the state of your API key pool.
```python
import os
from dotenv import load_dotenv
from rotator_library import RotatingClient

# Load environment variables from .env file
load_dotenv()

# Dynamically load all provider API keys from environment variables
api_keys = {}
for key, value in os.environ.items():
    # This pattern finds keys like "GEMINI_API_KEY_1" or "OPENAI_API_KEY"
    if (key.endswith("_API_KEY") or "_API_KEY_" in key) and key != "PROXY_API_KEY":
        # Extracts "gemini" from "GEMINI_API_KEY_1"
        provider = key.split("_API_KEY")[0].lower()
        if provider not in api_keys:
            api_keys[provider] = []
        api_keys[provider].append(value)

# Initialize empty dictionary for OAuth credentials (or load from CredentialManager)
oauth_credentials = {}

client = RotatingClient(
    api_keys=api_keys,
    oauth_credentials=oauth_credentials,
    max_retries=2,
    usage_file_path="key_usage.json",
    configure_logging=True,
    global_timeout=30,
    abort_on_callback_error=True,
    litellm_provider_params={},
    ignore_models={},
    whitelist_models={},
    enable_request_logging=False,
    max_concurrent_requests_per_key={},
    rotation_tolerance=2.0,  # 0.0 = deterministic, 2.0 = recommended weighted random
    data_dir=None,  # Auto-detect: EXE dir if frozen, else cwd
)
```

- `api_keys` (Optional[Dict[str, List[str]]]): A dictionary mapping provider names (e.g., "openai", "anthropic") to a list of API keys.
- `oauth_credentials` (Optional[Dict[str, List[str]]]): A dictionary mapping provider names (e.g., "gemini_cli", "qwen_code") to a list of file paths to OAuth credential JSON files.
- `max_retries` (int, default: `2`): The number of times to retry a request with the same key if a transient server error (e.g., 500, 503) occurs.
- `usage_file_path` (str, default: `"key_usage.json"`): The path to the JSON file where usage statistics (tokens, cost, success counts) are persisted.
- `configure_logging` (bool, default: `True`): If `True`, configures the library's logger to propagate logs to the root logger. Set to `False` if you want to handle logging configuration manually.
- `global_timeout` (int, default: `30`): A hard time limit (in seconds) for the entire request lifecycle. If the request (including all retries) takes longer than this, it is aborted.
- `abort_on_callback_error` (bool, default: `True`): If `True`, any exception raised by `pre_request_callback` will abort the request. If `False`, the error is logged and the request proceeds.
- `litellm_provider_params` (Optional[Dict[str, Any]], default: `None`): A dictionary of extra parameters to pass to `litellm` for specific providers.
- `ignore_models` (Optional[Dict[str, List[str]]], default: `None`): A dictionary where keys are provider names and values are lists of model names/patterns to exclude (blacklist). Supports wildcards (e.g., `"*-preview"`).
- `whitelist_models` (Optional[Dict[str, List[str]]], default: `None`): A dictionary where keys are provider names and values are lists of model names/patterns to always include, overriding `ignore_models`.
- `enable_request_logging` (bool, default: `False`): If `True`, enables detailed per-request file logging (useful for debugging complex interactions).
- `max_concurrent_requests_per_key` (Optional[Dict[str, int]], default: `None`): A dictionary defining the maximum number of concurrent requests allowed for a single API key for a specific provider. Defaults to 1 if not specified.
- `rotation_tolerance` (float, default: `2.0`): Controls the credential rotation strategy:
  - `0.0`: Deterministic - always selects the least-used credential for perfect load balance.
  - `2.0` (default, recommended): Weighted random - randomly selects credentials with a bias toward less-used ones. Provides unpredictability (harder to fingerprint) while maintaining good balance.
  - `5.0+`: High randomness - even heavily-used credentials have a significant selection probability. Maximum unpredictability.

  The weight formula is `weight = (max_usage - credential_usage) + tolerance + 1`. Use cases:
  - `0.0`: when perfect load balance is critical
  - `2.0`: when avoiding fingerprinting/rate-limit detection is important
  - `5.0+`: for stress testing or maximum unpredictability

  A minimal sketch of this selection strategy follows this parameter list.
- `data_dir` (Optional[Union[str, Path]], default: `None`): Root directory for all data files (logs, cache, OAuth credentials, `key_usage.json`). If `None`, auto-detects: uses the EXE directory if running as a frozen binary, otherwise the current working directory.
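To make the tolerance behavior concrete, here is a minimal standalone sketch of the weighted selection described above. The function name and the usage-count dictionary are illustrative only and are not part of the library's API.

```python
import random
from typing import Dict

def pick_credential(usage: Dict[str, int], tolerance: float) -> str:
    """Illustrative selection: weight = (max_usage - credential_usage) + tolerance + 1."""
    if tolerance == 0.0:
        # Deterministic mode: always take the least-used credential.
        return min(usage, key=usage.get)
    max_usage = max(usage.values())
    keys = list(usage)
    weights = [(max_usage - usage[k]) + tolerance + 1 for k in keys]
    return random.choices(keys, weights=weights, k=1)[0]

# key_a is heavily used, so key_b is picked more often (but not always) at tolerance=2.0.
print(pick_credential({"key_a": 10, "key_b": 2}, tolerance=2.0))
```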
The `RotatingClient` is asynchronous and manages an `httpx.AsyncClient` internally. It is crucial to close the client properly to release resources. The recommended way is to use an `async with` block.
```python
import asyncio

async def main():
    async with RotatingClient(api_keys=api_keys) as client:
        # ... use the client ...
        response = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        print(response)

asyncio.run(main())
```

The `acompletion` method is the primary way to make API calls. It is a wrapper around `litellm.acompletion` that adds the core logic for key acquisition, selection, and retries.
- Parameters: Accepts the same keyword arguments as `litellm.acompletion`. The `model` parameter is required and must be a string in the format `provider/model_name`.
- Returns:
  - For non-streaming requests, the `litellm` response object.
  - For streaming requests, an async generator that yields OpenAI-compatible Server-Sent Events (SSE). The wrapper ensures that key locks are released and usage is recorded only after the stream is fully consumed.
Streaming Example:
```python
async def stream_example():
    async with RotatingClient(api_keys=api_keys) as client:
        response_stream = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Tell me a long story."}],
            stream=True
        )
        async for chunk in response_stream:
            print(chunk)

asyncio.run(stream_example())
```

A wrapper around `litellm.aembedding` that provides the same key management and retry logic for embedding requests.
Calculates the token count for a given text or list of messages using `litellm.token_counter`.
Fetches a list of available models for a specific provider, applying any configured whitelists or blacklists. Results are cached in memory. If the provider API is unreachable, it falls back to `get_static_models()` if available.
```python
async def get_all_available_models(self, grouped: bool = True) -> Union[Dict[str, List[str]], List[str]]:
```

Fetches a dictionary of all available models, grouped by provider, or a single flat list if `grouped=False`.
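For example, based on the signature above and reusing the `api_keys` dictionary from the setup example (the exact contents of the returned lists depend on your configured providers):

```python
async def list_models_example():
    async with RotatingClient(api_keys=api_keys) as client:
        grouped = await client.get_all_available_models()            # e.g. {"gemini": [...], "openai": [...]}
        flat = await client.get_all_available_models(grouped=False)  # e.g. ["gemini/gemini-1.5-flash", ...]
        print(list(grouped.keys()), len(flat))

asyncio.run(list_models_example())
```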
The `anthropic_messages` method handles Anthropic Messages API requests. It accepts requests in Anthropic's format, translates them to OpenAI format internally, processes them through `acompletion`, and returns responses in Anthropic's format.
- Parameters:
  - `request`: An `AnthropicMessagesRequest` object (from `anthropic_compat.models`)
  - `raw_request`: Optional raw request object for client disconnect checks
  - `pre_request_callback`: Optional async callback invoked before each API request
- Returns:
  - For non-streaming: a dict in Anthropic Messages format
  - For streaming: an AsyncGenerator yielding Anthropic SSE format strings
Handles Anthropic count_tokens API requests. Counts the number of tokens that would be used by a Messages API request.
- Parameters:
  - `request`: An `AnthropicCountTokensRequest` object
- Returns: A dict with an `input_tokens` count in Anthropic format
The library includes a translation layer (`anthropic_compat`) that enables Anthropic API clients to use any OpenAI-compatible provider.
```python
from rotator_library.anthropic_compat import (
    AnthropicMessagesRequest,
    AnthropicCountTokensRequest,
    translate_anthropic_request,
    openai_to_anthropic_response,
    anthropic_streaming_wrapper,
)

# Create an Anthropic-format request
request = AnthropicMessagesRequest(
    model="gemini/gemini-2.5-flash",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}]
)

# Use with RotatingClient
async with RotatingClient(api_keys=api_keys) as client:
    response = await client.anthropic_messages(request)
    print(response["content"][0]["text"])
```

- Full Message Translation: Converts between Anthropic and OpenAI message formats, including text, images, tool_use, and tool_result blocks
- Extended Thinking Support: Translates Anthropic's `thinking` configuration to `reasoning_effort` for providers that support it
- Streaming SSE Conversion: Converts OpenAI streaming chunks to Anthropic's SSE event format (`message_start`, `content_block_delta`, etc.)
- Reasoning Content Preservation: `reasoning_content` fields from providers (Moonshot, DeepSeek, etc.) are correctly translated to Anthropic `thinking` blocks in streaming responses
- Cache Token Handling: Properly translates `prompt_tokens_details.cached_tokens` to Anthropic's `cache_read_input_tokens`
- Tool Call Support: Full support for tool definitions and tool use/result blocks
The library includes a utility to manage credentials easily:
```bash
python -m src.rotator_library.credential_tool
```

Use this tool to:
- Initialize OAuth: Run the interactive login flows for Gemini, Qwen, and iFlow.
- Export Credentials: Generate `.env`-compatible configuration blocks from your saved OAuth JSON files. This is essential for setting up stateless deployments.
- Auth: Uses OAuth 2.0 Device Flow. Requires manual entry of email/identifier if not returned by the provider.
- Resilience: Injects a dummy tool (`do_not_call_me`) into requests with no tools to prevent known stream corruption issues on the API.
- Reasoning: Parses `<think>` tags in the response and exposes them as `reasoning_content`.
- Schema Cleaning: Recursively removes `strict` and `additionalProperties` from all tool schemas. Qwen's API has stricter validation than OpenAI's, and these properties cause `400 Bad Request` errors.
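For illustration, the cleaning step amounts to something like the following standalone sketch (not the provider's actual code):

```python
from typing import Any

def clean_tool_schema(schema: Any) -> Any:
    """Recursively drop 'strict' and 'additionalProperties' keys from a tool schema."""
    if isinstance(schema, dict):
        return {
            k: clean_tool_schema(v)
            for k, v in schema.items()
            if k not in ("strict", "additionalProperties")
        }
    if isinstance(schema, list):
        return [clean_tool_schema(item) for item in schema]
    return schema

tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "strict": True,
        "parameters": {
            "type": "object",
            "additionalProperties": False,
            "properties": {"city": {"type": "string"}},
        },
    },
}
print(clean_tool_schema(tool))  # both offending keys are gone at every nesting level
```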
- Auth: Uses Authorization Code Flow with a local callback server (port 11451).
- Key Separation: Distinguishes between the OAuth `access_token` (used to fetch user info) and the `api_key` (used for actual chat requests).
- Resilience: Similar to Qwen, injects a placeholder tool to stabilize streaming for empty tool lists.
- Schema Cleaning: Recursively removes `strict` and `additionalProperties` from all tool schemas to prevent API validation errors.
- Custom Models: Supports model definitions via the `IFLOW_MODELS` environment variable (a JSON array of model IDs or objects).
- Discovery: Dynamically fetches available models from the NVIDIA API.
- Thinking: Automatically injects the `thinking` parameter into `extra_body` for DeepSeek models (`deepseek-v3.1`, etc.) when `reasoning_effort` is set to low/medium/high.
- Auth: Simulates the Google Cloud CLI authentication flow.
- Project Discovery: Automatically discovers the default Google Cloud Project ID with enhanced onboarding flow.
- Credential Prioritization: Automatic detection and prioritization of paid vs free tier credentials.
- Model Tier Requirements: Gemini 3 models automatically filtered to paid-tier credentials only.
- Gemini 3 Support: Full support for Gemini 3 models with:
  - `thinkingLevel` configuration (low/high)
  - Tool hallucination prevention via system instruction injection
  - ThoughtSignature caching for multi-turn conversations
  - Parameter signature injection into tool descriptions
- Rate Limits: Implements smart fallback strategies (e.g., switching from `gemini-1.5-pro` to `gemini-1.5-pro-002`) when rate limits are hit.
- Auth: Uses OAuth 2.0 flow similar to Gemini CLI, with Antigravity-specific credentials and scopes.
- Credential Prioritization: Automatic detection and prioritization of paid vs free tier credentials (paid tier resets every 5 hours, free tier resets weekly).
- Models: Supports Gemini 3 Pro, Gemini 2.5 Flash/Flash Lite, Claude Sonnet 4.5 (with/without thinking), Claude Opus 4.5 (thinking only), and GPT-OSS 120B via Google's internal Antigravity API.
- Quota Groups: Models that share quota are automatically grouped:
  - Claude/GPT-OSS: `claude-sonnet-4-5`, `claude-opus-4-5`, `gpt-oss-120b-medium`
  - Gemini 3 Pro: `gemini-3-pro-high`, `gemini-3-pro-low`, `gemini-3-pro-preview`
  - Gemini 2.5 Flash: `gemini-2.5-flash`, `gemini-2.5-flash-thinking`, `gemini-2.5-flash-lite`
  - All models in a group deplete the group's quota equally, so within the Claude/GPT-OSS group it can be advantageous to use only Opus and skip Sonnet and GPT-OSS.
- Quota Baseline Tracking: Background job fetches quota status from API every 5 minutes to provide accurate remaining quota estimates.
- Thought Signature Caching: Server-side caching of `thoughtSignature` data for multi-turn conversations with Gemini 3 models.
- Tool Hallucination Prevention: Automatic injection of system instructions and parameter signatures for Gemini 3 and Claude to prevent tool parameter hallucination.
- Parallel Tool Usage Instruction: Configurable instruction injection to encourage parallel tool calls (enabled by default for Claude).
- Thinking Support:
  - Gemini 3: Uses `thinkingLevel` (string: "low"/"high")
  - Gemini 2.5 Flash: Uses the `-thinking` variant when `reasoning_effort` is provided
  - Claude Sonnet 4.5: Uses `thinkingBudget` (optional - supports both thinking and non-thinking modes)
  - Claude Opus 4.5: Uses `thinkingBudget` (always uses the thinking variant)
- Base URL Fallback: Automatic fallback between sandbox and production endpoints.
- Fair Cycle Rotation: Enabled by default in sequential mode. Ensures all credentials cycle before reuse.
- Custom Caps: Configurable per-tier caps with offset cooldowns for pacing usage. See `config/defaults.py`.
- API: OpenAI-compatible completion API at `https://api.moonshot.ai/v1`. Supported via litellm as a known provider with the route `moonshot/`.
- Auth: Standard API key via the `MOONSHOT_API_KEY` environment variable. Optional `MOONSHOT_API_BASE` for a custom endpoint.
- Features: Streaming and function calling support.
- Reasoning Content: `reasoning_content` from Moonshot responses is preserved through the streaming pipeline and correctly translated to Anthropic `thinking` blocks when using `anthropic_compat`.
- API: OpenAI-compatible completion API at `https://api.z.ai/api/coding/paas/v4`. Configurable via the `ZAI_API_BASE` environment variable.
- Auth: Standard API key via the `ZAI_API_KEY` environment variable.
- Quota Tracking: Hourly request quotas with three tiers:
  - `lite`: 100 requests/hour
  - `pro`: 1000 requests/hour
  - `max`: 4000 requests/hour
  - Background quota monitoring via `GET https://api.z.ai/api/monitor/usage/quota/limit`.
- Quota Error Parsing: Custom parser for ZAI-specific error codes: `1113` = insufficient balance (cooldown until midnight UTC), `429` = hourly quota exhausted (cooldown until the next hour boundary).
- Rotation: Sequential rotation mode by default: use one key until its quota is exhausted, then switch.
- Static Model Fallback: Returns a documented model list (`glm-5.1`, `glm-5`, `glm-5-turbo`, `glm-4.7`, `glm-4.6`, `glm-4.5`, etc.) when dynamic discovery fails.
- Native Endpoints: Additional ZAI-specific API endpoints beyond chat completions:
  - `video/generate` and `video/{id}/status` for async video generation
  - `images/generations` and `images/{id}` for async image generation
  - `agents/chat`, `agents/file-upload`, `agents/async-result`, `agents/conversation` for agent workflows
  - `tools/tokenizer`, `tools/layout-parsing`, `tools/web-reader` for utility tools
- All models share a single hourly quota group (`zai_global`), so cooldowns on one model propagate to all others.
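As an illustration of the two cooldown rules above, here is a standalone sketch; the library's actual error parser and cooldown scheduling live in the ZAI provider.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def zai_cooldown_until(error_code: int, now: Optional[datetime] = None) -> datetime:
    """Illustrative cooldown targets: 1113 -> next midnight UTC, 429 -> next hour boundary."""
    now = now or datetime.now(timezone.utc)
    if error_code == 1113:  # insufficient balance: wait until midnight UTC
        return (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    if error_code == 429:   # hourly quota exhausted: wait until the next hour boundary
        return (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    return now  # unknown code: no cooldown

print(zai_cooldown_until(429))
```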
The client uses a sophisticated error handling mechanism:
- Error Classification: All exceptions from `litellm` are passed through a `classify_error` function to determine their type (`rate_limit`, `authentication`, `server_error`, `quota`, `context_length`, etc.).
- Server Errors: The client retries the request with the same key up to `max_retries` times, using an exponential backoff strategy. `InternalServerError` exceptions include `llm_provider` and `model` parameters for precise error attribution.
- Key-Specific Errors (Authentication, Quota, etc.): The client records the failure in the `UsageManager`, which applies an escalating cooldown to the key for that specific model. The client then immediately acquires a new key and continues its attempt to complete the request.
- Escalating Cooldown Strategy: Consecutive failures for a key on the same model result in increasing cooldown periods:
  - 1st failure: 10 seconds
  - 2nd failure: 30 seconds
  - 3rd failure: 60 seconds
  - 4th+ failure: 120 seconds
- Key-Level Lockouts: If a key fails on multiple different models (3+ distinct models), the `UsageManager` applies a global 5-minute lockout to that key, removing it from rotation entirely.
- Authentication Errors: Immediate 5-minute global lockout (the key is assumed revoked or invalid).
- Streaming Error Resilience: Mid-stream error payloads are parsed robustly: non-dict error details (e.g., string messages) are wrapped into a dict before field access, preventing `AttributeError` during retry handling.
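The escalating schedule can be pictured with a tiny sketch (illustrative only; the real logic lives in the `UsageManager`):

```python
COOLDOWN_SCHEDULE = [10, 30, 60, 120]  # seconds for the 1st, 2nd, 3rd, and 4th+ failure

def cooldown_for(consecutive_failures: int) -> int:
    """Return the per-model cooldown in seconds for the Nth consecutive failure."""
    index = min(consecutive_failures, len(COOLDOWN_SCHEDULE)) - 1
    return COOLDOWN_SCHEDULE[max(index, 0)]

assert cooldown_for(1) == 10
assert cooldown_for(5) == 120  # 4th and later failures stay at 120 seconds
```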
To ensure predictable performance, the client operates on a strict time budget defined by the `global_timeout` parameter.
- Deadline Enforcement: When a request starts, a deadline is set. The entire process, including all key rotations and retries, must complete before this deadline.
- Deadline-Aware Retries: If a retry requires a wait time that would exceed the remaining budget, the wait is skipped, and the client immediately rotates to the next key.
- Silent Internal Errors: Intermittent failures like provider capacity limits or temporary server errors are logged internally but are not raised to the caller. The client will simply rotate to the next key.
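Conceptually, the budget works like the simplified sketch below; this is not the client's actual retry loop, and `attempts` stands in for the per-key request attempts.

```python
import asyncio
import time

async def with_deadline(attempts, global_timeout: float):
    """Try each attempt in turn, but never exceed the overall deadline."""
    deadline = time.monotonic() + global_timeout
    for attempt in attempts:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("global_timeout exceeded")
        try:
            # Each attempt is a zero-argument coroutine function (e.g. one key's request).
            return await asyncio.wait_for(attempt(), timeout=remaining)
        except Exception:
            continue  # rotate to the next key within the remaining budget
    raise TimeoutError("all attempts failed within the deadline")
```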
Usage tracking data is persisted to disk via `BatchedPersistence`, which replaces synchronous writes with a background batched writer:
- In-Memory State: State changes are applied immediately in memory and marked as "dirty".
- Background Writer: A background task writes dirty state to disk periodically (default: every 10 seconds).
- Forced Writes: Maximum dirty age (default: 30 seconds) triggers a forced write even if the interval hasn't elapsed.
- Shutdown Safety: `stop()` ensures a final write of any pending state before the writer terminates.
- Coalesced Updates: Bursty updates are coalesced: only the latest state is written, reducing redundant I/O.
- Environment Configuration: Intervals can be overridden via the `USAGE_PERSISTENCE_WRITE_INTERVAL` and `USAGE_PERSISTENCE_MAX_DIRTY_AGE` environment variables.
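A condensed sketch of the batched-writer idea (illustrative only; the real `BatchedPersistence` also enforces the maximum dirty age and richer durability guarantees):

```python
import asyncio
import json

class TinyBatchedWriter:
    """Keep state in memory and flush to disk periodically instead of on every update."""

    def __init__(self, path: str, interval: float = 10.0):
        self.path, self.interval = path, interval
        self.state, self.dirty = {}, False
        self._task = None

    def update(self, key: str, value) -> None:
        self.state[key] = value  # applied immediately in memory
        self.dirty = True        # flushed later by the background task

    async def _writer(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            if self.dirty:
                with open(self.path, "w") as f:
                    json.dump(self.state, f)
                self.dirty = False

    def start(self) -> None:
        self._task = asyncio.create_task(self._writer())

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()
        if self.dirty:  # final write of any pending state before shutdown
            with open(self.path, "w") as f:
                json.dump(self.state, f)
            self.dirty = False
```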
The specialized `UsagePersistenceManager` wraps `BatchedPersistence` specifically for the `key_usage.json` file.
`ReadWriteLock` provides an alternative to `asyncio.Lock` for read-heavy access patterns:
- Multiple Concurrent Readers: Read locks can be held by multiple coroutines simultaneously.
- Exclusive Writer: Write locks require exclusive access (no active readers or writers).
- Reader Batching: Limits the number of consecutive reader batches (default: 8) before yielding to a waiting writer, preventing writer starvation.
- Timeout Support: Both read and write acquisitions support optional timeouts (default: 30 seconds).
- Context Managers: `async with lock.read()` and `async with lock.write()` for safe acquisition/release.
Used internally for model list lookups, usage queries, and other read-mostly operations where a standard mutex would unnecessarily serialize concurrent reads.
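Usage looks roughly like this; the import is omitted because the exact module path of `ReadWriteLock` inside the package is not shown here.

```python
async def read_models(lock, models):
    async with lock.read():   # many readers may hold the lock concurrently
        return list(models)

async def add_model(lock, models, name):
    async with lock.write():  # exclusive: waits for active readers to drain
        models.append(name)
```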
`TransactionLogger` provides correlated request/response logging for debugging complex interactions:
- Per-Transaction Directory: Each API call gets a unique directory under `logs/transactions/` with the format `MMDD_HHMMSS_{format}_{provider}_{model}_{request_id}/`.
- Client-Level Logging: `request.json` (OpenAI-compatible input), `response.json` (OpenAI-compatible output), `streaming_chunks.jsonl` (if streaming), and `metadata.json` (timing, usage, model, provider).
- Provider-Level Logging: A `provider/` subdirectory for provider-specific logs (request payload, raw response stream, final response, errors). Providers extend `ProviderLogger` for custom logging needs.
- Reasoning Capture: Automatically extracts `reasoning_content` from responses and includes it in metadata.
- Correlation Context: `TransactionContext` is passed to providers so their logs share the same directory and request ID as the client-level logs.
- Enabled via `enable_request_logging`: Set to `True` during `RotatingClient` initialization to activate.
The library applies several performance optimizations to minimize per-request overhead:
- `__slots__`: Critical classes (`ResilienceOrchestrator`, `TransactionLogger`, `ProviderLogger`, `TTLDict`, etc.) use `__slots__` to reduce memory footprint and eliminate dynamic attribute dictionary overhead.
- Pre-Compiled Regex: Error classification patterns and model matching patterns are compiled once at module load, not recompiled per request.
- Cached Lookups: Model-to-provider resolution, provider instance creation, and model pattern matching are cached with TTL to avoid repeated lookups.
- Lazy Logging Initialization: The failure logger and transaction logger are initialized on first use rather than at import time, reducing startup overhead for applications that don't need logging.
- Hot Path Optimization: The request/retry loop avoids unnecessary object allocations and string formatting in the common (success) path.
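For context, `__slots__` simply pins a class to a fixed set of attributes, removing the per-instance `__dict__`; a generic illustration (not library code):

```python
class SlottedCounter:
    __slots__ = ("hits", "misses")  # fixed attribute set: lower memory, faster attribute access

    def __init__(self) -> None:
        self.hits = 0
        self.misses = 0
```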
`ResilienceOrchestrator` is a facade that delegates to specialized resilience components:
- Cooldown Manager: Manages per-model and per-key cooldown periods.
- Circuit Breaker: Tracks provider-level failure rates and temporarily blocks requests to failing providers (with half-open probing for recovery).
- IP Throttle Detector: Detects rate limits that apply at the IP level rather than the API key level, enabling provider-level cooldowns instead of key rotation.
- Adaptive Rate Limiter: Token bucket with AIMD (Additive Increase / Multiplicative Decrease) rate adjustment. Proactive per-provider request pacing that decreases the rate on 429s and gradually increases it on successes. Disabled by default; enable via `ADAPTIVE_RATE_LIMIT_ENABLED=true`.
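The AIMD idea in miniature (a sketch; the library's limiter is per-provider, asynchronous, and paired with a token bucket):

```python
class AimdRate:
    """Additive Increase / Multiplicative Decrease of a requests-per-second target."""

    def __init__(self, rate: float = 5.0, min_rate: float = 0.5, max_rate: float = 50.0):
        self.rate, self.min_rate, self.max_rate = rate, min_rate, max_rate

    def on_success(self) -> None:
        self.rate = min(self.rate + 0.1, self.max_rate)  # additive increase

    def on_429(self) -> None:
        self.rate = max(self.rate * 0.5, self.min_rate)  # multiplicative decrease

limiter = AimdRate()
limiter.on_429()
print(limiter.rate)  # 2.5
```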
The library uses a dynamic plugin system. To add support for a new provider's model list, you only need to:
- Create a new provider file in `src/rotator_library/providers/` (e.g., `my_provider.py`).
- Implement the `ProviderInterface`: Inside your new file, create a class that inherits from `ProviderInterface` and implements the `get_models` method.
- Optional - Static Model Fallback: Implement `get_static_models()` to return a hardcoded list when dynamic discovery fails.
```python
# src/rotator_library/providers/my_provider.py
from .provider_interface import ProviderInterface
from typing import List
import httpx

class MyProvider(ProviderInterface):
    def get_static_models(self) -> List[str]:
        """Return a hardcoded model list as a fallback when the API is unreachable."""
        return ["my_provider/model-a", "my_provider/model-b"]

    async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
        # Logic to fetch and return a list of model names.
        # The credential argument allows using the key to fetch models.
        # On failure, callers will fall back to get_static_models().
        pass
```

The system will automatically discover and register your new provider.
For a more in-depth technical explanation of the library's architecture, including the UsageManager's concurrency model and the error classification system, please refer to the Technical Documentation.