Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def setup_logger(name, log_file, level=logging.INFO):
meta["cpu_limit"] = args.cpu_limit
meta["mem_limit"] = args.mem_limit
meta["workflow_file"] = os.path.abspath(args.workflowfile)
args.target_tasks = [f.strip('"').strip("'") for f in args.target_tasks] # strip quotes from the shell
meta["target_task"] = args.target_tasks
meta["rerun_from"] = args.rerun_from
meta["target_labels"] = args.target_labels
Expand Down Expand Up @@ -321,20 +322,20 @@ def load_json(workflowfile):


# filters the original workflowspec according to wanted targets or labels
# returns a new workflowspec
# returns a new workflowspec and the list of "final" workflowtargets
def filter_workflow(workflowspec, targets=[], targetlabels=[]):
if len(targets)==0:
return workflowspec
return workflowspec, []
if len(targetlabels)==0 and len(targets)==1 and targets[0]=="*":
return workflowspec
return workflowspec, []

transformedworkflowspec = workflowspec

def task_matches(t):
for filt in targets:
if filt=="*":
return True
if re.match(filt, t)!=None:
if re.match(filt, t) != None:
return True
return False

Expand Down Expand Up @@ -372,6 +373,8 @@ def canBeDone(t,cache={}):
ok = False
break
cache[t['name']] = ok
if ok == False:
print (f"Disabling target {t['name']} due to unsatisfied requirements")
return ok

okcache = {}
Expand Down Expand Up @@ -404,7 +407,7 @@ def needed_by_targets(name):
# we finally copy everything matching the targets as well
# as all their requirements
transformedworkflowspec['stages']=[ l for l in workflowspec['stages'] if needed_by_targets(l['name']) ]
return transformedworkflowspec
return transformedworkflowspec, full_target_name_list


# builds topological orderings (for each timeframe)
Expand Down Expand Up @@ -898,7 +901,7 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
break


