Skip to content

Commit 152f660

Browse files
committed
O2DPG workflow_runner: New early-file removal feature
This implements a major new feature in the O2DPG workflow/pipeline runner. The runner can now auto-delete artefacts from intermediate stages as soon as these artefacts are no longer needed. For example, we can delete TPC hits as soon as TPC digitization finishes. This then allows operating on smaller disc spaces or simulating more timeframes within a job. To use the feature, one needs to provide a "file-access" report with `--remove-files-early access_report.json`. This report is a "learned" structure containing the list of files that are written/consumed in a workflow and by which task. Such a report needs to be generated in a prior pilot job with the same workflow, by a user with sudo rights. See here #2126. This is primarily useful for productions on the GRID, and the idea would be (a) for each new MC production, to produce the file-access file in a pilot job or via GitHub actions when releasing software, and (b) to then use this file to optimize the disc space in MC productions on the GRID. This development is related to https://its.cern.ch/jira/browse/O2-4365
1 parent 17810b6 commit 152f660

File tree

1 file changed

+107
-0
lines changed

1 file changed

+107
-0
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import traceback
1515
import platform
1616
import tarfile
17+
from copy import deepcopy
1718
try:
1819
from graphviz import Digraph
1920
havegraphviz=True
@@ -65,6 +66,9 @@
6566
parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried
6667
parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup
6768

69+
parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)")
70+
71+
6872
# Logging
6973
parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
7074
parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
@@ -894,6 +898,38 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
894898
break
895899

896900

901+
def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
    """
    A utility function for the fileaccess logic. Takes a template and duplicates
    for the multi-timeframe structure.

    The "file_report" entries of data whose path starts with "./tf<N>/" serve
    as the template; for every timeframe id in timeframes (except the special
    id -1) a deep copy is produced with the path prefix and the numeric task
    suffixes rewritten to that timeframe. Returns a dict keyed by
    "timeframe-<i>". The input data is left unmodified.
    """
    tf_path = re.compile(r"^\./tf\d+/")
    task_suffix = re.compile(r"_\d+$")

    # template: only entries living inside a per-timeframe directory
    template = [rec for rec in data.get("file_report", []) if tf_path.match(rec["file"])]

    expanded = {}
    for tf in timeframes:
        # -1 marks timeframe-independent tasks; nothing to duplicate for it
        if tf == -1:
            continue
        # deep copy so the template (and hence the caller's data) stays intact
        clones = deepcopy(template)
        for rec in clones:
            # rewrite the path prefix to this timeframe's directory
            rec["file"] = tf_path.sub(f"./tf{tf}/", rec["file"])
            # rewrite the numeric task-name suffixes (prefix preserved)
            rec["written_by"] = [task_suffix.sub(f"_{tf}", name) for name in rec["written_by"]]
            rec["read_by"] = [task_suffix.sub(f"_{tf}", name) for name in rec["read_by"]]
        expanded[f"timeframe-{tf}"] = clones

    return expanded
930+
931+
932+
897933
class WorkflowExecutor:
898934
# Constructor
899935
def __init__(self, workflowfile, args, jmax=100):
@@ -929,6 +965,7 @@ def __init__(self, workflowfile, args, jmax=100):
929965
# construct task ID <-> task name lookup
930966
self.idtotask = [ 0 for _ in self.taskuniverse ]
931967
self.tasktoid = {}
968+
self.idtotf = [ l['timeframe'] for l in self.workflowspec['stages'] ]
932969
for i, name in enumerate(self.taskuniverse):
933970
self.tasktoid[name]=i
934971
self.idtotask[i]=name
@@ -970,6 +1007,72 @@ def __init__(self, workflowfile, args, jmax=100):
9701007
# init alternative software environments
9711008
self.init_alternative_software_environments()
9721009

1010+
# initialize container to keep track of file-task relationships
1011+
self.file_removal_candidates = {}
1012+
self.do_early_file_removal = False
1013+
self.timeframeset = set([ task["timeframe"] for task in self.workflowspec['stages'] ])
1014+
if args.remove_files_early != "":
1015+
with open(args.remove_files_early) as f:
1016+
filegraph_data = json.load(f)
1017+
self.do_early_file_removal = True
1018+
self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
1019+
1020+
1021+
def perform_early_file_removal(self, taskids):
1022+
"""
1023+
This function checks which files can be deleted upon completion of task
1024+
and optionally does so.
1025+
"""
1026+
1027+
def remove_if_exists(filepath: str) -> None:
1028+
"""
1029+
Check if a file exists, and remove it if found.
1030+
"""
1031+
if os.path.exists(filepath):
1032+
fsize = os.path.getsize(filepath)
1033+
os.remove(filepath)
1034+
actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024.} MB.")
1035+
return True
1036+
1037+
return False
1038+
1039+
def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
1040+
marked_for_removal = []
1041+
1042+
timeframestoscan = [ timeframe_id ]
1043+
if timeframe_id == -1:
1044+
timeframestoscan = [ i for i in listofalltimeframes if i != -1 ]
1045+
1046+
# TODO: Note that this traversal of files is not certainly not optimal
1047+
# We should (and will) keep an mapping of tasks->potential files and just
1048+
# scan these. This is already provided by the FileIOGraph analysis tool.
1049+
for tid in timeframestoscan:
1050+
for i,file_entry in enumerate(file_dict[f"timeframe-{tid}"]):
1051+
filename = file_entry['file']
1052+
read_by = file_entry['read_by']
1053+
written_by = file_entry['written_by']
1054+
if taskname in read_by:
1055+
file_entry['read_by'].remove(taskname)
1056+
if taskname in written_by:
1057+
file_entry['written_by'].remove(taskname)
1058+
1059+
# TODO: in principle the written_by criterion might not be needed
1060+
if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
1061+
# the filename mentioned here is no longer needed and we can remove it
1062+
# make sure it is there and then delete it
1063+
if remove_if_exists(filename):
1064+
# also take out the file entry from the dict altogether
1065+
marked_for_removal.append(file_entry)
1066+
1067+
#for k in marked_for_removal:
1068+
# file_dict[f"timeframe-{tid}"].remove(k)
1069+
1070+
for tid in taskids:
1071+
taskname = self.idtotask[tid]
1072+
timeframe_id = self.idtotf[tid]
1073+
remove_for_task_id(taskname, self.file_removal_candidates, timeframe_id, self.timeframeset)
1074+
1075+
9731076
def SIGHandler(self, signum, frame):
9741077
"""
9751078
basically forcing shut down of all child processes
@@ -1737,6 +1840,10 @@ def speedup_ROOT_Init():
17371840
actionlogger.debug("finished now :" + str(finished_from_started))
17381841
finishedtasks = finishedtasks + finished
17391842

1843+
# perform file cleanup
1844+
if self.do_early_file_removal:
1845+
self.perform_early_file_removal(finished_from_started)
1846+
17401847
if self.is_productionmode:
17411848
# we can do some generic cleanup of finished tasks in non-interactive/GRID mode
17421849
# TODO: this can run asynchronously

0 commit comments

Comments
 (0)