Skip to content

Commit 8eb8228

Browse files
committed
O2DPG workflow: Integration of fileaccess reporting
* improvements to analyse_FileIO.py * o2dpg_workflow_runner.py: Integrated option to produce fileaccess reports
1 parent 9d0149b commit 8eb8228

File tree

3 files changed

+56
-6
lines changed

3 files changed

+56
-6
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ def setup_logger(name, log_file, level=logging.INFO):
8585
return logger
8686

8787
# first file logger
88-
actionlogger = setup_logger('pipeline_action_logger', ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None], level=logging.DEBUG)
88+
actionlogger_file = ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None]
89+
actionlogger = setup_logger('pipeline_action_logger', actionlogger_file, level=logging.DEBUG)
8990

9091
# second file logger
9192
metriclogger = setup_logger('pipeline_metric_logger', ('pipeline_metric_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None])
@@ -1808,5 +1809,46 @@ def speedup_ROOT_Init():
18081809
exit(code)
18091810
actionlogger.info("Running in cgroup")
18101811

1811-
executor=WorkflowExecutor(args.workflowfile,jmax=int(args.maxjobs),args=args)
1812-
exit (executor.execute())
1812+
1813+
# This starts the fanotify fileaccess monitoring process
1814+
# if asked for
1815+
o2dpg_filegraph_exec = os.getenv("O2DPG_PRODUCE_FILEGRAPH") # switches filegraph monitoring on and contains the executable name
1816+
if o2dpg_filegraph_exec:
1817+
env = os.environ.copy()
1818+
env["FILEACCESS_MON_ROOTPATH"] = os.getcwd()
1819+
env["MAXMOTHERPID"] = f"{os.getpid()}"
1820+
1821+
fileaccess_log_file_name = f"pipeline_fileaccess_{os.getpid()}.log"
1822+
fileaccess_log_file = open(fileaccess_log_file_name, "w")
1823+
fileaccess_monitor_proc = subprocess.Popen(
1824+
[o2dpg_filegraph_exec],
1825+
stdout=fileaccess_log_file,
1826+
stderr=subprocess.STDOUT,
1827+
env=env)
1828+
else:
1829+
fileaccess_monitor_proc = None
1830+
1831+
try:
1832+
# This is core workflow runner invocation
1833+
executor=WorkflowExecutor(args.workflowfile,jmax=int(args.maxjobs),args=args)
1834+
rc = executor.execute()
1835+
finally:
1836+
if fileaccess_monitor_proc:
1837+
fileaccess_monitor_proc.terminate() # sends SIGTERM
1838+
try:
1839+
fileaccess_monitor_proc.wait(timeout=5)
1840+
except subprocess.TimeoutExpired:
1841+
fileaccess_monitor_proc.kill() # force kill if not stopping
1842+
# now produce the final filegraph output
1843+
o2dpg_root = os.getenv("O2DPG_ROOT")
1844+
analyse_cmd = [
1845+
sys.executable, # runs with same Python interpreter
1846+
f"{o2dpg_root}/UTILS/FileIOGraph/analyse_FileIO.py",
1847+
"--actionFile", actionlogger_file,
1848+
"--monitorFile", fileaccess_log_file_name,
1849+
"-o", f"pipeline_fileaccess_report_{os.getpid()}.json",
1850+
"--basedir", os.getcwd() ]
1851+
print (f"Producing FileIOGraph with command {analyse_cmd}")
1852+
subprocess.run(analyse_cmd, check=True)
1853+
1854+
sys.exit(rc)

UTILS/FileIOGraph/analyse_FileIO.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
# the run-number of data taking or default if unanchored
2323
parser.add_argument('--actionFile', type=str, help="O2DPG pipeline runner action file")
2424
parser.add_argument('--monitorFile', type=str, help="monitoring file provided by fanotify tool. See O2DPG/UTILS/FileIOGraph.")
25-
parser.add_argument('--basedir', type=str, help="O2DPG workflow dir")
25+
parser.add_argument('--basedir', default="/", type=str, help="O2DPG workflow dir")
2626
parser.add_argument('--file-filters', nargs='+', default=[r'.*'], help="Filters (regular expressions) to select files (default all = '.*')")
2727
parser.add_argument('--graphviz', type=str, help="Produce a graphviz plot")
2828
parser.add_argument('-o','--output', type=str, help="Output JSON report")
@@ -60,7 +60,8 @@
6060
file_written_task = {}
6161
file_consumed_task = {}
6262

63-
pattern = re.compile(args.basedir + r'([^,]+),((?:read|write)),(.*)')
63+
pattern = re.compile(r'"?([^"]+)"?,((?:read|write)),(.*)')
64+
basedir_pattern = re.compile("^" + args.basedir)
6465
# neglecting some framework file names
6566
file_exclude_filter = re.compile(r'(.*)\.log(.*)|(ccdb/log)|(.*)dpl-config\.json')
6667

@@ -76,6 +77,13 @@
7677
mode = match.group(2)
7778
pids = match.group(3).split(";")
7879

80+
# see if matches the workdir
81+
if not basedir_pattern.match(file_name):
82+
continue
83+
84+
# remove basedir from file_name
85+
file_name = file_name.replace(args.basedir + '/', "./", 1)
86+
7987
# implement file name filter
8088
if file_exclude_filter.match(file_name):
8189
continue

UTILS/FileIOGraph/monitor_fileaccess_v2.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ int main(int argc, char **argv)
129129
auto ROOT_PATH_ENV = getenv("FILEACCESS_MON_ROOTPATH");
130130
std::string root_path = "/";
131131
if (ROOT_PATH_ENV) {
132-
std::cerr << "Observing file access below " << root_path << "\n";
133132
root_path = std::string(ROOT_PATH_ENV);
133+
std::cerr << "Observing file access below " << root_path << "\n";
134134
}
135135

136136
CHK(fan = fanotify_init(FAN_CLASS_NOTIF, O_RDONLY), -1);

0 commit comments

Comments
 (0)