Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion backend/api_v2/rate_limit_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ class RateLimitDefaults:
DEFAULT_GLOBAL_LIMIT = 100  # Concurrent requests system-wide

# TTL and timing
# DEFAULT_TTL_HOURS is the Redis key TTL — controls when the whole ZSET
# is garbage-collected after the org becomes inactive.
# DEFAULT_STALE_ENTRY_HOURS is the per-entry cutoff — entries older than
# this are removed on every check_and_acquire/get_current_usage call.
# The two are split because a leaked entry (worker OOM/SIGKILL before
# release_slot fires) shouldn't tie up a slot for the full key TTL.
# Override via API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS and
# API_DEPLOYMENT_RATE_LIMIT_STALE_ENTRY_HOURS Django settings.
DEFAULT_TTL_HOURS = 6  # Hours before the ZSET key itself expires
DEFAULT_STALE_ENTRY_HOURS = 1  # Per-entry cutoff for stale releases
DEFAULT_CACHE_TTL_SECONDS = 600  # 10 minutes cache for org limits

# Lock timeouts
Expand Down
26 changes: 23 additions & 3 deletions backend/api_v2/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,38 @@ def _get_org_key(cls, org_id: str) -> str:

@classmethod
def _get_ttl_seconds(cls) -> int:
    """Return the Redis key TTL in seconds.

    Reads API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS from Django settings,
    falling back to RateLimitDefaults.DEFAULT_TTL_HOURS when unset.
    """
    hours = getattr(
        settings,
        "API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS",
        RateLimitDefaults.DEFAULT_TTL_HOURS,
    )
    return 3600 * hours

@classmethod
def _get_stale_entry_seconds(cls) -> int:
    """Return the per-entry stale cutoff in seconds.

    Deliberately shorter than the key TTL: a leaked entry (worker
    OOM/SIGKILL before the release path fires) then frees its slot
    within hours rather than waiting out the full key TTL.

    Reads API_DEPLOYMENT_RATE_LIMIT_STALE_ENTRY_HOURS from Django
    settings, falling back to RateLimitDefaults.DEFAULT_STALE_ENTRY_HOURS.
    """
    hours = getattr(
        settings,
        "API_DEPLOYMENT_RATE_LIMIT_STALE_ENTRY_HOURS",
        RateLimitDefaults.DEFAULT_STALE_ENTRY_HOURS,
    )
    return 3600 * hours

@classmethod
def _get_cutoff_timestamp(cls) -> float:
    """Return the epoch timestamp below which ZSET entries count as stale.

    Uses the stale-entry cutoff (not the key TTL) so leaks caused by
    non-graceful worker termination are reaped without waiting for the
    whole ZSET key to expire.
    """
    # NOTE: the flattened diff had merged the old body (which subtracted
    # _get_ttl_seconds) with this one, leaving the new return unreachable;
    # only the stale-entry cutoff version is the intended behavior.
    return time.time() - cls._get_stale_entry_seconds()

@classmethod
def _cleanup_expired_entries(cls, key: str) -> None:
Expand Down
48 changes: 46 additions & 2 deletions workers/shared/workflow/connectors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,70 @@

from unstract.connectors.connectorkit import Connectorkit
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.filesystem import FileStorageType, FileSystem

logger = logging.getLogger(__name__)


# Connector IDs that resolve to the system-internal MinIO bucket. These are
# the same bucket the workers already reach via FileSystem(...) — going
# through connectorkit makes us load MinioFS, which used to opt out of
# fsspec's instance cache and leak boto3 sessions under load. We short-circuit
# them to the cached FileSystem path instead.
# Membership is checked in get_connector_instance() before falling back to
# the Connectorkit lookup path.
_SYSTEM_INTERNAL_CONNECTOR_IDS = {
    "pcs|b8cd25cd-4452-4d54-bd5e-e7d71459b702",  # UnstractCloudStorage
}


class _SystemFileStorageAdapter(UnstractFileSystem):
    """Adapter exposing the cached FileSystem path via the connector API.

    Stands in for a real connector instance when the connector ID maps
    to system-internal storage, so call sites that invoke
    ``connector.get_fsspec_fs()`` keep working without loading the
    leaky MinioFS code path.
    """

    def __init__(self, storage_type: FileStorageType):
        # The base class takes a display name; this adapter has no real
        # connector identity of its own.
        super().__init__("SystemFileStorage")
        self._storage = FileSystem(storage_type).get_file_storage()

    def get_fsspec_fs(self) -> Any:
        # Underlying fsspec filesystem of the cached storage backend.
        return self._storage.fs

    def test_credentials(self) -> bool:
        # System-internal storage carries no user credentials to validate.
        return True


def get_connector_instance(
connector_id: str, settings: dict[str, Any]
connector_id: str,
settings: dict[str, Any],
storage_type: FileStorageType = FileStorageType.WORKFLOW_EXECUTION,
) -> UnstractFileSystem:
"""Get a filesystem connector instance.

For system-internal connector IDs (e.g., UnstractCloudStorage), returns
an adapter backed by the cached FileSystem instance instead of loading
the connector class. For all other connector IDs, behaves as before.

Args:
connector_id: Connector ID (e.g., "google_cloud_storage|uuid")
settings: Connector settings/credentials
storage_type: Which system storage role to use when short-circuiting

Returns:
UnstractFileSystem: Instantiated connector
UnstractFileSystem: Instantiated connector or system adapter

Raises:
ValueError: If connector not found or instantiation fails
"""
try:
if connector_id in _SYSTEM_INTERNAL_CONNECTOR_IDS:
logger.debug(
f"Routing system-internal connector {connector_id} through "
f"FileSystem({storage_type.value})"
)
return _SystemFileStorageAdapter(storage_type)

# Use Connectorkit to get connector class
connectorkit = Connectorkit()
connector_class = connectorkit.get_connector_class_by_connector_id(connector_id)
Expand Down