6 changes: 3 additions & 3 deletions api/models/api_key.py
@@ -47,12 +47,12 @@ class APIKey(Base):
     created_by = Column(String(255), nullable=True)
 
     @classmethod
-    def generate_key(cls) -> tuple[str, str]:
+    def generate_key(cls) -> tuple[str, str, str]:
         """
         Generate a new API key.
 
         Returns:
-            tuple: (raw_key, key_hash) where raw_key should be shown to user only once
+            tuple: (raw_key, key_hash, key_prefix) where raw_key should be shown to user only once
         """
         # Generate 32 random bytes (256 bits)
         raw_key = secrets.token_urlsafe(32)
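Only the signature and docstring changes are visible; the rest of the method is collapsed in this view. A minimal sketch of how the full method might now look, where the SHA-256 hashing and the 8-character prefix are assumptions (the file already imports secrets; hashlib is assumed):

    @classmethod
    def generate_key(cls) -> tuple[str, str, str]:
        # Generate 32 random bytes (256 bits)
        raw_key = secrets.token_urlsafe(32)
        # Assumption: the database stores only a SHA-256 digest of the raw key
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        # Assumption: a short prefix is retained so keys can be identified later
        key_prefix = raw_key[:8]
        return raw_key, key_hash, key_prefix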
4 changes: 4 additions & 0 deletions api/models/job.py
@@ -81,8 +81,12 @@ class Job(Base):
    # Progress tracking
    progress = Column(Float, default=0.0)
    stage = Column(String, default="queued")
    current_stage = Column(String, default="queued")  # Alias for compatibility
    status_message = Column(String, nullable=True)
    fps = Column(Float, nullable=True)
    eta_seconds = Column(Integer, nullable=True)
    updated_at = Column(DateTime, nullable=True)
    processing_stats = Column(JSON, nullable=True)

    # Quality metrics
    vmaf_score = Column(Float, nullable=True)
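For illustration, a hypothetical worker-side helper showing how the new tracking columns might be populated (the helper name, session handling, and sample values are assumptions, not part of this diff):

    from datetime import datetime, timezone

    def report_progress(session, job, fps: float, eta_seconds: int) -> None:
        # Hypothetical: mirror ``stage`` into the compatibility alias
        job.current_stage = job.stage
        job.status_message = f"Encoding at {fps:.1f} fps"
        job.fps = fps
        job.eta_seconds = eta_seconds
        job.updated_at = datetime.now(timezone.utc)
        job.processing_stats = {"fps": fps, "eta_seconds": eta_seconds}
        session.commit()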
6 changes: 3 additions & 3 deletions api/services/job_service.py
@@ -43,8 +43,8 @@ async def get_job_logs(
     # Job creation
     logs.append(f"[{job.created_at.isoformat()}] Job created: {job_id}")
     logs.append(f"[{job.created_at.isoformat()}] Status: QUEUED")
-    logs.append(f"[{job.created_at.isoformat()}] Input URL: {job.input_url}")
-    logs.append(f"[{job.created_at.isoformat()}] Operations: {len(job.operations)} operations requested")
+    logs.append(f"[{job.created_at.isoformat()}] Input: {job.input_path}")
+    logs.append(f"[{job.created_at.isoformat()}] Operations: {len(job.operations) if job.operations else 0} operations requested")
 
     # Job parameters
     if job.options:
@@ -85,7 +85,7 @@ async def get_job_logs(
     if job.completed_at:
         if job.status == JobStatus.COMPLETED:
             logs.append(f"[{job.completed_at.isoformat()}] Status: COMPLETED")
-            logs.append(f"[{job.completed_at.isoformat()}] Output URL: {job.output_url}")
+            logs.append(f"[{job.completed_at.isoformat()}] Output: {job.output_path}")
             logs.append(f"[{job.completed_at.isoformat()}] Processing completed successfully")
 
             # Calculate processing time
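The calculation itself is collapsed in this view; a plausible continuation, assuming it subtracts the creation timestamp (this is a guess at the hidden lines, not part of the diff):

            # Assumed continuation of the collapsed hunk:
            elapsed = (job.completed_at - job.created_at).total_seconds()
            logs.append(f"[{job.completed_at.isoformat()}] Processing time: {elapsed:.2f}s")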
9 changes: 9 additions & 0 deletions storage/__init__.py
@@ -0,0 +1,9 @@
"""
Storage module for managing multiple storage backends.

Supports local filesystem, S3-compatible storage, and other backends.
"""
from storage.base import StorageBackend
from storage.factory import create_storage_backend

__all__ = ["StorageBackend", "create_storage_backend"]
80 changes: 80 additions & 0 deletions storage/azure.py
@@ -0,0 +1,80 @@
"""
Azure Blob Storage backend.

Placeholder implementation - full implementation requires azure-storage-blob.
"""
from typing import Any, AsyncIterator, Dict, List, Optional, Union

from storage.base import StorageBackend


class AzureStorageBackend(StorageBackend):
"""Azure Blob Storage backend."""

def __init__(self, config: Dict[str, Any]):
"""
Initialize Azure storage backend.

Args:
config: Configuration with:
- container: Azure container name
- connection_string: Azure connection string
- account_name: Storage account name (alternative to connection_string)
- account_key: Storage account key (alternative to connection_string)
"""
super().__init__(config)
self.container = config.get("container")

if not self.container:
raise ValueError("Azure backend requires 'container' in configuration")

# Check for azure-storage-blob
try:
from azure.storage.blob.aio import BlobServiceClient
self._available = True
except ImportError:
self._available = False

async def exists(self, path: str) -> bool:
"""Check if blob exists."""
if not self._available:
raise ImportError("Azure storage requires azure-storage-blob. Install with: pip install azure-storage-blob")
raise NotImplementedError("Azure storage backend not fully implemented")

async def read(self, path: str) -> AsyncIterator[bytes]:
"""Read blob as async iterator."""
if not self._available:
raise ImportError("Azure storage requires azure-storage-blob")
raise NotImplementedError("Azure storage backend not fully implemented")

async def write(self, path: str, data: Union[bytes, AsyncIterator[bytes]]) -> int:
"""Write data to blob."""
if not self._available:
raise ImportError("Azure storage requires azure-storage-blob")
raise NotImplementedError("Azure storage backend not fully implemented")

async def delete(self, path: str) -> bool:
"""Delete a blob."""
if not self._available:
raise ImportError("Azure storage requires azure-storage-blob")
raise NotImplementedError("Azure storage backend not fully implemented")

async def list(self, path: str = "", recursive: bool = False) -> List[str]:
"""List blobs in container."""
if not self._available:
raise ImportError("Azure storage requires azure-storage-blob")
raise NotImplementedError("Azure storage backend not fully implemented")

async def ensure_dir(self, path: str) -> None:
"""Azure doesn't need directory creation."""
pass

async def get_status(self) -> Dict[str, Any]:
"""Get backend status."""
return {
"name": self.name,
"type": "azure",
"container": self.container,
"available": self._available,
"implemented": False,
}
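As a pointer for the eventual implementation, one way ``exists`` could be filled in with the aio client (connection-string auth assumed here; account name/key auth would be analogous):

    from azure.storage.blob.aio import BlobServiceClient

    async def exists(self, path: str) -> bool:
        # Sketch only: opens a client per call for clarity; a real backend
        # would create and cache the BlobServiceClient once.
        async with BlobServiceClient.from_connection_string(
            self.config["connection_string"]
        ) as service:
            blob = service.get_blob_client(container=self.container, blob=path)
            return await blob.exists()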
147 changes: 147 additions & 0 deletions storage/base.py
@@ -0,0 +1,147 @@
"""
Abstract base class for storage backends.
"""
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Dict, List, Optional, Union
from pathlib import Path


class StorageBackend(ABC):
"""Abstract base class for storage backends."""

def __init__(self, config: Dict[str, Any]):
"""
Initialize storage backend.

Args:
config: Backend configuration dictionary
"""
self.config = config
self.name = config.get("name", "unknown")

@abstractmethod
async def exists(self, path: str) -> bool:
"""
Check if a file exists.

Args:
path: File path relative to backend root

Returns:
True if file exists, False otherwise
"""
pass

@abstractmethod
async def read(self, path: str) -> AsyncIterator[bytes]:
"""
Read file contents as an async iterator of chunks.

Args:
path: File path relative to backend root

Yields:
File content chunks as bytes
"""
pass

@abstractmethod
async def write(self, path: str, data: Union[bytes, AsyncIterator[bytes]]) -> int:
"""
Write data to a file.

Args:
path: File path relative to backend root
data: File content as bytes or async iterator of chunks

Returns:
Number of bytes written
"""
pass

@abstractmethod
async def delete(self, path: str) -> bool:
"""
Delete a file.

Args:
path: File path relative to backend root

Returns:
True if deleted, False if not found
"""
pass

@abstractmethod
async def list(self, path: str = "", recursive: bool = False) -> List[str]:
"""
List files in a directory.

Args:
path: Directory path relative to backend root
recursive: Whether to list recursively

Returns:
List of file paths
"""
pass

@abstractmethod
async def ensure_dir(self, path: str) -> None:
"""
Ensure a directory exists, creating it if necessary.

Args:
path: Directory path relative to backend root
"""
pass

async def get_file_info(self, path: str) -> Optional[Dict[str, Any]]:
"""
Get file metadata.

Args:
path: File path relative to backend root

Returns:
Dictionary with file info or None if not found
"""
if not await self.exists(path):
return None
return {
"path": path,
"exists": True,
}

async def get_size(self, path: str) -> int:
"""
Get file size in bytes.

Args:
path: File path relative to backend root

Returns:
File size in bytes
"""
        # The base get_file_info() does not include a "size" key, so this
        # returns 0 unless a subclass overrides it with real metadata.
        info = await self.get_file_info(path)
        return info.get("size", 0) if info else 0

    async def get_status(self) -> Dict[str, Any]:
        """
        Get backend status.

        Returns:
            Dictionary with backend status information
        """
        return {
            "name": self.name,
            "type": self.__class__.__name__,
            "available": True,
        }

    async def cleanup(self) -> None:
        """Clean up backend resources."""
        pass

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} name={self.name}>"
51 changes: 51 additions & 0 deletions storage/factory.py
@@ -0,0 +1,51 @@
"""
Factory for creating storage backends.
"""
from typing import Any, Dict

from storage.base import StorageBackend


def create_storage_backend(config: Dict[str, Any]) -> StorageBackend:
"""
Create a storage backend from configuration.

Args:
config: Backend configuration dictionary with at least:
- type: Backend type (filesystem, s3, azure, gcs)
- name: Backend name for identification

Returns:
Configured StorageBackend instance

Raises:
ValueError: If backend type is unknown or config is invalid
"""
backend_type = config.get("type", "").lower()

if not backend_type:
raise ValueError("Backend configuration must include 'type'")

if backend_type in ("filesystem", "local", "file"):
from storage.local import LocalStorageBackend
return LocalStorageBackend(config)

elif backend_type in ("s3", "aws", "minio"):
from storage.s3 import S3StorageBackend
return S3StorageBackend(config)

elif backend_type in ("azure", "blob", "azure_blob"):
from storage.azure import AzureStorageBackend
return AzureStorageBackend(config)

elif backend_type in ("gcs", "google", "google_cloud"):
from storage.gcs import GCSStorageBackend
return GCSStorageBackend(config)

elif backend_type in ("nfs", "smb", "cifs", "network"):
# Network storage uses local backend with network path
from storage.local import LocalStorageBackend
return LocalStorageBackend(config)

else:
raise ValueError(f"Unknown storage backend type: {backend_type}")