 import traceback
 import platform
 import tarfile
+from copy import deepcopy
 try:
     from graphviz import Digraph
     havegraphviz = True
 parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried
 parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speed up init/startup
 
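+# The file passed here is presumably a file-graph JSON report as produced by the
+# FileIOGraph tooling (see the analyse_FileIO.py invocation at the bottom of this file).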
+parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)")
+
+
 # Logging
 parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
 parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
@@ -85,7 +89,8 @@ def setup_logger(name, log_file, level=logging.INFO):
     return logger
 
 # first file logger
-actionlogger = setup_logger('pipeline_action_logger', ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile != None], level=logging.DEBUG)
+actionlogger_file = ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile != None]
+actionlogger = setup_logger('pipeline_action_logger', actionlogger_file, level=logging.DEBUG)
 
 # second file logger
 metriclogger = setup_logger('pipeline_metric_logger', ('pipeline_metric_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile != None])
@@ -893,6 +898,38 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
         break
 
 
+def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
+    """
+    A utility function for the file-access logic. Takes the timeframe entries of
+    a file-graph template and duplicates them for each timeframe of the
+    multi-timeframe structure.
+    """
+    # collect the template entries, i.e. files living under some ./tf<N>/ directory
+    tf_entries = [
+        entry for entry in data.get("file_report", [])
+        if re.match(r"^\./tf\d+/", entry["file"])
+    ]
+
+    result = {}
+    for i in timeframes:
+        if i == -1:
+            continue
+        # Deepcopy to avoid modifying the original entries
+        new_entries = deepcopy(tf_entries)
+        for entry in new_entries:
+            # Fix filepath
+            entry["file"] = re.sub(r"^\./tf\d+/", f"./tf{i}/", entry["file"])
+            # Fix written_by and read_by (preserve prefix, change numeric suffix)
+            entry["written_by"] = [
+                re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
+            ]
+            entry["read_by"] = [
+                re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
+            ]
+        result[f"timeframe-{i}"] = new_entries
+
+    return result
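+# Example of the shapes involved (fields as accessed above; task names purely
+# illustrative): with timeframes {1, 2}, an input entry
+#   {"file": "./tf1/hits.root", "written_by": ["sim_1"], "read_by": ["digi_1"]}
+# is duplicated under the keys "timeframe-1" and "timeframe-2"; the timeframe-2
+# copy becomes
+#   {"file": "./tf2/hits.root", "written_by": ["sim_2"], "read_by": ["digi_2"]}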
+
+
 class WorkflowExecutor:
     # Constructor
     def __init__(self, workflowfile, args, jmax=100):
@@ -928,6 +965,7 @@ def __init__(self, workflowfile, args, jmax=100):
         # construct task ID <-> task name lookup
         self.idtotask = [ 0 for _ in self.taskuniverse ]
         self.tasktoid = {}
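+        # task ID -> timeframe lookup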
+        self.idtotf = [ l['timeframe'] for l in self.workflowspec['stages'] ]
         for i, name in enumerate(self.taskuniverse):
             self.tasktoid[name] = i
             self.idtotask[i] = name
@@ -969,6 +1007,72 @@ def __init__(self, workflowfile, args, jmax=100):
         # init alternative software environments
         self.init_alternative_software_environments()
 
+        # initialize container to keep track of file-task relationships
+        self.file_removal_candidates = {}
+        self.do_early_file_removal = False
+        self.timeframeset = set([ task["timeframe"] for task in self.workflowspec['stages'] ])
+        if args.remove_files_early != "":
+            with open(args.remove_files_early) as f:
+                filegraph_data = json.load(f)
+            self.do_early_file_removal = True
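+            # maps "timeframe-<i>" -> list of file entries for that timeframe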
+            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
+
+
+    def perform_early_file_removal(self, taskids):
+        """
+        This function checks which files can be deleted upon completion of the
+        given tasks and removes them if possible.
+        """
+
+        def remove_if_exists(filepath: str) -> bool:
+            """
+            Check if a file exists, and remove it if found.
+            """
+            if os.path.exists(filepath):
+                fsize = os.path.getsize(filepath)
+                os.remove(filepath)
+                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize / 1024. / 1024.} MB.")
+                return True
+
+            return False
+
+        def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
+            marked_for_removal = []
+
+            timeframestoscan = [ timeframe_id ]
+            if timeframe_id == -1:
+                timeframestoscan = [ i for i in listofalltimeframes if i != -1 ]
+
+            # TODO: Note that this traversal of files is certainly not optimal.
+            # We should (and will) keep a mapping of tasks -> potential files and just
+            # scan these. This is already provided by the FileIOGraph analysis tool.
+            for tid in timeframestoscan:
+                for file_entry in file_dict[f"timeframe-{tid}"]:
+                    filename = file_entry['file']
+                    read_by = file_entry['read_by']
+                    written_by = file_entry['written_by']
+                    if taskname in read_by:
+                        file_entry['read_by'].remove(taskname)
+                    if taskname in written_by:
+                        file_entry['written_by'].remove(taskname)
+
+                    # TODO: in principle the written_by criterion might not be needed
+                    if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
+                        # the file mentioned here is no longer needed and we can remove it;
+                        # make sure it is there and then delete it
+                        if remove_if_exists(filename):
+                            # also remember the entry so it could be taken out of the dict altogether
+                            marked_for_removal.append(file_entry)
+
+            # pruning of fully consumed entries is currently left disabled:
+            #for k in marked_for_removal:
+            #    file_dict[f"timeframe-{tid}"].remove(k)
+
+        for tid in taskids:
+            taskname = self.idtotask[tid]
+            timeframe_id = self.idtotf[tid]
+            remove_for_task_id(taskname, self.file_removal_candidates, timeframe_id, self.timeframeset)
+
+
     def SIGHandler(self, signum, frame):
         """
         basically forcing shut down of all child processes
@@ -1736,6 +1840,10 @@ def speedup_ROOT_Init():
             actionlogger.debug("finished now :" + str(finished_from_started))
             finishedtasks = finishedtasks + finished
 
+            # perform file cleanup
+            if self.do_early_file_removal:
+                self.perform_early_file_removal(finished_from_started)
+
             if self.is_productionmode:
                 # we can do some generic cleanup of finished tasks in non-interactive/GRID mode
                 # TODO: this can run asynchronously
@@ -1808,5 +1916,46 @@ def speedup_ROOT_Init():
         exit(code)
     actionlogger.info("Running in cgroup")
 
-executor = WorkflowExecutor(args.workflowfile, jmax=int(args.maxjobs), args=args)
-exit(executor.execute())
+
+# This starts the fanotify file-access monitoring process, if asked for
+o2dpg_filegraph_exec = os.getenv("O2DPG_PRODUCE_FILEGRAPH") # switches filegraph monitoring on and contains the executable name
+if o2dpg_filegraph_exec:
+    env = os.environ.copy()
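+    # these env variables presumably configure the external monitor executable:
+    # the directory tree to watch and the topmost mother PID whose children are traced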
+    env["FILEACCESS_MON_ROOTPATH"] = os.getcwd()
+    env["MAXMOTHERPID"] = f"{os.getpid()}"
+
+    fileaccess_log_file_name = f"pipeline_fileaccess_{os.getpid()}.log"
+    fileaccess_log_file = open(fileaccess_log_file_name, "w")
+    fileaccess_monitor_proc = subprocess.Popen(
+        [o2dpg_filegraph_exec],
+        stdout=fileaccess_log_file,
+        stderr=subprocess.STDOUT,
+        env=env)
+else:
+    fileaccess_monitor_proc = None
+
+try:
+    # This is the core workflow runner invocation
+    executor = WorkflowExecutor(args.workflowfile, jmax=int(args.maxjobs), args=args)
+    rc = executor.execute()
+finally:
+    if fileaccess_monitor_proc:
+        fileaccess_monitor_proc.terminate() # sends SIGTERM
+        try:
+            fileaccess_monitor_proc.wait(timeout=5)
+        except subprocess.TimeoutExpired:
+            fileaccess_monitor_proc.kill() # force kill if not stopping
+        # now produce the final filegraph output
+        o2dpg_root = os.getenv("O2DPG_ROOT")
+        analyse_cmd = [
+            sys.executable, # run with the same Python interpreter
+            f"{o2dpg_root}/UTILS/FileIOGraph/analyse_FileIO.py",
+            "--actionFile", actionlogger_file,
+            "--monitorFile", fileaccess_log_file_name,
+            "-o", f"pipeline_fileaccess_report_{os.getpid()}.json",
+            "--basedir", os.getcwd()]
+        print(f"Producing FileIOGraph with command {analyse_cmd}")
+        subprocess.run(analyse_cmd, check=True)
+
+sys.exit(rc)