107 changes: 107 additions & 0 deletions MC/bin/o2_dpg_workflow_runner.py
@@ -14,6 +14,7 @@
import traceback
import platform
import tarfile
from copy import deepcopy
try:
    from graphviz import Digraph
    havegraphviz=True
@@ -65,6 +66,9 @@
parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried
parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup

parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)")
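(The file-graph JSON consumed here is produced upstream, e.g. by the FileIOGraph analysis tool referenced further down. A hedged usage sketch, assuming the runner's usual -f workflow-file argument and illustrative file names: ./o2_dpg_workflow_runner.py -f workflow.json --remove-files-early filegraph.json)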


# Logging
parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
@@ -894,6 +898,38 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
break


def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
    """
    A utility function for the file-access logic. Takes a file-graph report
    and duplicates its timeframe-template entries for each timeframe of the
    multi-timeframe structure.
    """
    tf_entries = [
        entry for entry in data.get("file_report", [])
        if re.match(r"^\./tf\d+/", entry["file"])
    ]

    result = {}
    for i in timeframes:
        # timeframe -1 marks tasks not tied to a specific timeframe; skip it
        if i == -1:
            continue
        # Deepcopy to avoid modifying the original entries
        new_entries = deepcopy(tf_entries)
        for entry in new_entries:
            # Fix filepath
            entry["file"] = re.sub(r"^\./tf\d+/", f"./tf{i}/", entry["file"])
            # Fix written_by and read_by (preserve prefix, change numeric suffix)
            entry["written_by"] = [
                re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
            ]
            entry["read_by"] = [
                re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
            ]
        result[f"timeframe-{i}"] = new_entries

    return result
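
For illustration, a minimal sketch of how the expansion behaves; the entry below is made up, but mirrors the structure the regexes above expect (a ./tfN/ path prefix and task names carrying a numeric _N suffix):

# Hypothetical file report with one timeframe-template entry:
template = {
    "file_report": [
        {"file": "./tf1/hits.root", "written_by": ["sim_1"], "read_by": ["digi_1"]}
    ]
}
expanded = filegraph_expand_timeframes(template, {1, 2, -1})
# expanded["timeframe-1"][0] equals the original entry;
# expanded["timeframe-2"][0] is {"file": "./tf2/hits.root",
#   "written_by": ["sim_2"], "read_by": ["digi_2"]};
# timeframe -1 is skipped, so no key is created for it.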



class WorkflowExecutor:
    # Constructor
    def __init__(self, workflowfile, args, jmax=100):
@@ -929,6 +965,7 @@ def __init__(self, workflowfile, args, jmax=100):
        # construct task ID <-> task name lookup
        self.idtotask = [ 0 for _ in self.taskuniverse ]
        self.tasktoid = {}
        self.idtotf = [ l['timeframe'] for l in self.workflowspec['stages'] ]
        for i, name in enumerate(self.taskuniverse):
            self.tasktoid[name]=i
            self.idtotask[i]=name
@@ -970,6 +1007,72 @@ def __init__(self, workflowfile, args, jmax=100):
        # init alternative software environments
        self.init_alternative_software_environments()

        # initialize container to keep track of file-task relationships
        self.file_removal_candidates = {}
        self.do_early_file_removal = False
        self.timeframeset = set([ task["timeframe"] for task in self.workflowspec['stages'] ])
        if args.remove_files_early != "":
            with open(args.remove_files_early) as f:
                filegraph_data = json.load(f)
            self.do_early_file_removal = True
            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)


    def perform_early_file_removal(self, taskids):
        """
        Check which files can be deleted upon completion of the given tasks,
        and remove them.
        """

        def remove_if_exists(filepath: str) -> bool:
            """
            Check if a file exists; remove it and return True if it was removed.
            """
            if os.path.exists(filepath):
                fsize = os.path.getsize(filepath)
                os.remove(filepath)
                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize / (1024. * 1024.):.2f} MB.")
                return True

            return False

        def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
            marked_for_removal = []

            timeframestoscan = [ timeframe_id ]
            if timeframe_id == -1:
                # timeframe -1 marks global tasks; these may touch files in any timeframe
                timeframestoscan = [ i for i in listofalltimeframes if i != -1 ]

            # TODO: Note that this traversal of files is certainly not optimal.
            # We should (and will) keep a mapping of tasks->potential files and just
            # scan these. This is already provided by the FileIOGraph analysis tool.
            for tid in timeframestoscan:
                for file_entry in file_dict[f"timeframe-{tid}"]:
                    filename = file_entry['file']
                    read_by = file_entry['read_by']
                    written_by = file_entry['written_by']
                    if taskname in read_by:
                        file_entry['read_by'].remove(taskname)
                    if taskname in written_by:
                        file_entry['written_by'].remove(taskname)

                    # TODO: in principle the written_by criterion might not be needed
                    if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
                        # the file mentioned here is no longer needed and we can remove it;
                        # make sure it is there and then delete it
                        if remove_if_exists(filename):
                            # also take the file entry out of the dict altogether
                            marked_for_removal.append(file_entry)

            #for k in marked_for_removal:
            #    file_dict[f"timeframe-{tid}"].remove(k)

        for tid in taskids:
            taskname = self.idtotask[tid]
            timeframe_id = self.idtotf[tid]
            remove_for_task_id(taskname, self.file_removal_candidates, timeframe_id, self.timeframeset)
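
The TODO above already names the fix for the repeated full scan: keep a task -> file-entries index and visit only those entries when a task finishes. A minimal sketch of such an index, assuming the expanded structure returned by filegraph_expand_timeframes (build_task_file_index is a hypothetical helper, not part of this PR):

def build_task_file_index(file_dict: dict) -> dict:
    # Hypothetical helper: invert the timeframe -> file-entries structure
    # into a task-name -> list-of-file-entries index, so a finishing task
    # only touches the entries it actually reads or writes.
    index = {}
    for entries in file_dict.values():
        for entry in entries:
            for task in entry["read_by"] + entry["written_by"]:
                index.setdefault(task, []).append(entry)
    return index

Since the index stores references to the same entry dicts (not copies), mutating read_by/written_by through the index stays consistent with the per-timeframe view.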


    def SIGHandler(self, signum, frame):
        """
        basically forcing shut down of all child processes
@@ -1737,6 +1840,10 @@ def speedup_ROOT_Init():
        actionlogger.debug("finished now :" + str(finished_from_started))
        finishedtasks = finishedtasks + finished

        # perform file cleanup
        if self.do_early_file_removal:
            self.perform_early_file_removal(finished_from_started)

        if self.is_productionmode:
            # we can do some generic cleanup of finished tasks in non-interactive/GRID mode
            # TODO: this can run asynchronously