Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 109 additions & 6 deletions src/osekit/utils/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
jobs, with writting/submitting of PBS files.

"""
from __future__ import annotations

import subprocess
from dataclasses import dataclass
Expand Down Expand Up @@ -307,15 +308,35 @@ def write_pbs(self, path: Path) -> None:
self.path = path
self.progress()

def submit_pbs(self) -> None:
"""Submit the PBS file of the job to a PBS queueing system."""
def submit_pbs(self, dependency: Job | list[Job] | str | list[str] | None = None) -> None:
"""Submit the PBS file of the job to a PBS queueing system.

Parameters
----------
dependency: Job | list[Job] | str | None
Job dependency. Can be:
- A Job instance: will wait for that job to complete successfully
- A list of Job instances: will wait for all jobs to complete successfully
- A string: job ID (e.g., "12345.datarmor") or dependency specification
- None: no dependency

"""
if self.update_status() is not JobStatus.PREPARED:
msg = "Job should be written before being submitted."
raise ValueError(msg)

cmd = ["qsub"]

if dependency is not None:
dependency_str = self._build_dependency_string(dependency)
if dependency_str:
cmd.extend(["-W", f"depend={dependency_str}"])

cmd.append(str(self.path))

try:
request = subprocess.run(
["qsub", self.path],
cmd,
capture_output=True,
text=True,
check=False,
Expand All @@ -327,6 +348,72 @@ def submit_pbs(self) -> None:
self.job_id = request.stdout.split(".", maxsplit=1)[0].strip()
self.progress()

_VALID_DEPENDENCY_TYPES = {"afterok", "afterany", "afternotok", "after"}

@staticmethod
def _validate_dependency_type(dependency_type: str) -> None:
if dependency_type not in Job._VALID_DEPENDENCY_TYPES:
raise ValueError(
f"Unsupported dependency type '{dependency_type}'. "
f"Expected one of {sorted(Job._VALID_DEPENDENCY_TYPES)}."
)

@staticmethod
def _validate_dependency(dependency: list[str] | list[Job]) -> list[str]:
job_ids = [dep.job_id if isinstance(dep, Job) else dep for dep in dependency]
for job_id in job_ids:
if not job_id.isdigit() or len(job_id)!=7:
raise ValueError(
f"Invalid job ID '{job_id}'. Job IDs must be 7 digits long."
)
return job_ids

@staticmethod
def _build_dependency_string(
dependency: str | Job | list[str] | list[Job],
dependency_type: str = "afterok",
) -> str:
"""Build a PBS dependency string.

Parameters
----------
dependency: Job | str
Job or job ID to depend on.
dependency_type: str
Type of dependency (afterok, afterany, afternotok, after).

Returns
-------
str
PBS dependency string.

Examples
--------
>>> Job._build_dependency_string("1234567")
'afterok:1234567'
>>> Job._build_dependency_string(["1234567", "4567891"])
'afterok:1234567:4567891'
>>> Job._build_dependency_string("7894561", dependency_type="afterany")
'afterany:7894651'

"""
dependency = dependency if isinstance(dependency, list) else [dependency]
id_str = Job._validate_dependency(dependency)
Job._validate_dependency_type(dependency_type)

if unsubmitted_job := next(
(
j
for j in dependency
if isinstance(j, Job) and j.status.value < JobStatus.QUEUED.value
),
None,
):
msg = f"Job '{unsubmitted_job.name}' has not been submitted yet."
raise ValueError(msg)

return f"{dependency_type}:{':'.join(id_str)}"

def update_info(self) -> None:
"""Request info about the job and update it."""
if self.job_id is None:
Expand Down Expand Up @@ -443,9 +530,25 @@ def create_job(
job.write_pbs(output_folder / f"{name}.pbs")
self.jobs.append(job)

def submit_pbs(self) -> None:
"""Submit all repared jobs to the PBS queueing system."""
def submit_pbs(
self, dependencies: dict[str, "Job | list[Job]"] | None = None
) -> None:
"""Submit all prepared jobs to the PBS queueing system.

Parameters
----------
dependencies: dict[str, Job | list[Job]] | None
Optional dictionary mapping job names to their dependencies.
Example: {"job2": job1, "job3": [job1, job2]}

