Skip to content

Commit 687e368

Browse files
committed
Pipeline runner: Ability to sync tasks with a 'semaphore' mechanism
Tasks can now specify a named semaphore object, meaning that no two tasks refering to the same name may be executed in parallel. This is useful to achieve basic "critical section" protection, for instance for merging operations (AOD).
1 parent 2c6170d commit 687e368

File tree

2 files changed

+46
-12
lines changed

2 files changed

+46
-12
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ def __init__(self, workflowfile, args, jmax=100):
404404
self.internalmonitorid = 0 # internal use
405405
self.tids_marked_toretry = [] # sometimes we might want to retry a failed task (simply because it was "unlucky") and we put them here
406406
self.retry_counter = [ 0 for tid in range(len(self.taskuniverse)) ] # we keep track of many times retried already
407+
self.semaphore_values = { self.workflowspec['stages'][tid].get('semaphore'):0 for tid in range(len(self.taskuniverse)) if self.workflowspec['stages'][tid].get('semaphore')!=None } # keeps current count of semaphores (defined in the json workflow). used to achieve user-defined "critical sections".
407408

408409
def SIGHandler(self, signum, frame):
409410
# basically forcing shut down of all child processes
@@ -498,6 +499,13 @@ def ok_to_submit(self, tid, backfill=False):
498499
softcpufactor=1.5
499500
sotmemfactor=1.5
500501

502+
# check semaphore
503+
sem = self.workflowspec['stages'][tid].get('semaphore')
504+
if sem != None:
505+
if self.semaphore_values[sem] > 0:
506+
return False
507+
508+
# check other resources
501509
if not backfill:
502510
# analyse CPU
503511
okcpu = (self.curcpubooked + float(self.cpuperid[tid]) <= self.cpulimit)
@@ -529,6 +537,37 @@ def ok_to_skip(self, tid):
529537
return True
530538
return False
531539

540+
def book_resources(self, tid, backfill = False):
541+
# books the resources used by a certain task
542+
# semaphores
543+
sem = self.workflowspec['stages'][tid].get('semaphore')
544+
if sem != None:
545+
self.semaphore_values[sem]+=1
546+
547+
# CPU + MEM
548+
if not backfill:
549+
self.curmembooked+=float(self.maxmemperid[tid])
550+
self.curcpubooked+=float(self.cpuperid[tid])
551+
else:
552+
self.curmembooked_backfill+=float(self.maxmemperid[tid])
553+
self.curcpubooked_backfill+=float(self.cpuperid[tid])
554+
555+
def unbook_resources(self, tid, backfill = False):
556+
# "frees" the nominal resources used by a certain task from the accounting
557+
# so that other jobs can be scheduled
558+
sem = self.workflowspec['stages'][tid].get('semaphore')
559+
if sem != None:
560+
self.semaphore_values[sem]-=1
561+
562+
# CPU + MEM
563+
if not backfill:
564+
self.curmembooked-=float(self.maxmemperid[tid])
565+
self.curcpubooked-=float(self.cpuperid[tid])
566+
else:
567+
self.curmembooked_backfill-=float(self.maxmemperid[tid])
568+
self.curcpubooked_backfill-=float(self.cpuperid[tid])
569+
570+
532571
def try_job_from_candidates(self, taskcandidates, process_list, finished):
533572
self.scheduling_iteration = self.scheduling_iteration + 1
534573

@@ -551,8 +590,7 @@ def try_job_from_candidates(self, taskcandidates, process_list, finished):
551590
if (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid):
552591
p=self.submit(tid)
553592
if p!=None:
554-
self.curmembooked+=float(self.maxmemperid[tid])
555-
self.curcpubooked+=float(self.cpuperid[tid])
593+
self.book_resources(tid)
556594
self.process_list.append((tid,p))
557595
taskcandidates.remove(tid)
558596
# minimal delay
@@ -568,8 +606,7 @@ def try_job_from_candidates(self, taskcandidates, process_list, finished):
568606
if (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid, backfill=True):
569607
p=self.submit(tid, 19)
570608
if p!=None:
571-
self.curmembooked_backfill+=float(self.maxmemperid[tid])
572-
self.curcpubooked_backfill+=float(self.cpuperid[tid])
609+
self.book_resources(tid, backfill=True)
573610
self.process_list.append((tid,p))
574611
taskcandidates.remove(tid) #-> not sure about this one
575612
# minimal delay
@@ -694,12 +731,7 @@ def waitforany(self, process_list, finished):
694731
if returncode!=None:
695732
actionlogger.info ('Task ' + str(pid) + ' ' + str(tid)+':'+str(self.idtotask[tid]) + ' finished with status ' + str(returncode))
696733
# account for cleared resources
697-
if self.nicevalues[tid]==os.nice(0):
698-
self.curmembooked-=float(self.maxmemperid[tid])
699-
self.curcpubooked-=float(self.cpuperid[tid])
700-
else:
701-
self.curmembooked_backfill-=float(self.maxmemperid[tid])
702-
self.curcpubooked_backfill-=float(self.cpuperid[tid])
734+
self.unbook_resources(tid, backfill = self.nicevalues[tid]!=os.nice(0) )
703735
self.procstatus[tid]='Done'
704736
finished.append(tid)
705737
process_list.remove(p)

MC/doc/WorkflowRunner.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,12 @@ still in development. Currently, it follows the following scheme:
6363
"needs": [ "task1" ],
6464
"resources": {
6565
"cpu": -1,
66-
"mem": -1
66+
"mem": -1,
6767
},
6868
"timeframe": 1,
6969
"labels": [ "DIGI", "ITS" ],
70-
"cwd": "tf1"
70+
"cwd": "tf1",
71+
"semaphore" : "sem1"
7172
}]
7273
"comments" : "A DPG MC workflow for production FOO"
7374
}
@@ -83,6 +84,7 @@ Further keys in this format are:
8384
| `cwd` | the workding directory where this is to be executed |
8485
| `label` | a list labels, describing this stage. Can be used to execute workfow in stages (such as 'do all digitization', 'run everthing for ITS' |
8586
| `env` | local environment variables needed by the task |
87+
| `semaphore` | (optional) Tasks are synchronized by a semaphore of that name. This can be used to exclude parallel execution of tasks (using the same semaphore name). |
8688

8789
While a workflow may be written by hand, it's more pratical to have it programmatically generated by sripts, that is sensitive to configuration and options. A current example following the PWGHF embedding exercise can be found here [create_embedding_workflow](https://github.com/AliceO2Group/O2DPG/blob/master/MC/run/PWGHF/create_embedding_workflow.py)
8890

0 commit comments

Comments
 (0)