Skip to content

Commit 89ed368

Browse files
committed
pipeline-runner: ability to specify logfile names
1 parent cf8a4bb commit 89ed368

File tree

1 file changed

+35
-30
lines changed

1 file changed

+35
-30
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,37 @@
2323

2424
sys.setrecursionlimit(100000)
2525

26+
import argparse
27+
import psutil
28+
max_system_mem=psutil.virtual_memory().total
29+
30+
# defining command line options
31+
parser = argparse.ArgumentParser(description='Parallel execution of a (O2-DPG) DAG data/job pipeline under resource constraints.',
32+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
33+
34+
parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
35+
parser.add_argument('-jmax','--maxjobs', help='Number of maximal parallel tasks.', default=100)
36+
parser.add_argument('--dry-run', action='store_true', help='Show what you would do.')
37+
parser.add_argument('--visualize-workflow', action='store_true', help='Saves a graph visualization of workflow.')
38+
parser.add_argument('--target-labels', nargs='+', help='Runs the pipeline by target labels (example "TPC" or "DIGI").\
39+
This condition is used as logical AND together with --target-tasks.', default=[])
40+
parser.add_argument('-tt','--target-tasks', nargs='+', help='Runs the pipeline by target tasks (example "tpcdigi"). By default everything in the graph is run. Regular expressions supported.', default=["*"])
41+
parser.add_argument('--produce-script', help='Produces a shell script that runs the workflow in serialized manner and quits.')
42+
parser.add_argument('--rerun-from', help='Reruns the workflow starting from given task (or pattern). All dependent jobs will be rerun.')
43+
parser.add_argument('--list-tasks', help='Simply list all tasks by name and quit.', action='store_true')
44+
45+
parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint', default=max_system_mem)
46+
parser.add_argument('--cpu-limit', help='Set CPU limit (core count)', default=8)
47+
parser.add_argument('--cgroup', help='Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\
48+
ust exist and the tasks file must be writable to with the current user.')
49+
parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout.')
50+
parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel
51+
parser.add_argument('--checkpoint-on-failure', help=argparse.SUPPRESS) # debug option making a debug-tarball and sending to specified address
52+
# argument is alien-path
53+
parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
54+
parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
55+
args = parser.parse_args()
56+
2657
def setup_logger(name, log_file, level=logging.INFO):
2758
"""To setup as many loggers as you want"""
2859

@@ -36,10 +67,10 @@ def setup_logger(name, log_file, level=logging.INFO):
3667
return logger
3768

3869
# first file logger
39-
actionlogger = setup_logger('pipeline_action_logger', 'pipeline_action.log', level=logging.DEBUG)
70+
actionlogger = setup_logger('pipeline_action_logger', ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None], level=logging.DEBUG)
4071

4172
# second file logger
42-
metriclogger = setup_logger('pipeline_metric_logger', 'pipeline_metric.log')
73+
metriclogger = setup_logger('pipeline_metric_logger', ('pipeline_metric_' + str(os.getpid()) + '.log', args.metric_logfile)[args.metric_logfile!=None])
4374

4475
# for debugging without terminal access
4576
# TODO: integrate into standard logger
@@ -752,7 +783,8 @@ def waitforany(self, process_list, finished):
752783
if failuredetected and self.stoponfailure:
753784
actionlogger.info('Stoping pipeline due to failure in stages with PID ' + str(failingpids))
754785
# self.analyse_files_and_connections()
755-
self.cat_logfiles_tostdout(failingtasks)
786+
if self.args.stdout_on_failure:
787+
self.cat_logfiles_tostdout(failingtasks)
756788
self.send_checkpoint(failingtasks, self.args.checkpoint_on_failure)
757789
self.stop_pipeline_and_exit(process_list)
758790

@@ -1041,35 +1073,8 @@ def execute(self):
10411073
print ('\n**** Pipeline done *****\n')
10421074
# self.analyse_files_and_connections()
10431075

1044-
import argparse
1045-
import psutil
1046-
max_system_mem=psutil.virtual_memory().total
10471076

1048-
parser = argparse.ArgumentParser(description='Parallel execution of a (O2-DPG) DAG data/job pipeline under resource contraints.',
1049-
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
10501077

1051-
parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
1052-
parser.add_argument('-jmax','--maxjobs', help='Number of maximal parallel tasks.', default=100)
1053-
parser.add_argument('--dry-run', action='store_true', help='Show what you would do.')
1054-
parser.add_argument('--visualize-workflow', action='store_true', help='Saves a graph visualization of workflow.')
1055-
parser.add_argument('--target-labels', nargs='+', help='Runs the pipeline by target labels (example "TPC" or "DIGI").\
1056-
This condition is used as logical AND together with --target-tasks.', default=[])
1057-
parser.add_argument('-tt','--target-tasks', nargs='+', help='Runs the pipeline by target tasks (example "tpcdigi"). By default everything in the graph is run. Regular expressions supported.', default=["*"])
1058-
parser.add_argument('--produce-script', help='Produces a shell script that runs the workflow in serialized manner and quits.')
1059-
parser.add_argument('--rerun-from', help='Reruns the workflow starting from given task (or pattern). All dependent jobs will be rerun.')
1060-
parser.add_argument('--list-tasks', help='Simply list all tasks by name and quit.', action='store_true')
1061-
1062-
parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint', default=max_system_mem)
1063-
parser.add_argument('--cpu-limit', help='Set CPU limit (core count)', default=8)
1064-
parser.add_argument('--cgroup', help='Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\
1065-
ust exist and the tasks file must be writable to with the current user.')
1066-
parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,')
1067-
parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel
1068-
parser.add_argument('--checkpoint-on-failure', help=argparse.SUPPRESS) # debug option making a debug-tarball and sending to specified address
1069-
# argument is alien-path
1070-
1071-
args = parser.parse_args()
1072-
print (args)
10731078

10741079
if args.cgroup!=None:
10751080
myPID=os.getpid()

0 commit comments

Comments
 (0)