feat(JobWrapper): Report the status of the job to DiracX #111

Open
Loxeris wants to merge 11 commits into DIRACGrid:main from Loxeris:diracx-job-statuses

Member

@Loxeris Loxeris commented Feb 9, 2026

Reports the status of the job to DiracX using diracx-api.

closes #83

@Loxeris force-pushed the diracx-job-statuses branch from 71983e2 to 2ff453b on February 9, 2026 17:19
Contributor

@aldbr aldbr left a comment

Thanks @Loxeris, it's a good conversion from the existing DIRAC JobWrapper. Based on that, and my experience with it, I have a few comments to improve it.

This module tests the functionalities of the job wrapper.
"""

import asyncio
Contributor

usage scenarios.
"""

import asyncio
Contributor

Same comment here

abstract interfaces.
"""

import asyncio
Contributor

Same comment here too


if __name__ == "__main__":
-    main()
+    asyncio.run(main())
Contributor

Instead of calling sys.exit from the (now async) main function, I think it would be cleaner to return an int and call sys.exit from here, like:

        return 1

if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
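
A minimal, self-contained sketch of that pattern, for illustration only. The `fail` parameter and the body of `main` are hypothetical stand-ins and are not taken from the PR; only the shape (return an int, exit at the module guard) follows the suggestion above.

```python
import asyncio
import sys


async def main(fail: bool = False) -> int:
    """Hypothetical async entry point that reports its outcome as an exit code."""
    try:
        if fail:
            raise RuntimeError("simulated job failure")
        await asyncio.sleep(0)  # stand-in for the real asynchronous work
    except RuntimeError:
        return 1  # failure is signalled by the return value, not by sys.exit()
    return 0


if __name__ == "__main__":
    # The only place that terminates the interpreter.
    sys.exit(asyncio.run(main()))
```

Keeping `sys.exit` out of the coroutine also makes `main` straightforward to call from tests, since it returns instead of raising `SystemExit`.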

Contributor

I think you can just rename data_management_mocks to mocks.

Comment on lines +60 to +65
ret = await set_job_status(
    jobID,
    "Done",
    "Execution Complete",
    source=src,
)
Contributor

Can we move the calls to set_job_status into the JobWrapper?
The reason is that on HPC resources with no external connectivity we also use the JobWrapper, but without the JobWrapperTemplate.

Comment on lines +69 to +73
ret = await set_job_status(
    jobID,
    "Failed",
    source=src,
)
Contributor

Same here

:param job_path: Path to the job working directory.
"""
assert arguments.sandbox is not None
await set_job_status(self.jobID, minor_status="Downloading InputSandbox", source=src)
Contributor

FYI, in DIRAC, there is a list of the existing minor statuses: https://github.com/DIRACGrid/DIRAC/blob/88fb54782ecae01d2b58205d0720efef0ae8d8ac/src/DIRAC/WorkloadManagementSystem/Client/JobMinorStatus.py

For now, you can make a StrEnum out of it in dirac-cwl (it will be moved to diracx later).
I think you could even add more minor statuses, for instance: InputSandbox successfully downloaded.


async def initialize(self) -> None:
    """Initialize the JobWrapper."""
    await set_job_status(self.jobID, "Running", "Job Initialization", source=src)
Contributor

And you could name them uniformly like: Initializing job..., Downloading input sandbox..., Resolving input data..., etc.


# Execute the task
logger.info("Executing Task: %s", command)
await set_job_status(self.jobID, minor_status="Application", source=src)
Contributor

Executing payload...

@Loxeris force-pushed the diracx-job-statuses branch from 2ff453b to 140b8bc on February 23, 2026 16:53

Comment on lines +10 to +27
class JobStatus(StrEnum):
    """List of all available job statuses."""

    SUBMITTING = "Submitting"
    RECEIVED = "Received"
    CHECKING = "Checking"
    STAGING = "Staging"
    WAITING = "Waiting"
    MATCHED = "Matched"
    RUNNING = "Running"
    STALLED = "Stalled"
    COMPLETING = "Completing"
    DONE = "Done"
    COMPLETED = "Completed"
    FAILED = "Failed"
    DELETED = "Deleted"
    KILLED = "Killed"
    RESCHEDULED = "Rescheduled"
Contributor

Can't you reuse the job status from diracx-core?

Suggested change
-class JobStatus(StrEnum):
-    """List of all available job statuses."""
-    SUBMITTING = "Submitting"
-    RECEIVED = "Received"
-    CHECKING = "Checking"
-    STAGING = "Staging"
-    WAITING = "Waiting"
-    MATCHED = "Matched"
-    RUNNING = "Running"
-    STALLED = "Stalled"
-    COMPLETING = "Completing"
-    DONE = "Done"
-    COMPLETED = "Completed"
-    FAILED = "Failed"
-    DELETED = "Deleted"
-    KILLED = "Killed"
-    RESCHEDULED = "Rescheduled"
+from diracx.core.models import JobStatus

# add job status record
self.job_status_info.append((status, minor_status, application_status.strip(' "' + "'"), timestamp))

def setApplicationStatus(self, appStatus):
Contributor

To be (almost) consistent with setJobStatus?

Suggested change
-def setApplicationStatus(self, appStatus):
+def setApplicationStatus(self, application_status: str | None = None):

Contributor

I'm not entirely sure whether str = "" or str | None = None is the better choice; can you check?

self._client = client

def setJobStatus(
self, status: JobStatus | None = None, minor_status: JobMinorStatus | None = None, application_status: str = ""
Contributor

Isn't it better like this? I am just surprised you can have status = None with application status = ""
Shouldn't JobStatus be mandatory?

Suggested change
-self, status: JobStatus | None = None, minor_status: JobMinorStatus | None = None, application_status: str = ""
+self, status: JobStatus | None = None, minor_status: JobMinorStatus | None = None, application_status: str | None = None

if appStatus:
self.job_status_info.append(("", "", (appStatus.strip(' "' + "'"), timeStamp)))

def setJobParameter(self, par_name, par_value):
Contributor

What about?

Suggested change
-def setJobParameter(self, par_name, par_value):
+def setJobParameter(self, key: str, value: str):

:param par_name: name of the parameter
:param par_value: value of the parameter
"""
self.job_parameters.append((par_name, par_value))
Contributor

Any reason for having a list of tuples instead of a dictionary here? I see that you then build a dictionary from it when you send the parameters.

Member Author

I kept it as it was in DIRAC. A dictionary does sound better.
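
A small sketch of the dictionary-based variant discussed here. `JobReportSketch` is a hypothetical stand-in reduced to parameter caching; it is not the PR's JobReport class, and the parameter names in the usage lines are made up.

```python
class JobReportSketch:
    """Hypothetical stand-in for JobReport, reduced to parameter caching."""

    def __init__(self) -> None:
        # Parameters are kept keyed by name, ready to be sent as-is.
        self.job_parameters: dict[str, str] = {}

    def setJobParameter(self, key: str, value: str) -> None:
        # A later value for the same key replaces the earlier one.
        self.job_parameters[key] = value


report = JobReportSketch()
report.setJobParameter("CPUTime", "10")
report.setJobParameter("CPUTime", "12")
report.setJobParameter("Host", "node01")
print(report.job_parameters)
```

One behavioural difference to be aware of: with a list of tuples, repeated keys are all kept until the conversion, whereas the dictionary keeps only the latest value from the start.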

Comment on lines +170 to +182
def dump(self):
    """Print out the contents of the internal cached information."""
    print("Job status info:")
    for status, minor, app, timeStamp in self.job_status_info:
        if not status:
            status = ""
        if not minor:
            minor = ""
        print(status.ljust(20), minor.ljust(30), app.ljust(30), timeStamp)

    print("Job parameters:")
    for pname, pvalue in self.job_parameters:
        print(pname.ljust(20), pvalue.ljust(30))
Contributor

I don't think we want that function here? At least not for now and not with print statements.

Suggested change
-def dump(self):
-    """Print out the contents of the internal cached information."""
-    print("Job status info:")
-    for status, minor, app, timeStamp in self.job_status_info:
-        if not status:
-            status = ""
-        if not minor:
-            minor = ""
-        print(status.ljust(20), minor.ljust(30), app.ljust(30), timeStamp)
-    print("Job parameters:")
-    for pname, pvalue in self.job_parameters:
-        print(pname.ljust(20), pvalue.ljust(30))

Member Author

I copied it from the DIRAC implementation, but I agree it should probably go.

Comment on lines +69 to +72
async def initialize(self) -> None:
    """Initialize the JobWrapper."""
    self.job_report.setJobStatus(JobStatus.RUNNING, JobMinorStatus.JOB_INITIALIZATION)
    await self.job_report.commit()
Contributor

Let's say there is no initialize for now (I want to see if we will need it in the future). The job_report call is then made in __init__.

Suggested change
-async def initialize(self) -> None:
-    """Initialize the JobWrapper."""
-    self.job_report.setJobStatus(JobStatus.RUNNING, JobMinorStatus.JOB_INITIALIZATION)
-    await self.job_report.commit()
+self.job_report.setJobStatus(JobStatus.RUNNING, JobMinorStatus.JOB_INITIALIZATION)
+await self.job_report.commit()

Member Author

We need an await to commit the job report. An await has to be inside an async method, and __init__ can't be one, afaik. I guess we could remove the problem entirely by not committing here. I need to check when the commit is needed.

Contributor

Ah ok, I see. I think it's better to do the commit at the end of every public method.
If I'm not mistaken, that would be at the end of preprocess/execute/postprocess.
What do you think?

Member Author

I agree this should probably work out. Maybe we could also add a commit just after the JobMinorStatus.APPLICATION so the user can see the payload is running in case it takes a while.
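
To make the proposal concrete, here is a minimal sketch of the "buffer in __init__, commit at the end of each public method" idea, with the extra commit right after the payload status. `FakeJobReport` and `WrapperSketch` are hypothetical in-memory stand-ins, not the PR's classes, and the status strings are only examples.

```python
import asyncio


class FakeJobReport:
    """Hypothetical report: buffers status updates and 'sends' them on commit."""

    def __init__(self) -> None:
        self.pending: list[str] = []
        self.sent: list[list[str]] = []

    def setJobStatus(self, minor_status: str) -> None:
        self.pending.append(minor_status)

    async def commit(self) -> None:
        if self.pending:
            self.sent.append(self.pending)
            self.pending = []


class WrapperSketch:
    def __init__(self, report: FakeJobReport) -> None:
        self.job_report = report
        # __init__ cannot await, so the status is only buffered here.
        self.job_report.setJobStatus("Initializing Job")

    async def preprocess(self) -> None:
        self.job_report.setJobStatus("Downloading InputSandbox")
        await self.job_report.commit()  # also flushes the buffered init status

    async def execute(self) -> None:
        self.job_report.setJobStatus("Executing Payload")
        await self.job_report.commit()  # early commit: payload is visibly running
        await asyncio.sleep(0)  # stand-in for the payload itself
        self.job_report.setJobStatus("Application Finished Successfully")
        await self.job_report.commit()


async def demo() -> list[list[str]]:
    report = FakeJobReport()
    wrapper = WrapperSketch(report)
    await wrapper.preprocess()
    await wrapper.execute()
    return report.sent


print(asyncio.run(demo()))
```

In this sketch the initialization status reaches the server together with the first preprocess update, which is the trade-off of not committing in `__init__`.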

assert arguments.sandbox is not None
self.job_report.setJobStatus(minor_status=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
if not self.execution_hooks_plugin:
    self.job_report.setJobStatus(minor_status=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX)
Contributor

Don't you need to commit here?
Unless you want to catch exceptions in the JobWrapperTemplate and then commit?

Member Author

I actually think it's fine not to commit here, because exceptions are caught by a try/except in run_job() and there is a commit there.

        except Exception:
            logger.exception("JobWrapper: Failed to execute workflow")
            self.job_report.setJobStatus(JobStatus.FAILED)
            await self.job_report.commit()
            return False

I might also just move this commit to the finally block, so that we don't need one before each return of the method.
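
A sketch of what moving the commit into a finally block could look like. This is not the PR's run_job(): the signature, the `workflow` callable, and `FakeJobReport` are assumptions introduced so the example runs on its own.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger(__name__)


class FakeJobReport:
    """Hypothetical report that records statuses and counts commits."""

    def __init__(self) -> None:
        self.statuses: list[str] = []
        self.commits = 0

    def setJobStatus(self, status: str) -> None:
        self.statuses.append(status)

    async def commit(self) -> None:
        self.commits += 1


async def run_job(report: FakeJobReport, workflow: Callable[[], Awaitable[None]]) -> bool:
    try:
        await workflow()
        report.setJobStatus("Done")
        return True
    except Exception:
        logger.exception("JobWrapper: Failed to execute workflow")
        report.setJobStatus("Failed")
        return False
    finally:
        # Runs on every exit path, so each return needs no commit of its own.
        await report.commit()
```

With this layout, statuses set by lower-level methods before an exception is raised are still flushed once, by the single commit in `finally`.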

else:
self.job_report = JobReport(self.job_id, src, AsyncDiracClient())

async def initialize(self) -> None:
Contributor

Good thing to make everything async! (I think the whole code should be async to fit with the diracx code base)

    await self.execution_hooks_plugin.store_output(outputs)
    self.job_report.setJobStatus(status=JobStatus.COMPLETING, minor_status=JobMinorStatus.OUTPUT_DATA_UPLOADED)
except RuntimeError as err:
    self.job_report.setJobStatus(status=JobStatus.FAILED, minor_status=JobMinorStatus.UPLOADING_OUTPUT_DATA)
Contributor

Same here, no commit?

@Loxeris force-pushed the diracx-job-statuses branch from 4e793fe to ae4abc4 on March 3, 2026 15:41
@Loxeris force-pushed the diracx-job-statuses branch from a6ac83f to c7b7188 on March 4, 2026 10:19

Comment on lines +10 to +50
class JobMinorStatus(StrEnum):
    """List of all available job minor statuses."""

    APPLICATION = "Executing Payload"
    APP_ERRORS = "Application Finished With Errors"
    # APP_NOT_FOUND = "Application not found"
    APP_SUCCESS = "Application Finished Successfully"
    # APP_THREAD_FAILED = "Application thread failed"
    # APP_THREAD_NOT_COMPLETE = "Application thread did not complete"
    DOWNLOADING_INPUT_SANDBOX = "Downloading InputSandbox"
    # DOWNLOADING_INPUT_SANDBOX_LFN = "Downloading InputSandbox LFN(s)"
    # EXCEPTION_DURING_EXEC = "Exception During Execution"
    EXEC_COMPLETE = "Execution Complete"
    FAILED_DOWNLOADING_INPUT_SANDBOX = "Failed Downloading InputSandbox"
    # FAILED_DOWNLOADING_INPUT_SANDBOX_LFN = "Failed Downloading InputSandbox LFN(s)"
    # FAILED_SENDING_REQUESTS = "Failed sending requests"
    # GOING_RESCHEDULE = "Going to reschedule job"
    # ILLEGAL_JOB_JDL = "Illegal Job JDL"
    INPUT_DATA_RESOLUTION = "Resolving Input Data"
    # INPUT_NOT_AVAILABLE = "Input Data Not Available"
    # JOB_EXCEEDED_CPU = "Job has reached the CPU limit of the queue"
    # JOB_EXCEEDED_WALL_CLOCK = "Job has exceeded maximum wall clock time"
    JOB_INITIALIZATION = "Initializing Job"
    # JOB_INSUFFICIENT_DISK = "Job has insufficient disk space to continue"
    # JOB_WRAPPER_EXECUTION = "JobWrapper execution"
    # JOB_WRAPPER_INITIALIZATION = "Job Wrapper Initialization"
    # MARKED_FOR_TERMINATION = "Marked for termination"
    # NO_CANDIDATE_SITE_FOUND = "No candidate sites available"
    OUTPUT_DATA_UPLOADED = "Output Data Uploaded"
    OUTPUT_SANDBOX_UPLOADED = "Output Sandbox Uploaded"
    # PENDING_REQUESTS = "Pending Requests"
    # PILOT_AGENT_SUBMISSION = "Pilot Agent Submission"
    # RECEIVED_KILL_SIGNAL = "Received Kill signal"
    # REQUESTS_DONE = "Requests done"
    # RESCHEDULED = "Job Rescheduled"
    RESOLVING_OUTPUT_SANDBOX = "Resolving Output Sandbox"
    # STALLED_PILOT_NOT_RUNNING = "Job stalled: pilot not running"
    # UPLOADING_JOB_OUTPUTS = "Uploading Outputs"
    UPLOADING_OUTPUT_DATA = "Uploading Output Data"
    UPLOADING_OUTPUT_SANDBOX = "Uploading Output Sandbox"
    # WATCHDOG_STALLED = "Watchdog identified this job as stalled"
Member Author

While working on the job wrapper, I commented out all the minor statuses it does not use. I'm wondering whether I should restore them, keep them commented out, or delete them from the enum entirely.

@Loxeris Loxeris marked this pull request as ready for review March 4, 2026 10:54
@Loxeris Loxeris requested a review from aldbr March 4, 2026 10:54
Development

Successfully merging this pull request may close these issues.

The job wrapper should report the status of the job to DiracX

2 participants