Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/api/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID

import httpx
from fastapi import APIRouter, Depends, HTTPException
from kubernetes_asyncio.client.exceptions import ApiException
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
Expand Down Expand Up @@ -386,16 +386,13 @@ async def _collect_compute_usage(namespace: str, vm_name: str) -> tuple[int, int

async def _resolve_volume_stats(
*,
volume_identifier_resolver: Callable[[str], Awaitable[tuple[str, str | None]]],
volume_identifier_resolver: Callable[[str], Awaitable[tuple[UUID, UUID | None]]],
namespace: str,
) -> dict[str, int]:
volume_uuid, _ = await volume_identifier_resolver(namespace)
volume, _ = await volume_identifier_resolver(namespace)

async with create_simplyblock_api() as sb_api:
try:
return await sb_api.volume_iostats(volume_uuid=volume_uuid)
except httpx.HTTPStatusError as exc:
raise VelaSimplyblockAPIError(f"Failed to fetch iostats for volume {volume_uuid}: {exc}") from exc
return await sb_api.volume_iostats(volume=volume)


async def _collect_database_volume_usage(namespace: str) -> tuple[int, int]:
Expand Down
80 changes: 27 additions & 53 deletions src/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from collections.abc import Mapping
from importlib import resources
from typing import TYPE_CHECKING, Annotated, Any, Literal, cast
from uuid import UUID

import asyncpg
import httpx
import yaml
from cloudflare import AsyncCloudflare, CloudflareError
from kubernetes_asyncio import client as kubernetes_client
Expand Down Expand Up @@ -42,6 +42,7 @@
VelaDeploymentError,
VelaGrafanaError,
VelaKubernetesError,
VelaSimplyblockAPIError,
)
from ._util import deployment_namespace
from .deployment import DeploymentParameters, database_image_tag_to_database_images
Expand Down Expand Up @@ -70,6 +71,7 @@
SIMPLYBLOCK_NAMESPACE = "simplyblock"
SIMPLYBLOCK_CSI_CONFIGMAP = "simplyblock-csi-cm"
SIMPLYBLOCK_CSI_SECRET = "simplyblock-csi-secret"
SIMPLYBLOCK_CSI_STORAGE_CLASS = "simplyblock-csi-sc"
STORAGE_PVC_SUFFIX = "-storage-pvc"
DATABASE_PVC_SUFFIX = "-db-pvc"
AUTOSCALER_PVC_SUFFIX = "-block-data"
Expand Down Expand Up @@ -279,49 +281,26 @@ def _build_storage_class_manifest(*, storage_class_name: str, iops: int, base_st
return manifest


async def load_simplyblock_credentials() -> tuple[str, str, str]:
config_map = await kube_service.get_config_map(SIMPLYBLOCK_NAMESPACE, SIMPLYBLOCK_CSI_CONFIGMAP)
config_data = (config_map.data or {}).get("config.json")
if not config_data:
raise VelaDeploymentError("ConfigMap simplyblock-csi-cm missing 'config.json'")
async def load_simplyblock_credentials() -> tuple[str, UUID, str, str]:
try:
config = json.loads(config_data)
except (TypeError, ValueError) as exc:
raise VelaDeploymentError("Failed to parse Simplyblock CSI config JSON") from exc

cluster_cfg = config.get("simplybk")
if not isinstance(cluster_cfg, dict):
raise VelaDeploymentError("Simplyblock CSI config missing 'simplybk' section")

endpoint = cluster_cfg.get("ip")
cluster_id = cluster_cfg.get("uuid")
if not endpoint or not cluster_id:
raise VelaDeploymentError("Simplyblock CSI config missing required 'ip' or 'uuid'")

secret = await kube_service.get_secret(SIMPLYBLOCK_NAMESPACE, SIMPLYBLOCK_CSI_SECRET)
secret_blob = (secret.data or {}).get("secret.json")
if not secret_blob:
raise VelaDeploymentError("Secret simplyblock-csi-secret missing 'secret.json'")
try:
decoded_secret = base64.b64decode(secret_blob).decode()
except (TypeError, ValueError, UnicodeDecodeError) as exc:
raise VelaDeploymentError("Failed to decode Simplyblock CSI secret") from exc
try:
secret_json = json.loads(decoded_secret)
except (TypeError, ValueError) as exc:
raise VelaDeploymentError("Failed to parse Simplyblock CSI secret JSON") from exc
config_map = await kube_service.get_config_map(SIMPLYBLOCK_NAMESPACE, SIMPLYBLOCK_CSI_CONFIGMAP)
config = json.loads(config_map.data.get["config.json"])
cluster_endpoint = config["simplybk"]["ip"].rstrip("/")
cluster_id = UUID(config["simplybk"]["uuid"])

secret_cfg = secret_json.get("simplybk")
if not isinstance(secret_cfg, dict):
raise VelaDeploymentError("Simplyblock CSI secret missing 'simplybk' section")
cluster_secret = secret_cfg.get("secret")
if not cluster_secret:
raise VelaDeploymentError("Simplyblock CSI secret missing 'secret'")
encoded_secret = await kube_service.get_secret(SIMPLYBLOCK_NAMESPACE, SIMPLYBLOCK_CSI_SECRET)
secret = json.loads(base64.b64decode(encoded_secret["secret.json"]).decode())
cluster_secret = secret["secret"]

return endpoint.rstrip("/"), cluster_id, cluster_secret
storage_class = await kube_service.get_storage_class(SIMPLYBLOCK_CSI_STORAGE_CLASS)
pool_name = storage_class.parameters["pool_name"]

return cluster_endpoint, cluster_id, cluster_secret, pool_name
except (KeyError, TypeError, ValueError, json.JSONDecodeError, VelaKubernetesError) as e:
raise VelaDeploymentError("Failed to load simplyblock credentials") from e

async def _resolve_volume_identifiers(namespace: str, pvc_name: str) -> tuple[str, str | None]:

async def _resolve_volume_identifiers(namespace: str, pvc_name: str) -> tuple[UUID, UUID | None]:
pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name)
pvc_spec = getattr(pvc, "spec", None)
volume_name = getattr(pvc_spec, "volume_name", None) if pvc_spec else None
Expand All @@ -340,40 +319,35 @@ async def _resolve_volume_identifiers(namespace: str, pvc_name: str) -> tuple[st
volume_cluster_id = volume_attributes.get("cluster_id")
if not volume_uuid:
raise VelaDeploymentError(f"PersistentVolume {volume_name} missing 'uuid' attribute in CSI volume attributes")
return volume_uuid, volume_cluster_id
return UUID(volume_uuid), UUID(volume_cluster_id) if volume_cluster_id is not None else None


async def resolve_storage_volume_identifiers(namespace: str) -> tuple[str, str | None]:
async def resolve_storage_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]:
pvc_name = f"{_autoscaler_vm_name()}{STORAGE_PVC_SUFFIX}"
return await _resolve_volume_identifiers(namespace, pvc_name)


