Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions frontend/src/pages/Events/List/hooks/useColumnDefinitions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ export const useColumnsDefinitions = () => {
</div>
);

case 'volume':
return (
<div>
Volume{' '}
{target.project_name && (
<NavigateLink href={ROUTES.PROJECT.DETAILS.FORMAT(target.project_name)}>
{target.project_name}
</NavigateLink>
)}
/{target.name}
</div>
);

default:
return '---';
}
Expand Down
9 changes: 9 additions & 0 deletions frontend/src/pages/Events/List/hooks/useFilters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type RequestParamsKeys = keyof Pick<
| 'target_instances'
| 'target_runs'
| 'target_jobs'
| 'target_volumes'
| 'within_projects'
| 'within_fleets'
| 'within_runs'
Expand All @@ -31,6 +32,7 @@ const filterKeys: Record<string, RequestParamsKeys> = {
TARGET_INSTANCES: 'target_instances',
TARGET_RUNS: 'target_runs',
TARGET_JOBS: 'target_jobs',
TARGET_VOLUMES: 'target_volumes',
WITHIN_PROJECTS: 'within_projects',
WITHIN_FLEETS: 'within_fleets',
WITHIN_RUNS: 'within_runs',
Expand All @@ -47,6 +49,7 @@ const multipleChoiseKeys: RequestParamsKeys[] = [
'target_instances',
'target_runs',
'target_jobs',
'target_volumes',
'within_projects',
'within_fleets',
'within_runs',
Expand All @@ -61,6 +64,7 @@ const targetTypes = [
{ label: 'Instance', value: 'instance' },
{ label: 'Run', value: 'run' },
{ label: 'Job', value: 'job' },
{ label: 'Volume', value: 'volume' },
];

export const useFilters = () => {
Expand Down Expand Up @@ -153,6 +157,11 @@ export const useFilters = () => {
operators: ['='],
propertyLabel: 'Target jobs',
},
{
key: filterKeys.TARGET_VOLUMES,
operators: ['='],
propertyLabel: 'Target volumes',
},

{
key: filterKeys.WITHIN_PROJECTS,
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/types/event.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare type TEventTargetType = 'project' | 'user' | 'fleet' | 'instance' | 'run' | 'job';
declare type TEventTargetType = 'project' | 'user' | 'fleet' | 'instance' | 'run' | 'job' | 'volume';

declare type TEventListRequestParams = Omit<TBaseRequestListParams, 'prev_created_at'> & {
prev_recorded_at?: string;
Expand All @@ -8,6 +8,7 @@ declare type TEventListRequestParams = Omit<TBaseRequestListParams, 'prev_create
target_instances?: string[];
target_runs?: string[];
target_jobs?: string[];
target_volumes?: string[];
within_projects?: string[];
within_fleets?: string[];
within_runs?: string[];
Expand All @@ -16,7 +17,7 @@ declare type TEventListRequestParams = Omit<TBaseRequestListParams, 'prev_create
};

declare interface IEventTarget {
type: 'project' | 'user' | 'fleet' | 'instance' | 'run' | 'job';
type: 'project' | 'user' | 'fleet' | 'instance' | 'run' | 'job' | 'volume';
project_id?: string;
project_name?: string;
id: string;
Expand Down
12 changes: 12 additions & 0 deletions src/dstack/_internal/cli/commands/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ def _register(self):
dest="target_runs",
help="Only show events that target the specified runs",
)
target_filters_group.add_argument(
"--target-volume",
action="append",
metavar="NAME",
dest="target_volumes",
help="Only show events that target the specified volumes",
)
within_filters_group = parser.add_mutually_exclusive_group()
within_filters_group.add_argument(
"--within-fleet",
Expand Down Expand Up @@ -108,6 +115,11 @@ def _build_filters(args: argparse.Namespace, api: Client) -> EventListFilters:
filters.target_runs = [
api.client.runs.get(api.project, name).id for name in args.target_runs
]
elif args.target_volumes:
filters.target_volumes = [
api.client.volumes.get(project_name=api.project, name=name).id
for name in args.target_volumes
]

if args.within_fleets:
filters.within_fleets = [
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/cli/services/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
class EventListFilters:
target_fleets: Optional[list[uuid.UUID]] = None
target_runs: Optional[list[uuid.UUID]] = None
target_volumes: Optional[list[uuid.UUID]] = None
within_projects: Optional[list[uuid.UUID]] = None
within_fleets: Optional[list[uuid.UUID]] = None
within_runs: Optional[list[uuid.UUID]] = None
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/core/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class EventTargetType(str, Enum):
INSTANCE = "instance"
RUN = "run"
JOB = "job"
VOLUME = "volume"


class EventTarget(CoreModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dstack._internal.server.db import get_db, get_session_ctx
from dstack._internal.server.models import ProjectModel, UserModel, VolumeModel
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services import events
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.volumes import (
get_volume_configuration,
Expand Down Expand Up @@ -100,8 +101,12 @@ async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]

volume_model.deleted = True
volume_model.deleted_at = get_current_datetime()

logger.info("Deleted idle volume %s", volume_model.name)
events.emit(
session=session,
message="Volume deleted due to exceeding auto_cleanup_duration",
actor=events.SystemActor(),
targets=[events.Target.from_model(volume_model)],
)

await session.commit()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services import volumes as volumes_services
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.volumes import switch_volume_status
from dstack._internal.server.utils import sentry_utils
from dstack._internal.utils.common import get_current_datetime, run_async
from dstack._internal.utils.logging import get_logger
Expand Down Expand Up @@ -79,8 +80,8 @@ async def _process_submitted_volume(session: AsyncSession, volume_model: VolumeM
volume.name,
volume.configuration.backend.value,
)
volume_model.status = VolumeStatus.FAILED
volume_model.status_message = "Backend not available"
switch_volume_status(session, volume_model, VolumeStatus.FAILED)
volume_model.last_processed_at = get_current_datetime()
await session.commit()
return
Expand All @@ -102,18 +103,18 @@ async def _process_submitted_volume(session: AsyncSession, volume_model: VolumeM
)
except BackendError as e:
logger.info("Failed to create volume %s: %s", volume_model.name, repr(e))
volume_model.status = VolumeStatus.FAILED
status_message = f"Backend error: {repr(e)}"
if len(e.args) > 0:
status_message = str(e.args[0])
volume_model.status_message = status_message
switch_volume_status(session, volume_model, VolumeStatus.FAILED)
volume_model.last_processed_at = get_current_datetime()
await session.commit()
return
except Exception as e:
logger.exception("Got exception when creating volume %s", volume_model.name)
volume_model.status = VolumeStatus.FAILED
volume_model.status_message = f"Unexpected error: {repr(e)}"
switch_volume_status(session, volume_model, VolumeStatus.FAILED)
volume_model.last_processed_at = get_current_datetime()
await session.commit()
return
Expand All @@ -123,6 +124,6 @@ async def _process_submitted_volume(session: AsyncSession, volume_model: VolumeM
# Provisioned volumes marked as active since they become available almost immediately in AWS
# TODO: Consider checking volume state
volume_model.volume_provisioning_data = vpd.json()
volume_model.status = VolumeStatus.ACTIVE
switch_volume_status(session, volume_model, VolumeStatus.ACTIVE)
volume_model.last_processed_at = get_current_datetime()
await session.commit()
1 change: 1 addition & 0 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ class VolumeModel(BaseModel):
deleted: Mapped[bool] = mapped_column(Boolean, default=False)
deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)

# NOTE: `status` must be changed only via `switch_volume_status()`
status: Mapped[VolumeStatus] = mapped_column(EnumAsString(VolumeStatus, 100), index=True)
status_message: Mapped[Optional[str]] = mapped_column(Text)

Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/server/routers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async def list_events(
target_instances=body.target_instances,
target_runs=body.target_runs,
target_jobs=body.target_jobs,
target_volumes=body.target_volumes,
within_projects=body.within_projects,
within_fleets=body.within_fleets,
within_runs=body.within_runs,
Expand Down
6 changes: 4 additions & 2 deletions src/dstack/_internal/server/routers/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,7 @@ async def delete_volumes(
"""
Deletes one or more volumes.
"""
_, project = user_project
await volumes_services.delete_volumes(session=session, project=project, names=body.names)
user, project = user_project
await volumes_services.delete_volumes(
session=session, project=project, names=body.names, user=user
)
11 changes: 11 additions & 0 deletions src/dstack/_internal/server/schemas/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ class ListEventsRequest(CoreModel):
max_items=MAX_FILTER_ITEMS,
),
] = None
target_volumes: Annotated[
Optional[list[uuid.UUID]],
Field(
description=(
"List of volume IDs."
" The response will only include events that target the specified volumes"
),
min_items=MIN_FILTER_ITEMS,
max_items=MAX_FILTER_ITEMS,
),
] = None
within_projects: Annotated[
Optional[list[uuid.UUID]],
Field(
Expand Down
17 changes: 17 additions & 0 deletions src/dstack/_internal/server/services/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ProjectModel,
RunModel,
UserModel,
VolumeModel,
)
from dstack._internal.server.services.logging import fmt_entity
from dstack._internal.utils.common import get_current_datetime
Expand Down Expand Up @@ -91,6 +92,7 @@ def from_model(
ProjectModel,
RunModel,
UserModel,
VolumeModel,
],
) -> "Target":
if isinstance(model, FleetModel):
Expand Down Expand Up @@ -135,6 +137,13 @@ def from_model(
id=model.id,
name=model.name,
)
if isinstance(model, VolumeModel):
return Target(
type=EventTargetType.VOLUME,
project_id=model.project_id or model.project.id,
id=model.id,
name=model.name,
)
raise ValueError(f"Unsupported model type: {type(model)}")

def fmt(self) -> str:
Expand Down Expand Up @@ -212,6 +221,7 @@ async def list_events(
target_instances: Optional[list[uuid.UUID]],
target_runs: Optional[list[uuid.UUID]],
target_jobs: Optional[list[uuid.UUID]],
target_volumes: Optional[list[uuid.UUID]],
within_projects: Optional[list[uuid.UUID]],
within_fleets: Optional[list[uuid.UUID]],
within_runs: Optional[list[uuid.UUID]],
Expand Down Expand Up @@ -281,6 +291,13 @@ async def list_events(
EventTargetModel.entity_id.in_(target_jobs),
)
)
if target_volumes is not None:
target_filters.append(
and_(
EventTargetModel.entity_type == EventTargetType.VOLUME,
EventTargetModel.entity_id.in_(target_volumes),
)
)
if within_projects is not None:
target_filters.append(EventTargetModel.entity_project_id.in_(within_projects))
if within_fleets is not None:
Expand Down
48 changes: 36 additions & 12 deletions src/dstack/_internal/server/services/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timedelta
from typing import List, Optional

from sqlalchemy import and_, func, or_, select, update
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload

Expand Down Expand Up @@ -33,6 +33,7 @@
VolumeModel,
)
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services import events
from dstack._internal.server.services.instances import get_instance_provisioning_data
from dstack._internal.server.services.locking import (
get_locker,
Expand All @@ -46,6 +47,24 @@
logger = get_logger(__name__)


def switch_volume_status(
session: AsyncSession,
volume_model: VolumeModel,
new_status: VolumeStatus,
actor: events.AnyActor = events.SystemActor(),
):
old_status = volume_model.status
if old_status == new_status:
return

volume_model.status = new_status

msg = f"Volume status changed {old_status.upper()} -> {new_status.upper()}"
if volume_model.status_message is not None:
msg += f" ({volume_model.status_message})"
events.emit(session, msg, actor=actor, targets=[events.Target.from_model(volume_model)])


async def list_volumes(
session: AsyncSession,
user: UserModel,
Expand Down Expand Up @@ -245,11 +264,19 @@ async def create_volume(
attachments=[],
)
session.add(volume_model)
events.emit(
session,
message=f"Volume created. Status: {volume_model.status.upper()}",
actor=events.UserActor.from_user(user),
targets=[events.Target.from_model(volume_model)],
)
await session.commit()
return volume_model_to_volume(volume_model)


async def delete_volumes(session: AsyncSession, project: ProjectModel, names: List[str]):
async def delete_volumes(
session: AsyncSession, project: ProjectModel, names: List[str], user: UserModel
):
res = await session.execute(
select(VolumeModel).where(
VolumeModel.project_id == project.id,
Expand Down Expand Up @@ -287,17 +314,14 @@ async def delete_volumes(session: AsyncSession, project: ProjectModel, names: Li
await _delete_volume(session=session, project=project, volume_model=volume_model)
except Exception:
logger.exception("Error when deleting volume %s", volume_model.name)
await session.execute(
update(VolumeModel)
.where(
VolumeModel.project_id == project.id,
VolumeModel.id.in_(volumes_ids),
)
.values(
deleted=True,
deleted_at=common.get_current_datetime(),
volume_model.deleted = True
volume_model.deleted_at = common.get_current_datetime()
events.emit(
session,
message="Volume deleted",
actor=events.UserActor.from_user(user),
targets=[events.Target.from_model(volume_model)],
)
)
await session.commit()


Expand Down
Loading