Skip to content

Commit 8edaf23

Browse files
committed
Introduce retry_count as a task property
The workflow JSON can now specify which tasks merit a "retry". This is useful for tasks that suffer from random (e.g., race-condition) failures. Retrying such tasks can save compute resources that would otherwise be lost if the failure led to a complete exit of the workflow. The feature complements the global option "--retry-on-failure" in the workflow runner. For the moment, the feature is used for TPC clusterization until https://alice.its.cern.ch/jira/browse/O2-3069 is fixed.
1 parent 0b7bc6f commit 8edaf23

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,8 @@ def __init__(self, workflowfile, args, jmax=100):
466466
self.internalmonitorid = 0 # internal use
467467
self.tids_marked_toretry = [] # sometimes we might want to retry a failed task (simply because it was "unlucky") and we put them here
468468
self.retry_counter = [ 0 for tid in range(len(self.taskuniverse)) ] # we keep track of many times retried already
469+
self.task_retries = [ self.workflowspec['stages'][tid].get('retry_count',0) for tid in range(len(self.taskuniverse)) ] # the per task specific "retry" number -> needs to be parsed from the JSON
470+
469471
self.semaphore_values = { self.workflowspec['stages'][tid].get('semaphore'):0 for tid in range(len(self.taskuniverse)) if self.workflowspec['stages'][tid].get('semaphore')!=None } # keeps current count of semaphores (defined in the json workflow). used to achieve user-defined "critical sections".
470472

471473
def SIGHandler(self, signum, frame):
@@ -799,7 +801,7 @@ def waitforany(self, process_list, finished, failingtasks):
799801
if returncode != 0:
800802
print (str(self.idtotask[tid]) + ' failed ... checking retry')
801803
# we inspect if this is something "unlucky" which could be resolved by a simple resubmit
802-
if self.is_worth_retrying(tid) and self.retry_counter[tid] < int(args.retry_on_failure):
804+
if self.is_worth_retrying(tid) and ((self.retry_counter[tid] < int(args.retry_on_failure)) or (self.retry_counter[tid] < int(self.task_retries[tid]))):
803805
print (str(self.idtotask[tid]) + ' to be retried')
804806
actionlogger.info ('Task ' + str(self.idtotask[tid]) + ' failed but marked to be retried ')
805807
self.tids_marked_toretry.append(tid)

MC/bin/o2dpg_sim_workflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,7 @@ def getDigiTaskName(det):
760760
tpcclussect['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS)
761761
tpcclussect['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type digitizer --output-type clusters,send-clusters-per-sector --outfile tpc-native-clusters-part' + str((int)(s/sectorpertask)) + '.root --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4})
762762
tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "16000000000" }
763+
tpcclussect['retry_count'] = 2 # the task has a race condition --> makes sense to retry
763764
workflow['stages'].append(tpcclussect)
764765

765766
TPCCLUSMERGEtask=createTask(name='tpcclustermerge_'+str(tf), needs=tpcclustertasks, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='10000')

0 commit comments

Comments
 (0)