def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
def filegraph_expand_timeframes(data: dict, timeframes: set, target_namelist) -> dict:
"""
A utility function for the fileaccess logic. Takes a template and duplicates
for the multi-timeframe structure.
Expand All @@ -921,6 +924,12 @@ def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
entry["written_by"] = [
re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
]
# for now we mark some files as keep if they are written
# by a target in the runner targetlist. TODO: Add other mechanisms
# to ask for file keeping (such as via regex or the like)
for e in entry["written_by"]:
if e in target_namelist:
entry["keep"] = True
entry["read_by"] = [
re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
]
Expand All @@ -945,7 +954,8 @@ def __init__(self, workflowfile, args, jmax=100):
os.environ[e] = str(value)

# only keep those tasks that are necessary to be executed based on user's filters
self.workflowspec = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)
self.full_target_namelist = []
self.workflowspec, self.full_target_namelist = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)

if not self.workflowspec['stages']:
if args.target_tasks:
Expand Down Expand Up @@ -1015,7 +1025,7 @@ def __init__(self, workflowfile, args, jmax=100):
with open(args.remove_files_early) as f:
filegraph_data = json.load(f)
self.do_early_file_removal = True
self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset, self.full_target_namelist)


def perform_early_file_removal(self, taskids):
Expand All @@ -1031,7 +1041,7 @@ def remove_if_exists(filepath: str) -> None:
if os.path.exists(filepath):
fsize = os.path.getsize(filepath)
os.remove(filepath)
actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024.} MB.")
actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024./1024.} MB.")
return True

return False
Expand All @@ -1057,7 +1067,7 @@ def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
file_entry['written_by'].remove(taskname)

# TODO: in principle the written_by criterion might not be needed
if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0 and file_entry.get('keep', False) == False:
# the filename mentioned here is no longer needed and we can remove it
# make sure it is there and then delete it
if remove_if_exists(filename):
Expand Down Expand Up @@ -1329,6 +1339,17 @@ def monitor(self, process_list):
globalPSS=0.
resources_per_task = {}

# On a global level, we are interested in the total disk space used (not per task).
# We call the system "du" tool as the fastest implementation.
def disk_usage_du(path: str) -> int:
    """Return the total size of *path* in bytes, as reported by the system `du`.

    Uses GNU `du -sb` (-s: summarize, -b: apparent size in bytes); the size
    is the first whitespace-separated token of the single output line.
    """
    # check=True raises CalledProcessError on failure, matching check_output behavior
    completed = subprocess.run(['du', '-sb', path],
                               check=True, capture_output=True, text=True)
    size_token = completed.stdout.split()[0]
    return int(size_token)

disc_usage = -1
if os.getenv("MONITOR_DISC_USAGE"):
disc_usage = disk_usage_du(os.getcwd()) / 1024. / 1024 # in MB

for tid, proc in process_list:

# proc is Popen object
Expand Down Expand Up @@ -1399,7 +1420,15 @@ def monitor(self, process_list):
totalUSS = totalUSS / 1024 / 1024
totalPSS = totalPSS / 1024 / 1024
nice_value = proc.nice()
resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS, 'pss':totalPSS, 'nice':nice_value, 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']}
resources_per_task[tid]={'iter':self.internalmonitorid,
'name':self.idtotask[tid],
'cpu':totalCPU,
'uss':totalUSS,
'pss':totalPSS,
'nice':nice_value,
'swap':totalSWAP,
'label':self.workflowspec['stages'][tid]['labels'],
'disc': disc_usage}
self.resource_manager.add_monitored_resources(tid, time_delta, totalCPU / 100, totalPSS)
if nice_value == self.resource_manager.nice_default:
globalCPU += totalCPU
Expand Down
2 changes: 1 addition & 1 deletion MC/bin/o2dpg_qc_finalization_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def add_QC_finalization(taskName, qcConfigPath, needs=None):
if standalone == True:
needs = []
elif needs == None:
needs = [taskName + '_local' + str(tf) for tf in range(1, ntimeframes + 1)]
needs = [taskName + '_local_' + str(tf) for tf in range(1, ntimeframes + 1)]

task = createTask(name=QC_finalize_name(taskName), needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000')
def remove_json_prefix(path):
Expand Down
2 changes: 1 addition & 1 deletion MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,7 @@ def getDigiTaskName(det):
if includeFullQC or includeLocalQC:

def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
task = createTask(name=taskName + '_local' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000')
task = createTask(name=taskName + '_local_' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000')
objectsFile = objectsFile if len(objectsFile) > 0 else taskName + '.root'

def remove_json_prefix(path):
Expand Down
52 changes: 31 additions & 21 deletions MC/run/ANCHOR/anchorMC.sh
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ fi
TIMESTAMP=`grep "Determined timestamp to be" ${anchoringLogFile} | awk '//{print $6}'`
echo_info "TIMESTAMP IS ${TIMESTAMP}"

if [ "${ONLY_WORKFLOW_CREATION}" ]; then
exit 0
fi

# check if this job is excluded because it falls inside a bad data-taking period
ISEXCLUDED=$(grep "TIMESTAMP IS EXCLUDED IN RUN" ${anchoringLogFile})
if [ "${ISEXCLUDED}" ]; then
Expand Down Expand Up @@ -383,30 +387,36 @@ export FAIRMQ_IPC_PREFIX=./

echo_info "Ready to start main workflow"

${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${ALIEN_JDL_O2DPGWORKFLOWTARGET:-aod} --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources
MCRC=$? # <--- we'll report back this code
if [[ "${MCRC}" == "0" && "${ALIEN_JDL_ADDTIMESERIESINMC}" != "0" ]]; then
# Default value is 1 so this is run by default.
echo_info "Running TPC time series"
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt tpctimes
# Note: We could maybe avoid this if-else by including `tpctimes` directly in the workflow-targets above
fi

if [[ "${MCRC}" == "0" && "${ALIEN_JDL_DOTPCRESIDUALEXTRACTION}" = "1" ]]; then
echo_info "Running TPC residuals extraction, aggregation and merging"
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt tpcresidmerge
# Let us construct the workflow targets
targetString=""
if [ "${ALIEN_JDL_O2DPGWORKFLOWTARGET}" ]; then
# The user gave ${ALIEN_JDL_O2DPGWORKFLOWTARGET}. This is an expert mode not used in production.
# In this case, we will build just that. No QC, no TPC timeseries, ...
targetString=${ALIEN_JDL_O2DPGWORKFLOWTARGET}
else
targetString="'aodmerge.*'"
# Now add more targets depending on options
# -) The TPC timeseries targets
if [[ "${ALIEN_JDL_ADDTIMESERIESINMC}" == "1" ]]; then
targetString="${targetString} 'tpctimes.*'"
fi
# -) TPC residual calibration
if [ "${ALIEN_JDL_DOTPCRESIDUALSEXTRACTION}" ]; then
targetString="${targetString} 'tpcresidmerge.*'"
fi
# -) QC tasks
if [[ -z "${DISABLE_QC}" && "${remainingargs}" == *"--include-local-qc"* ]]; then
targetString="${targetString} '^.*QC.*'" # QC tasks should have QC in the name
fi
fi
echo_info "Workflow will run with target specification ${targetString}"

[[ -n "${DISABLE_QC}" ]] && echo_info "QC is disabled, skip it."
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${targetString} \
--cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources \
${ALIEN_O2DPG_FILEGRAPH:+--remove-files-early ${ALIEN_O2DPG_FILEGRAPH}} \
${ALIEN_O2DPG_ADDITIONAL_WORKFLOW_RUNNER_ARGS}

if [[ -z "${DISABLE_QC}" && "${MCRC}" == "0" && "${remainingargs}" == *"--include-local-qc"* ]] ; then
# do QC tasks
echo_info "Doing QC"
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-labels QC --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} -k
# NOTE that with the -k|--keep-going option, the runner will try to keep on executing even if some tasks fail.
# That means, even if there is a failing QC task, the return code will be 0
MCRC=$?
fi
MCRC=$? # <--- we'll report back this code

#
# full logs tar-ed for output, regardless the error code or validation - to catch also QC logs...
Expand Down