Skip to content

Commit ca2a539

Browse files
committed
Add keep-going feature
1 parent a58f483 commit ca2a539

File tree

1 file changed

+23
-8
lines changed

1 file changed

+23
-8
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
3535
parser.add_argument('-jmax','--maxjobs', help='Number of maximal parallel tasks.', default=100)
36+
parser.add_argument('-k','--keep-going', action='store_true', help='Keep executing the pipeline as far possibe (not stopping on first failure)')
3637
parser.add_argument('--dry-run', action='store_true', help='Show what you would do.')
3738
parser.add_argument('--visualize-workflow', action='store_true', help='Saves a graph visualization of workflow.')
3839
parser.add_argument('--target-labels', nargs='+', help='Runs the pipeline by target labels (example "TPC" or "DIGI").\
@@ -448,7 +449,8 @@ def __init__(self, workflowfile, args, jmax=100):
448449
self.cpulimit = float(args.cpu_limit)
449450
self.procstatus = { tid:'ToDo' for tid in range(len(self.workflowspec['stages'])) }
450451
self.taskneeds= { t:set(self.getallrequirements(t)) for t in self.taskuniverse }
451-
self.stoponfailure = True
452+
self.stoponfailure = not (args.keep_going == True)
453+
print ("Stop on failure ",self.stoponfailure)
452454
self.max_jobs_parallel = int(jmax)
453455
self.scheduling_iteration = 0
454456
self.process_list = [] # list of currently scheduled tasks with normal priority
@@ -774,10 +776,9 @@ def monitor(self, process_list):
774776
# --> We could use this for corrective actions such as killing jobs currently back-filling
775777
# (or better hibernating)
776778

777-
def waitforany(self, process_list, finished):
779+
def waitforany(self, process_list, finished, failingtasks):
778780
failuredetected = False
779781
failingpids = []
780-
failingtasks = []
781782
if len(process_list)==0:
782783
return False
783784

@@ -1018,6 +1019,7 @@ def execute(self):
10181019
starttime = time.perf_counter()
10191020
psutil.cpu_percent(interval=None)
10201021
os.environ['JOBUTILS_SKIPDONE'] = "ON"
1022+
errorencountered = False
10211023

10221024
def speedup_ROOT_Init():
10231025
"""initialize some env variables that speed up ROOT init
@@ -1104,7 +1106,8 @@ def speedup_ROOT_Init():
11041106
break
11051107

11061108
finished_from_started = [] # to account for finished when actually started
1107-
while self.waitforany(self.process_list, finished_from_started):
1109+
failing = []
1110+
while self.waitforany(self.process_list, finished_from_started, failing):
11081111
if not args.dry_run:
11091112
self.monitor(self.process_list) # ---> make async to normal operation?
11101113
time.sleep(1) # <--- make this incremental (small wait at beginning)
@@ -1115,6 +1118,16 @@ def speedup_ROOT_Init():
11151118
actionlogger.debug("finished now :" + str(finished_from_started))
11161119
finishedtasks = finishedtasks + finished
11171120

1121+
# if a task was marked "failed" and we come here (because
1122+
# we use --keep-going) ... we need to take out the pid from finished
1123+
if len(failing) > 0:
1124+
# remove these from those marked finished in order
1125+
# not to continue with their children
1126+
errorencountered = True
1127+
for t in failing:
1128+
finished = [ x for x in finished if x != t ]
1129+
finishedtasks = [ x for x in finishedtasks if x != t ]
1130+
11181131
# if a task was marked as "retry" we simply put it back into the candidate list
11191132
if len(self.tids_marked_toretry) > 0:
11201133
# we need to remove these first of all from those marked finished
@@ -1150,10 +1163,12 @@ def speedup_ROOT_Init():
11501163
self.SIGHandler(0,0)
11511164

11521165
endtime = time.perf_counter()
1153-
print ('\n**** Pipeline done (global_runtime : {:.3f}s) *****\n'.format(endtime-starttime))
1154-
1155-
1166+
statusmsg = "success"
1167+
if errorencountered:
1168+
statusmsg = "with failures"
11561169

1170+
print ('\n**** Pipeline done ' + statusmsg + ' (global_runtime : {:.3f}s) *****\n'.format(endtime-starttime))
1171+
return errorencountered
11571172

11581173

11591174
if args.cgroup!=None:
@@ -1163,4 +1178,4 @@ def speedup_ROOT_Init():
11631178
os.system(command)
11641179

11651180
executor=WorkflowExecutor(args.workflowfile,jmax=args.maxjobs,args=args)
1166-
executor.execute()
1181+
exit (executor.execute())

0 commit comments

Comments
 (0)