Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 108 additions & 109 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import json
import re
import tempfile
import subprocess
Expand All @@ -25,6 +26,8 @@

HOLD_REASON_SUBCODE = "55"

STATE_ATTRIBUTES = "ClusterId,ProcId,JobStatus,HoldReasonCode,HoldReasonSubCode,HoldReason"

subTemplate = """
# Environment
# -----------
Expand Down Expand Up @@ -62,6 +65,7 @@
# Requirements
# ------------
request_cpus = %(processors)s
requirements = NumJobStarts == 0

# Exit options
# ------------
Expand All @@ -73,7 +77,8 @@
# A subcode of our choice to identify who put the job on hold
on_exit_hold_subcode = %(holdReasonSubcode)s
# Jobs are then deleted from the system after N days if they are not idle or running
periodic_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600))
periodic_remove = ((JobStatus == 1) && (NumJobStarts > 0)) || \
((JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600))

# Specific options
# ----------------
Expand All @@ -87,63 +92,34 @@
"""


def parseCondorStatus(lines, jobID):
def getCondorStatus(jobMetadata):
"""parse the condor_q or condor_history output for the job status

:param lines: list of lines from the output of the condor commands, each line is a tuple of jobID, statusID, and holdReasonCode
:type lines: python:list
:param str jobID: jobID of condor job, e.g.: 123.53
:param jobMetadata: dict with job metadata
:type jobMetadata: dict[str, str | int]
:returns: Status as known by DIRAC, and a reason if the job is being held
"""
jobID = str(jobID)

holdReason = ""
status = None
for line in lines:
l = line.strip().split()

# Make sure the job ID exists
if len(l) < 1 or l[0] != jobID:
continue

# Make sure the status is present and is an integer
try:
status = int(l[1])
except (ValueError, IndexError):
break

# Stop here if the status is not held (5): result should be found in STATES_MAP
if status != 5:
break

# A job can be held for various reasons,
# we need to further investigate with the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

# By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
status = 3
try:
holdReasonCode = l[2]
holdReasonSubcode = l[3]
holdReason = " ".join(l[4:])
except IndexError:
# This should not happen in theory
# Just set the status to unknown such as
status = None
holdReasonCode = "undefined"
holdReasonSubcode = "undefined"
break

# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
elif holdReasonCode == "16":
status = 1

return (STATES_MAP.get(status, "Unknown"), holdReason)
if jobMetadata["JobStatus"] != 5:
# If the job is not held, we can return the status directly
return (STATES_MAP.get(jobMetadata["JobStatus"], "Unknown"), "")

# A job can be held for various reasons,
# we need to further investigate with the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

# By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
status = 3

# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
if jobMetadata["HoldReasonCode"] == 3 and jobMetadata["HoldReasonSubCode"] == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
elif jobMetadata["HoldReasonCode"] == 16:
status = 1

return (STATES_MAP.get(status, "Unknown"), jobMetadata["HoldReason"])


class Condor(object):
Expand Down Expand Up @@ -171,8 +147,6 @@ def submitJob(self, **kwargs):
preamble = kwargs.get("Preamble")

jdlFile = tempfile.NamedTemporaryFile(dir=outputDir, suffix=".jdl", mode="wt")
scheddOptions = 'requirements = OpSys == "LINUX"\n'
scheddOptions += "gentenv = False"
jdlFile.write(
subTemplate
% dict(
Expand All @@ -185,15 +159,15 @@ def submitJob(self, **kwargs):
holdReasonSubcode=HOLD_REASON_SUBCODE,
daysToKeepRemoteLogs=1,
scheddOptions="",
extraString="",
extraString=submitOptions,
pilotStampList=",".join(stamps),
)
)

jdlFile.flush()

