Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions alts/shared/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
'ConfigNotFoundError',
'DBUpdateError',
'InstallPackageError',
'OpenNebulaQuotaExceededError',
'OpennebulaVMStopError',
'ProvisionError',
'PublishArtifactsError',
Expand Down Expand Up @@ -92,3 +93,8 @@ class SystemInfoCmdError(ALTSBaseError):

class OpennebulaVMStopError(ALTSBaseError):
    """ALTS error type for OpenNebula VM stop failures.

    NOTE(review): meaning inferred from the class name; confirm against
    the raise sites. Adds no behaviour on top of ``ALTSBaseError``.
    """


class OpenNebulaQuotaExceededError(ALTSBaseError):
    """Signals insufficient OpenNebula quota capacity.

    Adds no behaviour on top of ``ALTSBaseError``; it only provides a
    distinct exception type.
    """
4 changes: 4 additions & 0 deletions alts/shared/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ class OpennebulaConfig(BaseModel):
default_vm_disk_size: Optional[int] = 15360
default_vm_ram_size: Optional[int] = 1536
network: Optional[str] = None
# Quota checking configuration
quota_check_enabled: bool = False
quota_safety_margin: float = 0.1
quota_cache_ttl: int = 30


class RabbitmqBrokerConfig(BaseBrokerConfig):
Expand Down
133 changes: 133 additions & 0 deletions alts/worker/quota_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# -*- mode:python; coding:utf-8; -*-

"""OpenNebula quota cache for Celery worker processes."""

import logging
import time
from dataclasses import dataclass
from threading import Lock
from typing import Optional

__all__ = ['OpenNebulaQuotaCache', 'QuotaInfo']


@dataclass
class QuotaInfo:
    """
    Snapshot of OpenNebula quota usage and limits.

    Attributes
    ----------
    vms_used, vms_limit : int
        Virtual machine usage and limit.
    cpu_used, cpu_limit : float
        CPU usage and limit.
    memory_used, memory_limit : int
        Memory usage and limit (reported as MB in the cache's log output).
    disk_used, disk_limit : int
        Disk usage and limit (reported as MB in the cache's log output).
    timestamp : float
        ``time.time()`` value recorded when the snapshot was cached.
        ``OpenNebulaQuotaCache.set`` always overwrites this field, so it
        is optional at construction time and defaults to ``0.0``.
    """
    vms_used: int
    vms_limit: int
    cpu_used: float
    cpu_limit: float
    memory_used: int
    memory_limit: int
    disk_used: int
    disk_limit: int
    # Optional: the cache stamps the entry itself when it is stored.
    timestamp: float = 0.0


class OpenNebulaQuotaCache:
    """
    Process-wide store of the latest OpenNebula quota reading per group.

    Instantiating the class always yields the same object: the first call
    creates it and fixes the initial TTL, while later calls return that
    object and ignore their ``ttl`` argument (use ``set_ttl`` to change
    it). Entries older than the TTL are dropped lazily on lookup. Every
    public method runs under a single class-level lock.
    """
    _instance: Optional['OpenNebulaQuotaCache'] = None
    _lock = Lock()

    def __new__(cls, ttl: int = 30):
        """
        Return the shared cache, creating it on first use.

        Parameters
        ----------
        ttl : int
            Entry lifetime in seconds. Only honoured by the call that
            actually creates the instance.
        """
        with cls._lock:
            instance = cls._instance
            if instance is None:
                instance = super().__new__(cls)
                instance._cache = {}
                instance._ttl = ttl
                instance._logger = logging.getLogger(__name__)
                cls._instance = instance
        return instance

    def get(self, group_id: int) -> Optional[QuotaInfo]:
        """
        Look up the quota entry stored for a group.

        Parameters
        ----------
        group_id : int
            OpenNebula group ID.

        Returns
        -------
        Optional[QuotaInfo]
            The stored entry, or None when nothing is stored or the
            stored entry has outlived the TTL (it is then evicted).
        """
        with self._lock:
            entry = self._cache.get(group_id)
            if entry is None:
                return None
            if not self._is_expired(entry):
                return entry
            # Stale entry: report it, evict it, and behave like a miss.
            self._logger.debug(
                'Quota cache expired for group %d', group_id
            )
            del self._cache[group_id]
            return None

    def set(self, group_id: int, quota: QuotaInfo):
        """
        Stamp a quota entry with the current time and store it.

        Parameters
        ----------
        group_id : int
            OpenNebula group ID.
        quota : QuotaInfo
            Quota information to cache. Its ``timestamp`` attribute is
            overwritten in place.
        """
        with self._lock:
            quota.timestamp = time.time()
            self._cache[group_id] = quota
            usage = (
                quota.vms_used, quota.vms_limit,
                quota.cpu_used, quota.cpu_limit,
                quota.memory_used, quota.memory_limit,
                quota.disk_used, quota.disk_limit,
            )
            self._logger.debug(
                'Cached quota for group %d: VMs %d/%d, CPU %.1f/%.1f, '
                'Memory %d/%d MB, Disk %d/%d MB',
                group_id,
                *usage
            )

    def _is_expired(self, quota: QuotaInfo) -> bool:
        """Tell whether an entry's age exceeds the configured TTL."""
        age = time.time() - quota.timestamp
        return age > self._ttl

    def invalidate(self, group_id: int):
        """
        Drop the entry stored for one group, if there is one.

        Parameters
        ----------
        group_id : int
            OpenNebula group ID.
        """
        with self._lock:
            try:
                del self._cache[group_id]
            except KeyError:
                # Nothing stored for this group: nothing to do or log.
                return
            self._logger.debug(
                'Invalidated quota cache for group %d', group_id
            )

    def clear(self):
        """Drop every stored quota entry."""
        with self._lock:
            self._cache.clear()
            self._logger.debug('Cleared all quota cache entries')

    def set_ttl(self, ttl: int):
        """
        Replace the entry lifetime used for expiry checks.

        Parameters
        ----------
        ttl : int
            New TTL in seconds.
        """
        with self._lock:
            self._ttl = ttl
Loading
Loading