Commit af4f047

Target refactor in anchoredMC; Improvements for early file removal
Changes the way we construct targets in anchorMC: instead of calling different workflows in order (AOD, TPC-time-series, QC), we now execute one single workflow with all necessary targets in one go. This has the advantage that we can apply "early-file-removal" more easily and should also lead to CPU efficiency improvements.

Some other improvements in anchoredMC:
- allow creating just the workflow.json file
- allow injecting additional options for the workflow runner

The changes in o2dpg_workflow_runner.py mostly concern improvements to the early-artefact removal, e.g.:
- do not remove artefacts when they belong to a final target task
- add global disc space monitoring to measure the benefit of early-file removal
1 parent 93ff9d0 commit af4f047

File tree

4 files changed (+73, -34 lines)

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 40 additions & 11 deletions
@@ -100,6 +100,7 @@ def setup_logger(name, log_file, level=logging.INFO):
 meta["cpu_limit"] = args.cpu_limit
 meta["mem_limit"] = args.mem_limit
 meta["workflow_file"] = os.path.abspath(args.workflowfile)
+args.target_tasks = [f.strip('"').strip("'") for f in args.target_tasks] # strip quotes from the shell
 meta["target_task"] = args.target_tasks
 meta["rerun_from"] = args.rerun_from
 meta["target_labels"] = args.target_labels
@@ -321,20 +322,20 @@ def load_json(workflowfile):
 
 
 # filters the original workflowspec according to wanted targets or labels
-# returns a new workflowspec
+# returns a new workflowspec and the list of "final" workflowtargets
 def filter_workflow(workflowspec, targets=[], targetlabels=[]):
     if len(targets)==0:
-        return workflowspec
+        return workflowspec, []
     if len(targetlabels)==0 and len(targets)==1 and targets[0]=="*":
-        return workflowspec
+        return workflowspec, []
 
     transformedworkflowspec = workflowspec
 
     def task_matches(t):
         for filt in targets:
             if filt=="*":
                 return True
-            if re.match(filt, t)!=None:
+            if re.match(filt, t) != None:
                 return True
         return False
 
@@ -372,6 +373,8 @@ def canBeDone(t,cache={}):
                 ok = False
                 break
         cache[t['name']] = ok
+        if ok == False:
+            print (f"Disabling target {t['name']} due to unsatisfied requirements")
         return ok
 
     okcache = {}
@@ -404,7 +407,7 @@ def needed_by_targets(name):
     # we finaly copy everything matching the targets as well
     # as all their requirements
     transformedworkflowspec['stages']=[ l for l in workflowspec['stages'] if needed_by_targets(l['name']) ]
-    return transformedworkflowspec
+    return transformedworkflowspec, full_target_name_list
 
 
 # builds topological orderings (for each timeframe)
@@ -898,7 +901,7 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
             break
 
 
-def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
+def filegraph_expand_timeframes(data: dict, timeframes: set, target_namelist) -> dict:
     """
     A utility function for the fileaccess logic. Takes a template and duplicates
     for the multi-timeframe structure.
@@ -921,6 +924,12 @@ def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
             entry["written_by"] = [
                 re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
             ]
+            # for now we mark some files as keep if they are written
+            # by a target in the runner targetlist. TODO: Add other mechanisms
+            # to ask for file keeping (such as via regex or the like)
+            for e in entry["written_by"]:
+                if e in target_namelist:
+                    entry["keep"] = True
             entry["read_by"] = [
                 re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
             ]
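
For illustration, a minimal Python sketch (not taken from the runner itself) of the keep-marking rule introduced above, using a made-up filegraph entry and target list; the real entries come from the --remove-files-early JSON template and the runner's list of final targets:

# Sketch of the keep-marking rule on a hypothetical filegraph entry.
entry = {"written_by": ["aodmerge_1"], "read_by": []}
final_targets = ["aodmerge_1"]   # stands in for the runner's full_target_namelist

for writer in entry["written_by"]:
    if writer in final_targets:
        # files produced by a final target are never removed early
        entry["keep"] = True

print(entry)   # {'written_by': ['aodmerge_1'], 'read_by': [], 'keep': True}
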
@@ -945,7 +954,8 @@ def __init__(self, workflowfile, args, jmax=100):
             os.environ[e] = str(value)
 
         # only keep those tasks that are necessary to be executed based on user's filters
-        self.workflowspec = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)
+        self.full_target_namelist = []
+        self.workflowspec, self.full_target_namelist = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)
 
         if not self.workflowspec['stages']:
             if args.target_tasks:
@@ -1015,7 +1025,7 @@ def __init__(self, workflowfile, args, jmax=100):
             with open(args.remove_files_early) as f:
                 filegraph_data = json.load(f)
             self.do_early_file_removal = True
-            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
+            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset, self.full_target_namelist)
 
 
     def perform_early_file_removal(self, taskids):
@@ -1031,7 +1041,7 @@ def remove_if_exists(filepath: str) -> None:
             if os.path.exists(filepath):
                 fsize = os.path.getsize(filepath)
                 os.remove(filepath)
-                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024.} MB.")
+                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024./1024.} MB.")
                 return True
 
             return False
@@ -1057,7 +1067,7 @@ def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
                 file_entry['written_by'].remove(taskname)
 
                 # TODO: in principle the written_by criterion might not be needed
-                if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
+                if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0 and file_entry.get('keep', False) == False:
                     # the filename mentioned here is no longer needed and we can remove it
                     # make sure it is there and then delete it
                     if remove_if_exists(filename):
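
Taken together with the keep-marking above, the effective removal condition can be sketched as follows; this is a simplified stand-alone illustration with hypothetical file entries, while the actual bookkeeping of read_by/written_by happens inside remove_for_task_id:

# Simplified sketch of the early-removal decision for one file entry.
def may_remove(file_entry: dict) -> bool:
    # a file may go only when nobody still reads it, nobody still writes it,
    # and it was not marked "keep" because a final target produced it
    return (len(file_entry['read_by']) == 0
            and len(file_entry['written_by']) == 0
            and not file_entry.get('keep', False))

print(may_remove({'read_by': [], 'written_by': [], 'keep': True}))   # False -> kept
print(may_remove({'read_by': [], 'written_by': []}))                 # True  -> removed
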
@@ -1329,6 +1339,17 @@ def monitor(self, process_list):
         globalPSS=0.
         resources_per_task = {}
 
+        # On a global level, we are interested in total disc space used (not differential in tasks)
+        # We can call system "du" as the fastest impl
+        def disk_usage_du(path: str) -> int:
+            """Use system du to get total size in bytes."""
+            out = subprocess.check_output(['du', '-sb', path], text=True)
+            return int(out.split()[0])
+
+        disc_usage = -1
+        if os.getenv("MONITOR_DISC_USAGE"):
+            disc_usage = disk_usage_du(os.getcwd()) / 1024. / 1024 # in MB
+
         for tid, proc in process_list:
 
             # proc is Popen object
@@ -1399,7 +1420,15 @@ def monitor(self, process_list):
             totalUSS = totalUSS / 1024 / 1024
             totalPSS = totalPSS / 1024 / 1024
             nice_value = proc.nice()
-            resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS, 'pss':totalPSS, 'nice':nice_value, 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']}
+            resources_per_task[tid]={'iter':self.internalmonitorid,
+                                     'name':self.idtotask[tid],
+                                     'cpu':totalCPU,
+                                     'uss':totalUSS,
+                                     'pss':totalPSS,
+                                     'nice':nice_value,
+                                     'swap':totalSWAP,
+                                     'label':self.workflowspec['stages'][tid]['labels'],
+                                     'disc': disc_usage}
             self.resource_manager.add_monitored_resources(tid, time_delta, totalCPU / 100, totalPSS)
             if nice_value == self.resource_manager.nice_default:
                 globalCPU += totalCPU
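
The new disc accounting can be exercised on its own; below is a small stand-alone sketch of the same du-based measurement, assuming a GNU du that supports -sb and the MONITOR_DISC_USAGE environment switch shown above:

# Stand-alone sketch of the du-based disc accounting (assumes GNU du with -sb).
import os
import subprocess

def disk_usage_du(path: str) -> int:
    """Use system du to get total size in bytes."""
    out = subprocess.check_output(['du', '-sb', path], text=True)
    return int(out.split()[0])

disc_usage = -1
if os.getenv("MONITOR_DISC_USAGE"):
    disc_usage = disk_usage_du(os.getcwd()) / 1024. / 1024.   # in MB
print(f"working directory occupies {disc_usage} MB (-1 means monitoring disabled)")

When enabled, the measured value ends up in the per-task resource record under the 'disc' key, so the benefit of early file removal can be read off the monitoring output.
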

MC/bin/o2dpg_qc_finalization_workflow.py

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ def add_QC_finalization(taskName, qcConfigPath, needs=None):
     if standalone == True:
         needs = []
     elif needs == None:
-        needs = [taskName + '_local' + str(tf) for tf in range(1, ntimeframes + 1)]
+        needs = [taskName + '_local_' + str(tf) for tf in range(1, ntimeframes + 1)]
 
     task = createTask(name=QC_finalize_name(taskName), needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000')
     def remove_json_prefix(path):

MC/bin/o2dpg_sim_workflow.py

Lines changed: 1 addition & 1 deletion
@@ -1726,7 +1726,7 @@ def getDigiTaskName(det):
   if includeFullQC or includeLocalQC:
 
     def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
-      task = createTask(name=taskName + '_local' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000')
+      task = createTask(name=taskName + '_local_' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000')
       objectsFile = objectsFile if len(objectsFile) > 0 else taskName + '.root'
 
       def remove_json_prefix(path):
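
The one-character fixes here and in o2dpg_qc_finalization_workflow.py keep producer and consumer task names consistent; a small sketch with placeholder values shows why the underscore matters for dependency resolution:

# Sketch of the QC task-name convention; taskName and ntimeframes are placeholders.
taskName = "someQCTask"
ntimeframes = 2

# names given to the per-timeframe local QC tasks (o2dpg_sim_workflow.py)
produced = [taskName + '_local_' + str(tf) for tf in range(1, ntimeframes + 1)]

# names the finalization task lists in its "needs" (o2dpg_qc_finalization_workflow.py);
# they must match exactly, otherwise the dependency cannot be resolved
needed = [taskName + '_local_' + str(tf) for tf in range(1, ntimeframes + 1)]

assert produced == needed   # ['someQCTask_local_1', 'someQCTask_local_2']
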

MC/run/ANCHOR/anchorMC.sh

Lines changed: 31 additions & 21 deletions
@@ -338,6 +338,10 @@ fi
 TIMESTAMP=`grep "Determined timestamp to be" ${anchoringLogFile} | awk '//{print $6}'`
 echo_info "TIMESTAMP IS ${TIMESTAMP}"
 
+if [ "${ONLY_WORKFLOW_CREATION}" ]; then
+  exit 0
+fi
+
 # check if this job is exluded because it falls inside a bad data-taking period
 ISEXCLUDED=$(grep "TIMESTAMP IS EXCLUDED IN RUN" ${anchoringLogFile})
 if [ "${ISEXCLUDED}" ]; then
@@ -383,30 +387,36 @@ export FAIRMQ_IPC_PREFIX=./
 
 echo_info "Ready to start main workflow"
 
-${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${ALIEN_JDL_O2DPGWORKFLOWTARGET:-aod} --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources
-MCRC=$? # <--- we'll report back this code
-if [[ "${MCRC}" == "0" && "${ALIEN_JDL_ADDTIMESERIESINMC}" != "0" ]]; then
-  # Default value is 1 so this is run by default.
-  echo_info "Running TPC time series"
-  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt tpctimes
-  # Note: We could maybe avoid this if-else by including `tpctimes` directly in the workflow-targets above
-fi
-
-if [[ "${MCRC}" == "0" && "${ALIEN_JDL_DOTPCRESIDUALEXTRACTION}" = "1" ]]; then
-  echo_info "Running TPC residuals extraction, aggregation and merging"
-  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt tpcresidmerge
+# Let us construct the workflow targets
+targetString=""
+if [ "${ALIEN_JDL_O2DPGWORKFLOWTARGET}" ]; then
+  # The user gave ${ALIEN_JDL_O2DPGWORKFLOWTARGET}. This is an expert mode not used in production.
+  # In this case, we will build just that. No QC, no TPC timeseries, ...
+  targetString=${ALIEN_JDL_O2DPGWORKFLOWTARGET}
+else
+  targetString="'aodmerge.*'"
+  # Now add more targets depending on options
+  # -) The TPC timeseries targets
+  if [[ "${ALIEN_JDL_ADDTIMESERIESINMC}" == "1" ]]; then
+    targetString="${targetString} 'tpctimes.*'"
+  fi
+  # -) TPC residual calibration
+  if [ "${ALIEN_JDL_DOTPCRESIDUALSEXTRACTION}" ]; then
+    targetString="${targetString} 'tpcresidmerge.*'"
+  fi
+  # -) QC tasks
+  if [[ -z "${DISABLE_QC}" && "${remainingargs}" == *"--include-local-qc"* ]]; then
+    targetString="${targetString} '^.*QC.*'" # QC tasks should have QC in the name
+  fi
 fi
+echo_info "Workflow will run with target specification ${targetString}"
 
-[[ -n "${DISABLE_QC}" ]] && echo_info "QC is disabled, skip it."
+${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${targetString} \
+    --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources \
+    ${ALIEN_O2DPG_FILEGRAPH:+--remove-files-early ${ALIEN_O2DPG_FILEGRAPH}} \
+    ${ALIEN_O2DPG_ADDITIONAL_WORKFLOW_RUNNER_ARGS}
 
-if [[ -z "${DISABLE_QC}" && "${MCRC}" == "0" && "${remainingargs}" == *"--include-local-qc"* ]] ; then
-  # do QC tasks
-  echo_info "Doing QC"
-  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-labels QC --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} -k
-  # NOTE that with the -k|--keep-going option, the runner will try to keep on executing even if some tasks fail.
-  # That means, even if there is a failing QC task, the return code will be 0
-  MCRC=$?
-fi
+MCRC=$? # <--- we'll report back this code
 
 #
 # full logs tar-ed for output, regardless the error code or validation - to catch also QC logs...