"""
for job in self.jobs:
if job.update_status() is not JobStatus.PREPARED:
continue
job.submit_pbs()

# Check if this job has dependencies
depend_on = None
if dependencies and job.name in dependencies:
depend_on = dependencies[job.name]

job.submit_pbs(dependency=depend_on)
171 changes: 170 additions & 1 deletion tests/test_job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations

import subprocess
from contextlib import nullcontext
from pathlib import Path

import pytest
Expand Down Expand Up @@ -414,7 +417,7 @@ def __init__(self, name: str, status: JobStatus) -> None:
self.name = name
self.status = status

def submit_pbs(self) -> None:
def submit_pbs(self, dependency=None) -> None:
submitted_jobs.append(self.name)

def update_status(self) -> JobStatus:
Expand All @@ -436,3 +439,169 @@ def update_status(self) -> JobStatus:
job_builder.submit_pbs()

assert submitted_jobs == ["prepared"]


@pytest.mark.parametrize(
("dependency", "ids", "status", "expected"),
[
pytest.param(
["1234567"],
[None],
[None],
nullcontext("afterok:1234567"),
id="single_job_id",
),
pytest.param(
["1234567", "4567891", "7891234"],
[None] * 3,
[None] * 3,
nullcontext("afterok:1234567:4567891:7891234"),
id="multiple_job_ids",
),
pytest.param(
["123"],
[None],
[None],
pytest.raises(
ValueError,
match=r"Invalid job ID '123'\. Job IDs must be 7 digits long\.",
),
id="invalid_job_id_too_short",
),
pytest.param(
[Job(script_path=Path("test.py"), name="job_1")],
["12345678"],
[JobStatus.QUEUED],
pytest.raises(
ValueError,
match=r"Invalid job ID '12345678'\. Job IDs must be 7 digits long\.",
),
id="invalid_job_id_too_long",
),
pytest.param(
["abcdefg"],
[None],
[None],
pytest.raises(
ValueError,
match=r"Invalid job ID 'abcdefg'\. Job IDs must be 7 digits long\.",
),
id="invalid_job_id_non_numeric",
),
pytest.param(
["1234567", "not_a_job_id"],
[None] * 2,
[None] * 2,
pytest.raises(
ValueError,
match=r"Invalid job ID 'not_a_job_id'\. Job IDs must be 7 digits long\.",
),
id="multiple_job_id_one_invalid",
),
pytest.param(
[Job(script_path=Path("test.py"), name="job_1")],
["1234567"],
[JobStatus.QUEUED],
nullcontext("afterok:1234567"),
id="single_job_instance",
),
pytest.param(
[
Job(script_path=Path("horse_with.py"), name="job_1"),
Job(script_path=Path("no_name.py"), name="job_2"),
],
["1234567", "4567891"],
[JobStatus.QUEUED, JobStatus.QUEUED],
nullcontext("afterok:1234567:4567891"),
id="multiple_job_instance",
),
pytest.param(
[
Job(script_path=Path("king_crimson.py"), name="job_1"),
Job(script_path=Path("crimson_king.py"), name="job_2"),
],
["1234567", "not_an_id"],
[JobStatus.QUEUED, JobStatus.QUEUED],
pytest.raises(
ValueError,
match=r"Invalid job ID 'not_an_id'\. Job IDs must be 7 digits long\.",
),
id="multiple_job_instance_invalid_one",
),
pytest.param(
[
Job(script_path=Path("king_crimson.py"), name="job_1"),
"9876543",
],
["1234567", None],
[JobStatus.QUEUED, None],
nullcontext("afterok:1234567:9876543"),
id="job_and_string_input",
),
pytest.param(
[Job(script_path=Path("test.py"), name="tornero")],
["1234567"],
[JobStatus.UNPREPARED],
pytest.raises(
ValueError,
match="Job 'tornero' has not been submitted yet.",
),
id="unprepared_job_instance",
),
pytest.param(
[
Job(script_path=Path("script.py"), name="dalida"),
Job(script_path=Path("script.py"), name="mourir_sur_scene"),
],
["1234567", "4567896"],
[JobStatus.QUEUED, JobStatus.PREPARED],
pytest.raises(
ValueError,
match="Job 'mourir_sur_scene' has not been submitted yet.",
),
id="multiple_job_instance_one_not_submitted",
),
],
)

def test_build_dependency_string_with_string_input(
dependency: list[str] | list[Job],
ids: list[str] | None,
status: list[JobStatus],
expected: str | None,
) -> None:
"""Test building dependency string from string and Job inputs."""
for dep, id, st in zip(dependency, ids, status, strict=True):
if isinstance(dep, Job):
dep.status = st
dep.job_id = id

with expected as e:
assert Job._build_dependency_string(dependency) == e


@pytest.mark.parametrize(
("dependency_type", "expected"),
[
pytest.param("afterok", nullcontext("afterok:1234567"), id="afterok"),
pytest.param("afterany", nullcontext("afterany:1234567"), id="afterany"),
pytest.param("afternotok", nullcontext("afternotok:1234567"), id="afternotok"),
pytest.param("after", nullcontext("after:1234567"), id="after"),
pytest.param(
"not_a_supported_type",
pytest.raises(
ValueError,
match=r"Unsupported dependency type 'not_a_supported_type'\. Expected one of \['after', 'afterany', 'afternotok', 'afterok'\]\.",
),
id="invalid_dependency_type",
),
],
)

def test_build_dependency_string_with_different_types(
dependency_type: str,
expected: type[Exception],
) -> None:
"""Test building dependency strings with different dependency types."""
with expected as e:
assert Job._build_dependency_string("1234567", dependency_type) == e