|
14 | 14 | import traceback |
15 | 15 | import platform |
16 | 16 | import tarfile |
| 17 | +from copy import deepcopy |
17 | 18 | try: |
18 | 19 |     from graphviz import Digraph |
19 | 20 |     havegraphviz=True |
|
65 | 66 | parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried |
66 | 67 | parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup |
67 | 68 |
| 69 | +parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)") |
| 70 | + |
| 71 | + |
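As a usage sketch (the file name here is hypothetical): the runner would then be invoked with `--remove-files-early filegraph.json`, where `filegraph.json` contains the file-access graph produced by the FileIOGraph analysis tool referenced further below.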
68 | 72 | # Logging |
69 | 73 | parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used') |
70 | 74 | parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used') |
@@ -894,6 +898,38 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5) |
894 | 898 |         break |
895 | 899 |
896 | 900 |
| 901 | +def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict: |
| 902 | +    """ |
| 903 | +    A utility function for the file-access logic. Takes the per-timeframe template |
| 904 | +    entries of a file report and duplicates them for each timeframe of the multi-timeframe structure. |
| 905 | +    """ |
| 906 | +    tf_entries = [ |
| 907 | +        entry for entry in data.get("file_report", []) |
| 908 | +        if re.match(r"^\./tf\d+/", entry["file"]) |
| 909 | +    ] |
| 910 | + |
| 911 | +    result = {} |
| 912 | +    for i in timeframes: |
| 913 | +        if i == -1: |
| 914 | +            continue |
| 915 | +        # deepcopy to avoid modifying the original entries |
| 916 | +        new_entries = deepcopy(tf_entries) |
| 917 | +        for entry in new_entries: |
| 918 | +            # point the file path at this timeframe's directory |
| 919 | +            entry["file"] = re.sub(r"^\./tf\d+/", f"./tf{i}/", entry["file"]) |
| 920 | +            # adapt written_by and read_by (preserve task prefix, change numeric timeframe suffix) |
| 921 | +            entry["written_by"] = [ |
| 922 | +                re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"] |
| 923 | +            ] |
| 924 | +            entry["read_by"] = [ |
| 925 | +                re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"] |
| 926 | +            ] |
| 927 | +        result[f"timeframe-{i}"] = new_entries |
| 928 | + |
| 929 | +    return result |
| 930 | + |
| 931 | + |
| 932 | + |
897 | 933 | class WorkflowExecutor: |
898 | 934 |     # Constructor |
899 | 935 |     def __init__(self, workflowfile, args, jmax=100): |
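To make the expansion concrete, here is a minimal sketch of how `filegraph_expand_timeframes` transforms a report. The entry shape (`file`, `written_by`, `read_by`) matches what the function reads, but the file and task names are invented for illustration:

```python
# Hypothetical input: one file-report entry for timeframe 1.
data = {
    "file_report": [
        {
            "file": "./tf1/tpcdigits.root",  # invented file name
            "written_by": ["tpcdigi_1"],     # invented task names
            "read_by": ["tpcreco_1"],
        }
    ]
}

expanded = filegraph_expand_timeframes(data, {1, 2})

# expanded["timeframe-2"][0] is now:
#   {"file": "./tf2/tpcdigits.root",
#    "written_by": ["tpcdigi_2"],
#    "read_by": ["tpcreco_2"]}
```

Note that only the directory prefix of the path and the numeric task-name suffixes are rewritten; file base names stay as they are.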
@@ -929,6 +965,7 @@ def __init__(self, workflowfile, args, jmax=100): |
929 | 965 |         # construct task ID <-> task name lookup |
930 | 966 |         self.idtotask = [ 0 for _ in self.taskuniverse ] |
931 | 967 |         self.tasktoid = {} |
| 968 | +        self.idtotf = [ stage['timeframe'] for stage in self.workflowspec['stages'] ] |
932 | 969 |         for i, name in enumerate(self.taskuniverse): |
933 | 970 |             self.tasktoid[name]=i |
934 | 971 |             self.idtotask[i]=name |
@@ -970,6 +1007,72 @@ def __init__(self, workflowfile, args, jmax=100): |
970 | 1007 |         # init alternative software environments |
971 | 1008 |         self.init_alternative_software_environments() |
972 | 1009 |
| 1010 | +        # initialize container to keep track of file-task relationships |
| 1011 | +        self.file_removal_candidates = {} |
| 1012 | +        self.do_early_file_removal = False |
| 1013 | +        self.timeframeset = { task["timeframe"] for task in self.workflowspec['stages'] } |
| 1014 | +        if args.remove_files_early != "": |
| 1015 | +            with open(args.remove_files_early) as f: |
| 1016 | +                filegraph_data = json.load(f) |
| 1017 | +            self.do_early_file_removal = True |
| 1018 | +            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset) |
| 1019 | + |
| 1020 | + |
| 1021 | +    def perform_early_file_removal(self, taskids): |
| 1022 | +        """ |
| 1023 | +        Checks which files are no longer needed after the given tasks |
| 1024 | +        have completed and removes them. |
| 1025 | +        """ |
| 1026 | + |
| 1027 | +        def remove_if_exists(filepath: str) -> bool: |
| 1028 | +            """ |
| 1029 | +            Remove a file if it exists; return True if it was removed. |
| 1030 | +            """ |
| 1031 | +            if os.path.exists(filepath): |
| 1032 | +                fsize = os.path.getsize(filepath) |
| 1033 | +                os.remove(filepath) |
| 1034 | +                actionlogger.info(f"Removed {filepath} since no longer needed. Freed {fsize / (1024 * 1024):.2f} MB.") |
| 1035 | +                return True |
| 1036 | + |
| 1037 | +            return False |
| 1038 | + |
| 1039 | +        def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes): |
| 1040 | +            marked_for_removal = [] |
| 1041 | + |
| 1042 | +            timeframestoscan = [ timeframe_id ] |
| 1043 | +            if timeframe_id == -1: |
| 1044 | +                timeframestoscan = [ i for i in listofalltimeframes if i != -1 ] |
| 1045 | + |
| 1046 | +            # TODO: Note that this traversal of files is certainly not optimal. |
| 1047 | +            # We should (and will) keep a mapping of tasks -> potential files and just |
| 1048 | +            # scan these. This is already provided by the FileIOGraph analysis tool (see sketch below). |
| 1049 | +            for tid in timeframestoscan: |
| 1050 | +                for file_entry in file_dict[f"timeframe-{tid}"]: |
| 1051 | +                    filename = file_entry['file'] |
| 1052 | +                    read_by = file_entry['read_by'] |
| 1053 | +                    written_by = file_entry['written_by'] |
| 1054 | +                    if taskname in read_by: |
| 1055 | +                        read_by.remove(taskname) |
| 1056 | +                    if taskname in written_by: |
| 1057 | +                        written_by.remove(taskname) |
| 1058 | + |
| 1059 | +                    # TODO: in principle the written_by criterion might not be needed |
| 1060 | +                    if len(read_by) == 0 and len(written_by) == 0: |
| 1061 | +                        # the file mentioned here is no longer needed; |
| 1062 | +                        # make sure it is there and then delete it |
| 1063 | +                        if remove_if_exists(filename): |
| 1064 | +                            # remember the entry so it could be taken out of the dict altogether |
| 1065 | +                            marked_for_removal.append(file_entry) |
| 1066 | + |
| 1067 | +            #for k in marked_for_removal: |
| 1068 | +            #    file_dict[f"timeframe-{tid}"].remove(k) |
| 1069 | + |
| 1070 | +        for tid in taskids: |
| 1071 | +            taskname = self.idtotask[tid] |
| 1072 | +            timeframe_id = self.idtotf[tid] |
| 1073 | +            remove_for_task_id(taskname, self.file_removal_candidates, timeframe_id, self.timeframeset) |
| 1074 | + |
| 1075 | + |
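The TODO above already points at the obvious optimization. One possible shape for such a task -> file-entry mapping, derived once from the expanded graph, could look like this (an illustrative sketch, not part of this commit):

```python
from collections import defaultdict

def build_task_to_files(file_dict: dict) -> dict:
    """Map each task name to the file entries it reads or writes,
    so that a finished task only needs to scan its own entries."""
    task_to_files = defaultdict(list)
    for entries in file_dict.values():
        for entry in entries:
            # a task may both read and write the same file -> use a set of names
            for task in set(entry["read_by"]) | set(entry["written_by"]):
                task_to_files[task].append(entry)
    return task_to_files
```

`perform_early_file_removal` could then iterate over `task_to_files[taskname]` instead of over all entries of a timeframe.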
973 | 1076 |     def SIGHandler(self, signum, frame): |
974 | 1077 |         """ |
975 | 1078 |         basically forcing shut down of all child processes |
@@ -1737,6 +1840,10 @@ def speedup_ROOT_Init(): |
1737 | 1840 |             actionlogger.debug("finished now :" + str(finished_from_started)) |
1738 | 1841 |             finishedtasks = finishedtasks + finished |
1739 | 1842 |
| 1843 | +            # perform file cleanup |
| 1844 | +            if self.do_early_file_removal: |
| 1845 | +                self.perform_early_file_removal(finished_from_started) |
| 1846 | + |
1740 | 1847 |             if self.is_productionmode: |
1741 | 1848 |                 # we can do some generic cleanup of finished tasks in non-interactive/GRID mode |
1742 | 1849 |                 # TODO: this can run asynchronously |
|