async def resolve_autoscaler_volume_identifiers(namespace: str) -> tuple[str, str | None]:
async def resolve_autoscaler_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]:
pvc_name = f"{_autoscaler_vm_name()}{AUTOSCALER_PVC_SUFFIX}"
return await _resolve_volume_identifiers(namespace, pvc_name)


async def update_branch_volume_iops(branch_id: Identifier, iops: int) -> None:
namespace = deployment_namespace(branch_id)

volume_uuid, _ = await resolve_autoscaler_volume_identifiers(namespace)
volume, _ = await resolve_autoscaler_volume_identifiers(namespace)
try:
async with create_simplyblock_api() as sb_api:
await sb_api.update_volume(volume_uuid=volume_uuid, payload={"max_rw_iops": iops})
except httpx.HTTPStatusError as exc:
detail = exc.response.text.strip() or exc.response.reason_phrase or str(exc)
raise VelaDeploymentError(
f"Simplyblock volume API rejected IOPS update for volume {volume_uuid!r}: {detail}"
) from exc
except httpx.HTTPError as exc:
raise VelaDeploymentError("Failed to reach Simplyblock volume API") from exc
await sb_api.update_volume(volume=volume, payload={"max_rw_iops": iops})
except VelaSimplyblockAPIError as exc:
raise VelaDeploymentError("Failed to update volume") from exc

logger.info("Updated Simplyblock volume %s IOPS to %s", volume_uuid, iops)
logger.info("Updated Simplyblock volume %s IOPS to %s", volume, iops)


