Skip to content
Merged
3 changes: 2 additions & 1 deletion tests/integration/endpoints/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from workerfacing_api.core.filesystem import FileSystem, LocalFilesystem, S3Filesystem
from workerfacing_api.core.queue import RDSJobQueue
from workerfacing_api.crud import job_tracking
from workerfacing_api.exceptions import JobDeletedException
from workerfacing_api.schemas.queue_jobs import (
AppSpecs,
EnvironmentTypes,
Expand Down Expand Up @@ -277,7 +278,7 @@ def test_put_job_status_canceled(
client.get(self.endpoint, params={"memory": 1})

def mock_update_job(*args: Any, **kwargs: Any) -> None:
raise ValueError("Job not found")
raise JobDeletedException("Job not found")

monkeypatch.setattr(job_tracking, "update_job", mock_update_job)
res = client.put(f"{self.endpoint}/1/status", params={"status": "running"})
Expand Down
1 change: 1 addition & 0 deletions tests/unit/crud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Empty init file
27 changes: 27 additions & 0 deletions tests/unit/crud/test_job_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Tests for job_tracking module."""

from unittest.mock import MagicMock, patch

import pytest

from workerfacing_api.crud.job_tracking import update_job
from workerfacing_api.exceptions import JobDeletedException
from workerfacing_api.schemas.rds_models import JobStates


@patch("workerfacing_api.crud.job_tracking.requests.put")
def test_update_job_raises_job_deleted_exception(mock_put: MagicMock) -> None:
    """Test that update_job raises JobDeletedException on 404 response.

    The HTTP call made by ``update_job`` is replaced by ``mock_put`` so no
    network access happens; the mocked response only carries the status code
    that the test needs.
    """
    # Make the patched requests.put hand back a response reporting 404.
    mock_response = MagicMock()
    mock_response.status_code = 404
    mock_put.return_value = mock_response

    job_id = 789

    with pytest.raises(JobDeletedException) as exc_info:
        update_job(job_id, JobStates.running)

    # The error message has to name the affected job id.
    assert f"Job {job_id} not found; it was probably deleted by the user." in str(
        exc_info.value
    )
41 changes: 25 additions & 16 deletions workerfacing_api/core/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from workerfacing_api import settings
from workerfacing_api.crud import job_tracking
from workerfacing_api.exceptions import JobDeletedException, JobNotAssignedException
from workerfacing_api.schemas.queue_jobs import (
EnvironmentTypes,
JobFilter,
Expand Down Expand Up @@ -457,7 +458,7 @@ def pop(self, environment: EnvironmentTypes, receipt_handle: str) -> bool:
job.workers = ";".join(job.workers.split(";") + [hostname])
try:
self._update_job_status(session, job, status=JobStates.pulled)
except ValueError:
except JobDeletedException:
# job probably deleted by user
return False
return True
Expand All @@ -484,7 +485,7 @@ def get_job(
if hostname:
workers = job.workers.split(";")
if not workers or hostname != workers[-1]:
raise ValueError(
raise JobNotAssignedException(
f"Job with id {job_id} is not assigned to worker {hostname}"
)
return job
Expand All @@ -505,11 +506,11 @@ def _update_job_status(
job_id = job.job["meta"]["job_id"]
assert isinstance(job_id, int)
job_tracking.update_job(job_id, status, runtime_details)
except ValueError as e:
except JobDeletedException as e:
# job probably deleted by user
session.delete(job)
session.commit()
raise ValueError(f"Could not update job, probably deleted by user: {e}")
raise e from e

def update_job_status(
self,
Expand Down Expand Up @@ -545,19 +546,27 @@ def handle_timeouts(
# TODO: increase priority?
job.num_retries += 1
session.add(job)
self.update_job_status(
job.id,
JobStates.queued,
f"timeout {job.num_retries} (workers tried: {job.workers})",
)
n_retry += 1
try:
self.update_job_status(
job.id,
JobStates.queued,
f"timeout {job.num_retries} (workers tried: {job.workers})",
)
n_retry += 1
except JobDeletedException:
# job probably deleted by user, skip updating status
pass
jobs_failed = jobs_timeout.filter(QueuedJob.num_retries >= max_retries)
for job in jobs_failed:
self.update_job_status(
job.id,
JobStates.error,
"max retries reached",
)
n_failed += 1
try:
self.update_job_status(
job.id,
JobStates.error,
"max retries reached",
)
n_failed += 1
except JobDeletedException:
# job probably deleted by user, skip updating status
pass
session.commit()
return n_retry, n_failed
3 changes: 2 additions & 1 deletion workerfacing_api/crud/job_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from fastapi.encoders import jsonable_encoder

import workerfacing_api.settings as settings
from workerfacing_api.exceptions import JobDeletedException
from workerfacing_api.schemas.rds_models import JobStates


Expand All @@ -19,7 +20,7 @@ def update_job(
headers={"x-api-key": settings.internal_api_key_secret},
)
if resp.status_code == 404:
raise ValueError(
raise JobDeletedException(
f"Job {job_id} not found; it was probably deleted by the user."
)
resp.raise_for_status()
3 changes: 2 additions & 1 deletion workerfacing_api/endpoints/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from workerfacing_api.core.filesystem import FileSystem
from workerfacing_api.core.queue import RDSJobQueue
from workerfacing_api.dependencies import filesystem_dep, queue_dep
from workerfacing_api.exceptions import JobDeletedException, JobNotAssignedException
from workerfacing_api.schemas.files import FileHTTPRequest
from workerfacing_api.schemas.queue_jobs import (
EnvironmentTypes,
Expand Down Expand Up @@ -111,7 +112,7 @@ async def put_job_status(
hostname = request.state.current_user.username
try:
queue.update_job_status(job_id, status, runtime_details, hostname=hostname)
except ValueError:
except (JobDeletedException, JobNotAssignedException):
# acts as a "cancel job" signal to worker
raise HTTPException(status_code=httpstatus.HTTP_404_NOT_FOUND)

Expand Down
13 changes: 13 additions & 0 deletions workerfacing_api/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Custom exceptions for the workerfacing API."""


class JobDeletedException(Exception):
    """Exception raised when a job has been deleted by the user.

    Derives directly from ``Exception`` so that handlers can catch it
    specifically instead of relying on a broad built-in exception type.
    """


class JobNotAssignedException(Exception):
    """Exception raised when a job is not assigned to the requesting worker.

    Derives directly from ``Exception`` so that handlers can catch it
    specifically instead of relying on a broad built-in exception type.
    """