 import logging
 import os
 import signal
+import socket
 import sys
 import traceback
 try:
@@ -401,6 +402,8 @@ def __init__(self, workflowfile, args, jmax=100):
         self.nicevalues = [ os.nice(0) for tid in range(len(self.taskuniverse)) ]
         self.internalmonitorcounter = 0 # internal use
         self.internalmonitorid = 0 # internal use
+        self.tids_marked_toretry = [] # sometimes we might want to retry a failed task (simply because it was "unlucky") and we put them here
+        self.retry_counter = [ 0 for tid in range(len(self.taskuniverse)) ] # we keep track of how many times a task was retried already
 
     def SIGHandler(self, signum, frame):
         # basically forcing shut down of all child processes
@@ -529,18 +532,23 @@ def ok_to_skip(self, tid):
     def try_job_from_candidates(self, taskcandidates, process_list, finished):
         self.scheduling_iteration = self.scheduling_iteration + 1
 
-        # the ordinary process list part
-        initialcandidates = taskcandidates.copy()
-        for tid in initialcandidates:
-            actionlogger.debug("trying to submit " + str(tid) + ':' + str(self.idtotask[tid]))
-            # check early if we could skip
-            # better to do it here (instead of relying on taskwrapper)
+        # remove "done / skippable" tasks immediately
+        tasks_skipped = False
+        for tid in taskcandidates.copy(): # <--- the copy is important !! otherwise this loop is not doing what you think
             if self.ok_to_skip(tid):
                 finished.append(tid)
                 taskcandidates.remove(tid)
-                break #---> we break in order to preserve some ordering (the next candidate tried should be daughters of skipped job)
+                tasks_skipped = True
+                actionlogger.info("Skipping task " + str(self.idtotask[tid]))
+
+        # if tasks_skipped:
+        #     return # ---> we return early in order to preserve some ordering (the next candidate tried should be daughters of skipped jobs)
 
-            elif (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid):
+        # the ordinary process list part
+        initialcandidates = taskcandidates.copy()
+        for tid in initialcandidates:
+            actionlogger.debug("trying to submit " + str(tid) + ':' + str(self.idtotask[tid]))
+            if (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid):
                 p = self.submit(tid)
                 if p != None:
                     self.curmembooked += float(self.maxmemperid[tid])
@@ -679,49 +687,120 @@ def waitforany(self, process_list, finished):
 
         for p in list(process_list):
             pid = p[1].pid
+            tid = p[0] # the task id of this process
             returncode = 0
             if not self.args.dry_run:
                 returncode = p[1].poll()
             if returncode != None:
-                actionlogger.info('Task ' + str(pid) + ' ' + str(p[0]) + ':' + str(self.idtotask[p[0]]) + ' finished with status ' + str(returncode))
+                actionlogger.info('Task ' + str(pid) + ' ' + str(tid) + ':' + str(self.idtotask[tid]) + ' finished with status ' + str(returncode))
                 # account for cleared resources
-                if self.nicevalues[p[0]] == os.nice(0):
-                    self.curmembooked -= float(self.maxmemperid[p[0]])
-                    self.curcpubooked -= float(self.cpuperid[p[0]])
+                if self.nicevalues[tid] == os.nice(0):
+                    self.curmembooked -= float(self.maxmemperid[tid])
+                    self.curcpubooked -= float(self.cpuperid[tid])
                 else:
-                    self.curmembooked_backfill -= float(self.maxmemperid[p[0]])
-                    self.curcpubooked_backfill -= float(self.cpuperid[p[0]])
-                self.procstatus[p[0]] = 'Done'
-                finished.append(p[0])
+                    self.curmembooked_backfill -= float(self.maxmemperid[tid])
+                    self.curcpubooked_backfill -= float(self.cpuperid[tid])
+                self.procstatus[tid] = 'Done'
+                finished.append(tid)
                 process_list.remove(p)
-                if returncode != 0:
-                    failuredetected = True
-                    failingpids.append(pid)
-                    failingtasks.append(p[0])
+                if returncode != 0:
+                    print(str(tid) + ' failed ... checking retry')
+                    # we inspect if this is something "unlucky" which could be resolved by a simple resubmit
+                    if self.is_worth_retrying(tid) and self.retry_counter[tid] < 2:
+                        print(str(tid) + ' to be retried')
+                        actionlogger.info('Task ' + str(self.idtotask[tid]) + ' failed but marked to be retried ')
+                        self.tids_marked_toretry.append(tid)
+                        self.retry_counter[tid] += 1
+
+                    else:
+                        failuredetected = True
+                        failingpids.append(pid)
+                        failingtasks.append(tid)
 
         if failuredetected and self.stoponfailure:
             actionlogger.info('Stoping pipeline due to failure in stages with PID ' + str(failingpids))
             # self.analyse_files_and_connections()
             self.cat_logfiles_tostdout(failingtasks)
-
+            self.send_checkpoint(failingtasks, self.args.checkpoint_on_failure)
             self.stop_pipeline_and_exit(process_list)
 
         # empty finished means we have to wait more
         return len(finished) == 0
 
+
+    def get_logfile(self, tid):
+        # determines the logfile name for this task
+        taskspec = self.workflowspec['stages'][tid]
+        taskname = taskspec['name']
+        filename = taskname + '.log'
+        directory = taskspec['cwd']
+        return directory + '/' + filename
+
+
+    def is_worth_retrying(self, tid):
+        # This checks for some signatures in logfiles that indicate that a retry of this task
+        # might have a chance.
+        # Ideally, this should be made user configurable. Either the user could inject a lambda
+        # or a regular expression to use. For now we just put a hard coded list
+        logfile = self.get_logfile(tid)
+
+        # 1) ZMQ_EVENT + interrupted system calls (DPL bug during shutdown)
+        # Not sure if grep is faster than native Python text search ...
+        status = os.system('grep "failed setting ZMQ_EVENTS" ' + logfile + ' &> /dev/null')
+        if os.WEXITSTATUS(status) == 0:
+            return True
+
+        return False
+
+
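Aside, not part of this commit: the comment in is_worth_retrying() above notes that the retry signatures should ideally be user configurable. A minimal sketch of such a hook is given here for illustration only; the helper name and the PIPELINE_RETRY_PATTERNS environment variable are assumptions, not anything the commit introduces.

    import os
    import re

    # Illustrative default: the one signature the commit greps for above.
    DEFAULT_RETRY_PATTERNS = [r'failed setting ZMQ_EVENTS']

    def logfile_matches_retry_pattern(logfile, patterns=None):
        # patterns could come from an assumed colon-separated environment variable
        if patterns is None:
            env = os.environ.get('PIPELINE_RETRY_PATTERNS')
            patterns = env.split(':') if env else DEFAULT_RETRY_PATTERNS
        try:
            with open(logfile, errors='ignore') as f:
                content = f.read()
        except OSError:
            return False
        return any(re.search(p, content) for p in patterns)

With such a helper, is_worth_retrying() could reduce to returning logfile_matches_retry_pattern(self.get_logfile(tid)).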
     def cat_logfiles_tostdout(self, taskids):
         # In case of errors we can cat the logfiles for this taskname
         # to stdout. Assuming convention that "taskname" translates to "taskname.log" logfile.
         for tid in taskids:
-            taskspec = self.workflowspec['stages'][tid]
-            taskname = taskspec['name']
-            filename = taskname + '.log'
-            directory = taskspec['cwd']
-            path = directory + '/' + filename
-            if os.path.exists(path):
-                print(' ----> START OF LOGFILE ', path, ' -----')
-                os.system('cat ' + path)
-                print(' <---- END OF LOGFILE ', path, ' -----')
+            logfile = self.get_logfile(tid)
+            if os.path.exists(logfile):
+                print(' ----> START OF LOGFILE ', logfile, ' -----')
+                os.system('cat ' + logfile)
+                print(' <---- END OF LOGFILE ', logfile, ' -----')
+
+    def send_checkpoint(self, taskids, location):
+        # Makes a tarball containing all files in the base dir
+        # (timeframe independent) and the dir with corrupted timeframes
+        # and copies it to a specific ALIEN location. Not a core function,
+        # just a tool to get hold of error conditions appearing on the GRID.
+
+        def get_tar_command(dir='./', flags='cf', filename='checkpoint.tar'):
+            return 'find ' + str(dir) + ' -maxdepth 1 -type f -print0 | xargs -0 tar ' + str(flags) + ' ' + str(filename)
+
+        if location != None:
+            print('Making a failure checkpoint')
+            # let's determine a filename from ALIEN_PROC_ID - hostname - and PID
+
+            aliprocid = os.environ.get('ALIEN_PROC_ID')
+            if aliprocid == None:
+                aliprocid = 0
+
+            fn = 'pipeline_checkpoint_ALIENPROC' + str(aliprocid) + '_PID' + str(os.getpid()) + '_HOST' + socket.gethostname() + '.tar'
+            actionlogger.info("Checkpointing to file " + fn)
+            tarcommand = get_tar_command(filename=fn)
+            actionlogger.info("Taring " + tarcommand)
+
+            # first of all the base directory
+            os.system(tarcommand)
+            # then we add stuff for the specific timeframe ids if any
+            for tid in taskids:
+                taskspec = self.workflowspec['stages'][tid]
+                directory = taskspec['cwd']
+                if directory != "./":
+                    tarcommand = get_tar_command(dir=directory, flags='rf', filename=fn)
+                    actionlogger.info("Tar command is " + tarcommand)
+                    os.system(tarcommand)
+
+            # location needs to be an alien path of the form alien:///foo/bar/
+            copycommand = 'alien.py cp ' + fn + ' ' + str(location) + '@disk:1'
+            actionlogger.info("Copying to alien " + copycommand)
+            os.system(copycommand)
+
 
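Aside, not part of this commit: get_tar_command() builds a shell pipeline ('find <dir> -maxdepth 1 -type f -print0 | xargs -0 tar ...') that archives only the regular files directly inside a directory. Purely to document what that command does, a rough equivalent with Python's standard tarfile module could look as follows; the function name and signature are assumptions for illustration.

    import os
    import tarfile

    def add_dir_files_to_tar(tarpath, directory='./', append=False):
        # 'a' appends to an existing uncompressed tar (the 'rf' flags used above);
        # 'w' creates the archive (the initial 'cf' call)
        mode = 'a' if append else 'w'
        with tarfile.open(tarpath, mode) as tar:
            for entry in os.listdir(directory):
                full = os.path.join(directory, entry)
                # regular files at depth 1 only, mirroring 'find -maxdepth 1 -type f'
                if os.path.isfile(full) and not os.path.islink(full):
                    tar.add(full)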
     def analyse_files_and_connections(self):
         for p,s in self.pid_to_files.items():
@@ -835,8 +914,8 @@ def execute(self):
             exit(0)
 
         if args.produce_script != None:
-            self.produce_script(args.produce_script)
-            exit(0)
+            self.produce_script(args.produce_script)
+            exit(0)
 
         if args.rerun_from:
             reruntaskfound = False
@@ -858,7 +937,7 @@ def execute(self):
 
         self.process_list = [] # list of tuples of node ids and Popen subprocess instances
 
-        finishedtasks = []
+        finishedtasks = [] # global list of finished tasks
         try:
 
             while True:
@@ -868,15 +947,15 @@ def execute(self):
                 # remove weights
                 candidates = [ tid for tid,_ in candidates ]
 
-                finished = []
+                finished = [] # --> to account for tasks finished because already done or skipped
                 actionlogger.debug('Sorted current candidates: ' + str([(c,self.idtotask[c]) for c in candidates]))
                 self.try_job_from_candidates(candidates, self.process_list, finished)
                 if len(candidates) > 0 and len(self.process_list) == 0:
                     actionlogger.info("Not able to make progress: Nothing scheduled although non-zero candidate set")
                     send_webhook(self.args.webhook,"Unable to make further progress: Quitting")
                     break
 
-                finished_from_started = []
+                finished_from_started = [] # to account for tasks finished when actually started
                 while self.waitforany(self.process_list, finished_from_started):
                     if not args.dry_run:
                         self.monitor(self.process_list) # ---> make async to normal operation?
@@ -886,9 +965,13 @@ def execute(self):
 
                 finished = finished + finished_from_started
                 actionlogger.debug("finished now :" + str(finished_from_started))
-                finishedtasks = finishedtasks + finished
-
-                # someone returned
+                finishedtasks = finishedtasks + finished
+
+                # if a task was marked as "retry" we simply put it back into the candidate list
+                if len(self.tids_marked_toretry) > 0:
+                    candidates = candidates + self.tids_marked_toretry
+                    self.tids_marked_toretry = []
+
                 # new candidates
                 for tid in finished:
                     if self.possiblenexttask.get(tid) != None:
@@ -939,6 +1022,8 @@ def execute(self):
 ust exist and the tasks file must be writable to with the current user.')
 parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,')
 parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel
+parser.add_argument('--checkpoint-on-failure', help=argparse.SUPPRESS) # debug option making a debug-tarball and sending to specified address
+                                                                       # argument is alien-path
 
 args = parser.parse_args()
 print(args)