async def ensure_branch_storage_class(branch_id: Identifier, *, iops: int) -> str:
storage_class_name = branch_storage_class_name(branch_id)
base_storage_class = await kube_service.get_storage_class("simplyblock-csi-sc")
base_storage_class = await kube_service.get_storage_class(SIMPLYBLOCK_CSI_STORAGE_CLASS)
storage_class_manifest = _build_storage_class_manifest(
storage_class_name=storage_class_name,
iops=iops,
Expand Down
145 changes: 64 additions & 81 deletions src/deployment/simplyblock_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Self
from uuid import UUID

import httpx
Expand All @@ -16,119 +16,102 @@
logger = logging.getLogger(__name__)


class SimplyblockApi:
class SimplyblockPoolApi:
API_TIMEOUT_SECONDS: float = 10.0
STORAGE_POOL_NAME: str = "testing1"

def __init__(
self,
endpoint: str,
cluster_id: str,
cluster_id: UUID,
cluster_secret: str,
pool_name: str,
*,
client: httpx.AsyncClient | None = None,
timeout: float | httpx.Timeout | None = None,
timeout: float | httpx.Timeout = API_TIMEOUT_SECONDS,
) -> None:
self._endpoint = endpoint.rstrip("/")
self._cluster_id = cluster_id
self._cluster_secret = cluster_secret
self._pool_id_cache: dict[str, UUID] = {}
fallback_timeout = client.timeout if client is not None else self.API_TIMEOUT_SECONDS
self._timeout = timeout if timeout is not None else fallback_timeout
self._owns_client = client is None
self._client = client or httpx.AsyncClient(
base_url=self._endpoint,
self._timeout = timeout
self._client: httpx.AsyncClient | None = None

storage_pools_url = f"{endpoint}/api/v2/clusters/{self._cluster_id}/storage-pools/"
try:
response = httpx.get(storage_pools_url, headers=self._headers(), timeout=self._timeout)
response.raise_for_status()
except httpx.HTTPError as e:
raise VelaSimplyblockAPIError("Failed to retrieve storage pools") from e

pool_id = next((UUID(pool["id"]) for pool in response.json() if pool.get("name") == pool_name), None)
if pool_id is None:
raise VelaSimplyblockAPIError(f"Failed to retrieve storage pool {pool_name}")

self._base_url = storage_pools_url + f"{pool_id}/"

async def __aenter__(self) -> Self:
if self._client is not None:
raise RuntimeError("Cannot open instance repeatedly")

self._client = await httpx.AsyncClient(
base_url=self._base_url,
headers=self._headers(),
timeout=self._timeout,
)

async def __aenter__(self) -> SimplyblockApi:
).__aenter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.aclose()

async def aclose(self) -> None:
if self._owns_client:
await self._client.aclose()
if self._client is None:
return

@property
def _cluster_base(self) -> str:
return f"{self._endpoint}/api/v2/clusters/{self._cluster_id}"
await self._client.__aexit__(exc_type, exc_val, exc_tb)
self._client = None

def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self._cluster_secret}",
"Accept": "application/json",
}

async def _cluster_pool_base(self) -> str:
pool_id = await self.pool_id()
return f"{self._cluster_base}/storage-pools/{pool_id}"

async def pool(self, name: str | None = None) -> dict[str, Any]:
pool_name = name or self.STORAGE_POOL_NAME
url = f"{self._cluster_base}/storage-pools/"
response = await self._client.get(url, headers=self._headers(), timeout=self._timeout)
response.raise_for_status()

pools = response.json()
if isinstance(pools, list):
for pool in pools:
if isinstance(pool, dict) and pool.get("name") == pool_name:
return pool
raise KeyError(f"Storage pool {pool_name!r} not found")

async def pool_id(self, name: str | None = None) -> UUID:
pool_name = name or self.STORAGE_POOL_NAME
cached = self._pool_id_cache.get(pool_name)
if cached:
return cached
pool = await self.pool(pool_name)
identifier = UUID(str(pool["id"]))
self._pool_id_cache[pool_name] = identifier
return identifier

async def volume_iostats(self, volume_uuid: str) -> dict[str, Any]:
base_url = await self._cluster_pool_base()
url = f"{base_url}/volumes/{volume_uuid}/iostats"
response = await self._client.get(url, headers=self._headers(), timeout=self._timeout)
response.raise_for_status()
payload = response.json()
if len(payload) == 0:
raise VelaSimplyblockAPIError(f"Empty iostats payload for volume {volume_uuid}")
return payload[0]
async def _get(self, url) -> dict | list:
if self._client is None:
raise RuntimeError("Cannot use unopened instance")

try:
response = await self._client.get(url)
response.raise_for_status()
result = response.json()
except httpx.HTTPError as e:
raise VelaSimplyblockAPIError("Request failed") from e

return result

async def _put(self, url, data) -> None:
if self._client is None:
raise RuntimeError("Cannot use unopened instance")

try:
response = await self._client.put(url, json=data)
response.raise_for_status()
except httpx.HTTPError as e:
raise VelaSimplyblockAPIError("Request failed") from e

async def volume_iostats(self, volume: UUID) -> dict[str, Any]:
iostats = await self._get(f"volumes/{volume}/iostats")
if len(iostats) == 0:
raise VelaSimplyblockAPIError(f"Empty iostats payload for volume {volume}")
return iostats[0]

async def update_volume(
self,
volume_uuid: str,
volume: UUID,
payload: dict[str, Any],
) -> None:
headers = self._headers()
headers["Content-Type"] = "application/json"
base_url = await self._cluster_pool_base()
url = f"{base_url}/volumes/{volume_uuid}/"
response = await self._client.put(
url,
headers=headers,
json=payload,
timeout=self._timeout,
)
response.raise_for_status()
await self._put(f"volumes/{volume}/", data=payload)


@asynccontextmanager
async def create_simplyblock_api(
client: httpx.AsyncClient | None = None,
) -> AsyncIterator[SimplyblockApi]:
async def create_simplyblock_api() -> AsyncIterator[SimplyblockPoolApi]:
from . import load_simplyblock_credentials

endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials()
api = SimplyblockApi(
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
client=client,
)
api = SimplyblockPoolApi(*(await load_simplyblock_credentials()))
async with api:
yield api