@@ -40,6 +40,12 @@ def setup_logger(name, log_file, level=logging.INFO):
4040# second file logger
4141metriclogger = setup_logger ('pipeline_metric_logger' , 'pipeline_metric.log' )
4242
43+ # for debugging without terminal access
44+ # TODO: integrate into standard logger
45+ def send_webhook (hook , t ):
46+ if hook != None :
47+ command = "curl -X POST -H 'Content-type: application/json' --data '{\" text\" :\" " + str (t ) + "\" }' " + str (hook ) + " &> /dev/null"
48+ os .system (command )
4349
4450# A fallback solution to getting all child procs
4551# in case psutil has problems (PermissionError).
@@ -570,7 +576,6 @@ def stop_pipeline_and_exit(self, process_list):
570576
571577 exit (1 )
572578
573-
574579 def monitor (self , process_list ):
575580 self .internalmonitorcounter += 1
576581 if self .internalmonitorcounter % 5 != 0 :
@@ -650,6 +655,7 @@ def monitor(self, process_list):
650655
651656 resources_per_task [tid ]= {'iter' :self .internalmonitorid , 'name' :self .idtotask [tid ], 'cpu' :totalCPU , 'uss' :totalUSS / 1024. / 1024. , 'pss' :totalPSS / 1024. / 1024 , 'nice' :proc .nice (), 'swap' :totalSWAP , 'label' :self .workflowspec ['stages' ][tid ]['labels' ]}
652657 metriclogger .info (resources_per_task [tid ])
658+ send_webhook (self .args .webhook , resources_per_task )
653659
654660 for r in resources_per_task .values ():
655661 if r ['nice' ]== os .nice (0 ):
@@ -679,7 +685,7 @@ def waitforany(self, process_list, finished):
679685 if returncode != None :
680686 actionlogger .info ('Task ' + str (pid ) + ' ' + str (p [0 ])+ ':' + str (self .idtotask [p [0 ]]) + ' finished with status ' + str (returncode ))
681687 # account for cleared resources
682- if self .nicevalues [p [0 ]]== 0 : # --> change for a more robust way
688+ if self .nicevalues [p [0 ]]== os . nice ( 0 ):
683689 self .curmembooked -= float (self .maxmemperid [p [0 ]])
684690 self .curcpubooked -= float (self .cpuperid [p [0 ]])
685691 else :
@@ -865,6 +871,10 @@ def execute(self):
865871 finished = []
866872 actionlogger .debug ('Sorted current candidates: ' + str ([(c ,self .idtotask [c ]) for c in candidates ]))
867873 self .try_job_from_candidates (candidates , self .process_list , finished )
874+ if len (candidates ) > 0 and len (self .process_list ) == 0 :
875+ actionlogger .info ("Not able to make progress: Nothing scheduled although non-zero candidate set" )
876+ send_webhook (self .args .webhook ,"Unable to make further progress: Quitting" )
877+ break
868878
869879 finished_from_started = []
870880 while self .waitforany (self .process_list , finished_from_started ):
@@ -889,6 +899,7 @@ def execute(self):
889899 candidates .append (candid )
890900
891901 actionlogger .debug ("New candidates " + str ( candidates ))
902+ send_webhook (self .args .webhook , "New candidates " + str (candidates ))
892903
893904 if len (candidates )== 0 and len (self .process_list )== 0 :
894905 break
@@ -927,6 +938,7 @@ def execute(self):
927938parser .add_argument ('--cgroup' , help = 'Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\
928939 ust exist and the tasks file must be writable to with the current user.' )
929940parser .add_argument ('--stdout-on-failure' , action = 'store_true' , help = 'Print log files of failing tasks to stdout,' )
941+ parser .add_argument ('--webhook' , help = argparse .SUPPRESS ) # log some infos to this webhook channel
930942
931943args = parser .parse_args ()
932944print (args )
0 commit comments