cmd = "%s; " % preamble if preamble else ""
cmd += "condor_submit %s %s" % (submitOptions, jdlFile.name)
cmd += "condor_submit %s" % jdlFile.name
sp = subprocess.Popen(
cmd,
shell=True,
Expand Down Expand Up @@ -283,7 +257,6 @@ def killJob(self, **kwargs):

def getJobStatus(self, **kwargs):
"""Get status of the jobs in the given list"""

resultDict = {}

MANDATORY_PARAMETERS = ["JobIDList"]
Expand All @@ -299,15 +272,11 @@ def getJobStatus(self, **kwargs):
resultDict["Message"] = "Empty job list"
return resultDict

user = kwargs.get("User")
if not user:
user = os.environ.get("USER")
if not user:
resultDict["Status"] = -1
resultDict["Message"] = "No user name"
return resultDict
# Prepare the command to get the status of the jobs
cmdJobs = " ".join(str(jobID) for jobID in jobIDList)

cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user
# Get the status of the jobs currently active
cmd = "condor_q %s -attributes %s -json" % (cmdJobs, STATE_ATTRIBUTES)
sp = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
Expand All @@ -317,16 +286,15 @@ def getJobStatus(self, **kwargs):
output, error = sp.communicate()
status = sp.returncode

if status != 0:
if status != 0 or not output:
resultDict["Status"] = status
resultDict["Message"] = error
return resultDict

qList = output.strip().split("\n")
jobsMetadata = json.loads(output)

condorHistCall = (
"condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
)
# Get the status of the jobs in the history
condorHistCall = "condor_history %s -attributes %s -json" % (cmdJobs, STATE_ATTRIBUTES)
sp = subprocess.Popen(
shlex.split(condorHistCall),
stdout=subprocess.PIPE,
Expand All @@ -335,15 +303,26 @@ def getJobStatus(self, **kwargs):
)
output, _ = sp.communicate()
status = sp.returncode
if status == 0:
for line in output.split("\n"):
qList.append(line)

if status != 0 or not output:
resultDict["Status"] = status
resultDict["Message"] = error
return resultDict

jobsMetadata += json.loads(output)

statusDict = {}
if len(qList):
for job in jobIDList:
job = str(job)
statusDict[job], _ = parseCondorStatus(qList, job)
# Build a set of job IDs found in jobsMetadata
foundJobIDs = set()
for jobDict in jobsMetadata:
jobID = "%s.%s" % (jobDict["ClusterId"], jobDict["ProcId"])
statusDict[jobID], _ = getCondorStatus(jobDict)
foundJobIDs.add(jobID)

# For job IDs not found, set status to "Unknown"
for jobID in jobIDList:
if str(jobID) not in foundJobIDs:
statusDict[str(jobID)] = "Unknown"

# Final output
status = 0
Expand All @@ -355,19 +334,30 @@ def getCEStatus(self, **kwargs):
"""Get the overall status of the CE"""
resultDict = {}

user = kwargs.get("User")
if not user:
user = os.environ.get("USER")
if not user:
cmd = "condor_q -totals -json"
sp = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
output, error = sp.communicate()
status = sp.returncode

if status != 0 or not output:
resultDict["Status"] = -1
resultDict["Message"] = "No user name"
resultDict["Message"] = error
return resultDict

waitingJobs = 0
runningJobs = 0
jresult = json.loads(output)
resultDict["Status"] = 0
resultDict["Waiting"] = jresult[0]["Idle"]
resultDict["Running"] = jresult[0]["Running"]

# We also need to check the hold jobs, some of them are actually waiting (e.g. for input files)
cmd = 'condor_q -json -constraint "JobStatus == 5" -attributes HoldReasonCode'
sp = subprocess.Popen(
shlex.split("condor_q -submitter %s" % user),
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
Expand All @@ -376,33 +366,42 @@ def getCEStatus(self, **kwargs):
status = sp.returncode

if status != 0:
if "no record" in output:
resultDict["Status"] = 0
resultDict["Waiting"] = waitingJobs
resultDict["Running"] = runningJobs
return resultDict
resultDict["Status"] = status
resultDict["Status"] = -1
resultDict["Message"] = error
return resultDict

if "no record" in output:
resultDict["Status"] = 0
resultDict["Waiting"] = waitingJobs
resultDict["Running"] = runningJobs
# If there are no held jobs, we can return the result
if not output:
return resultDict

if output:
lines = output.split("\n")
for line in lines:
if not line.strip():
continue
if " I " in line:
waitingJobs += 1
elif " R " in line:
runningJobs += 1
jresult = json.loads(output)
for job_metadata in jresult:
if job_metadata["HoldReasonCode"] == 16:
resultDict["Waiting"] += 1

return resultDict

def getJobOutputFiles(self, **kwargs):
"""Get output file names and templates for the specific CE"""
resultDict = {}

MANDATORY_PARAMETERS = ["JobIDList", "OutputDir", "ErrorDir"]
for argument in MANDATORY_PARAMETERS:
if argument not in kwargs:
resultDict["Status"] = -1
resultDict["Message"] = "No %s" % argument
return resultDict

outputDir = kwargs["OutputDir"]
errorDir = kwargs["ErrorDir"]
jobIDList = kwargs["JobIDList"]

jobDict = {}
for jobID in jobIDList:
jobDict[jobID] = {}
jobDict[jobID]["Output"] = "%s/%s.out" % (outputDir, jobID)
jobDict[jobID]["Error"] = "%s/%s.err" % (errorDir, jobID)

# Final output
resultDict["Status"] = 0
resultDict["Waiting"] = waitingJobs
resultDict["Running"] = runningJobs
resultDict["Jobs"] = jobDict
return resultDict
Loading
Loading