Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def setup_logger(name, log_file, level=logging.INFO):
return logger

# first file logger
actionlogger = setup_logger('pipeline_action_logger', ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None], level=logging.DEBUG)
# File that receives the action log: the user-supplied name when one was
# given, otherwise a per-process default name containing the PID.
actionlogger_file = (
    args.action_logfile
    if args.action_logfile is not None
    else 'pipeline_action_' + str(os.getpid()) + '.log'
)
actionlogger = setup_logger('pipeline_action_logger', actionlogger_file, level=logging.DEBUG)

# second file logger
# NOTE(review): the file name is taken from args.action_logfile, so whenever
# that option is set the metric logger writes into the same file as the action
# logger -- confirm whether a dedicated metric-logfile option was intended.
metriclogger = setup_logger(
    'pipeline_metric_logger',
    args.action_logfile
    if args.action_logfile is not None
    else 'pipeline_metric_' + str(os.getpid()) + '.log',
)
Expand Down Expand Up @@ -1808,5 +1809,46 @@ def speedup_ROOT_Init():
exit(code)
actionlogger.info("Running in cgroup")

executor=WorkflowExecutor(args.workflowfile,jmax=int(args.maxjobs),args=args)
exit (executor.execute())

# This starts the fanotify fileaccess monitoring process if asked for.
# The environment variable O2DPG_PRODUCE_FILEGRAPH switches the monitoring on
# and, at the same time, contains the name of the monitoring executable.
o2dpg_filegraph_exec = os.getenv("O2DPG_PRODUCE_FILEGRAPH")
fileaccess_monitor_proc = None
fileaccess_log_file = None
if o2dpg_filegraph_exec:
    env = os.environ.copy()
    # directory handed to the monitor as the root below which it observes
    env["FILEACCESS_MON_ROOTPATH"] = os.getcwd()
    # PID of this runner process, passed on to the monitor
    env["MAXMOTHERPID"] = f"{os.getpid()}"

    fileaccess_log_file_name = f"pipeline_fileaccess_{os.getpid()}.log"
    fileaccess_log_file = open(fileaccess_log_file_name, "w")
    try:
        fileaccess_monitor_proc = subprocess.Popen(
            [o2dpg_filegraph_exec],
            stdout=fileaccess_log_file,
            stderr=subprocess.STDOUT,
            env=env)
    except OSError:
        # do not leak the log file handle if the monitor cannot be launched
        fileaccess_log_file.close()
        raise

try:
    # This is core workflow runner invocation
    executor = WorkflowExecutor(args.workflowfile, jmax=int(args.maxjobs), args=args)
    rc = executor.execute()
finally:
    if fileaccess_monitor_proc is not None:
        fileaccess_monitor_proc.terminate()  # sends SIGTERM
        try:
            fileaccess_monitor_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            fileaccess_monitor_proc.kill()  # force kill if not stopping
            fileaccess_monitor_proc.wait()  # reap the killed child
        # the monitor is gone, so our handle on its log file can be released
        fileaccess_log_file.close()

        # now produce the final filegraph output
        # The report is auxiliary: a failure here is logged but must not
        # replace the exit status (or in-flight exception) of the workflow.
        o2dpg_root = os.getenv("O2DPG_ROOT")
        if not o2dpg_root:
            actionlogger.error("O2DPG_ROOT is not set; cannot produce the FileIOGraph report")
        else:
            analyse_cmd = [
                sys.executable,  # runs with same Python interpreter
                f"{o2dpg_root}/UTILS/FileIOGraph/analyse_FileIO.py",
                "--actionFile", actionlogger_file,
                "--monitorFile", fileaccess_log_file_name,
                "-o", f"pipeline_fileaccess_report_{os.getpid()}.json",
                "--basedir", os.getcwd()]
            print (f"Producing FileIOGraph with command {analyse_cmd}")
            try:
                subprocess.run(analyse_cmd, check=True)
            except (OSError, subprocess.CalledProcessError) as err:
                actionlogger.error("FileIOGraph analysis failed: %s", err)

sys.exit(rc)
12 changes: 10 additions & 2 deletions UTILS/FileIOGraph/analyse_FileIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# the run-number of data taking or default if unanchored
parser.add_argument('--actionFile', type=str, help="O2DPG pipeline runner action file")
parser.add_argument('--monitorFile', type=str, help="monitoring file provided by fanotify tool. See O2DPG/UTILS/FileIOGraph.")
parser.add_argument('--basedir', type=str, help="O2DPG workflow dir")
parser.add_argument('--basedir', default="/", type=str, help="O2DPG workflow dir")
parser.add_argument('--file-filters', nargs='+', default=[r'.*'], help="Filters (regular expressions) to select files (default all = '.*')")
parser.add_argument('--graphviz', type=str, help="Produce a graphviz plot")
parser.add_argument('-o','--output', type=str, help="Output JSON report")
Expand Down Expand Up @@ -60,7 +60,8 @@
file_written_task = {}
file_consumed_task = {}

pattern = re.compile(args.basedir + r'([^,]+),((?:read|write)),(.*)')
# One monitor record: a file name (optionally wrapped in double quotes),
# the access mode (read or write) and the remainder of the line, which is
# later split on ';' into pids.
pattern = re.compile(r'"?([^"]+)"?,((?:read|write)),(.*)')
# Matches file names located inside the workflow directory. The directory is
# escaped so that regex metacharacters in the path are taken literally, and it
# must be followed by a path separator (or the end of the name) so that a
# sibling directory sharing the same prefix is not accepted.
basedir_pattern = re.compile("^" + re.escape(args.basedir.rstrip("/")) + r"(?:/|$)")
# neglecting some framework file names
file_exclude_filter = re.compile(r'(.*)\.log(.*)|(ccdb/log)|(.*)dpl-config\.json')

Expand All @@ -76,6 +77,13 @@
mode = match.group(2)
pids = match.group(3).split(";")

# see if matches the workdir
if not basedir_pattern.match(file_name):
continue

# remove basedir from file_name
file_name = file_name.replace(args.basedir + '/', "./", 1)

# implement file name filter
if file_exclude_filter.match(file_name):
continue
Expand Down
2 changes: 1 addition & 1 deletion UTILS/FileIOGraph/monitor_fileaccess_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ int main(int argc, char **argv)
auto ROOT_PATH_ENV = getenv("FILEACCESS_MON_ROOTPATH");
std::string root_path = "/";
if (ROOT_PATH_ENV) {
std::cerr << "Observing file access below " << root_path << "\n";
root_path = std::string(ROOT_PATH_ENV);
std::cerr << "Observing file access below " << root_path << "\n";
}

CHK(fan = fanotify_init(FAN_CLASS_NOTIF, O_RDONLY), -1);
Expand Down