Skip to content

Commit 6dafe74

Browse files
Helpers and Tooling (merge, insert) for graph workflows; Analysis example
1 parent 0fc0fd0 commit 6dafe74

File tree

5 files changed

+467
-30
lines changed

5 files changed

+467
-30
lines changed

MC/bin/README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Workflow editing
2+
3+
The tool `$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py` provides some management of workflow files.
4+
5+
## General help
6+
7+
```bash
8+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py [sub-command] --help
9+
```
10+
shows the available sub-commands and for each sub-command, a dedicated help message is provided.
11+
12+
13+
## Create an empty workflow file
14+
15+
```bash
16+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py create my_workflow
17+
```
18+
creates a new file `my_workflow.json` (the extension `.json` can be left out in the command and would be added automatically)
19+
20+
## Add task skeletons to a workflow file
21+
22+
New task skeletons can be added with their names by
23+
```bash
24+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py create my_workflow --add-task task1 [task2 [...]]
25+
```
26+
27+
Regarding the command line to be executed, the required `${O2_ROOT}/share/scripts/jobutils.sh; taskwrapper` is prepended automatically.
28+
29+
## Update number of workers (in case of using relative number of workers)
30+
31+
The number of workers can be updated by (in this case specifying 9 workers)
32+
```bash
33+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py nworkers my_workflow 9
34+
```
35+
36+
## Merge 2 workflow files
37+
38+
Merging of 2 workflow files is done via
39+
```bash
40+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py merge workflow1 workflow2 [-o workflow_merged]
41+
```
42+
43+
If no output filename is provided, the default will be `workflow_merged`. Of course, after that the number of workers can be updated based on the merged one.
44+
45+
## Inspect workflow
46+
47+
This doesn't do much at the moment, but can be foreseen to be equipped with more functionality
48+
```bash
49+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py inspect my_workflow --summary
50+
```
51+
yields a very brief summary of `my_workflow`, whereas
52+
```bash
53+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py inspect my_workflow --check
54+
```
55+
conducts a quick sanity check, for instance checking whether any task names are duplicated or any dependencies are missing.
56+
57+
## Modifying a single task
58+
A task can be updated via the command line, so there is no need to edit the `JSON` file by hand. To change the dependent tasks, for instance, do
59+
```bash
60+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py modify --needs dep_task_1 [dep_task_2 [...]]
61+
```
62+
As usual, type
63+
```bash
64+
$O2DPG_ROOT/MC/bin/o2dpg-workflow-tools.py modify --help
65+
```
66+
to see all options.
67+
68+
69+
70+
71+

MC/bin/o2dpg-workflow-tools.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#!/usr/bin/env python3
2+
3+
import sys
4+
from os.path import join, dirname, exists
5+
import argparse
6+
7+
sys.path.append(join(dirname(__file__), '.', 'o2dpg_workflow_utils'))
8+
9+
from o2dpg_workflow_utils import createTask, read_workflow, dump_workflow, check_workflow, update_workflow_resource_requirements, make_workflow_filename
10+
11+
def extend(args):
    """Append the tasks of one workflow file to another.

    The overall configuration of the original workflow is kept.
    The combined task list is written to ``args.output`` when set,
    otherwise back to the original workflow file.
    """
    base = read_workflow(args.orig_wf)
    addition = read_workflow(args.extend_wf)

    # append all tasks of the second workflow to the first one
    base.extend(addition)

    # dump in new file; fall back to overwriting the original
    target = args.output if args.output else args.orig_wf
    dump_workflow(base, target)
27+
28+
29+
def create(args):
    """Create an empty workflow skeleton or add task skeletons to an existing workflow.

    Refuses to overwrite an existing workflow file unless tasks are
    being added to it via ``--add-task``.
    """
    filename = make_workflow_filename(args.file)
    if not args.add_task and exists(filename):
        # fixed: the message must name the offending file (the f-string had no placeholder)
        print(f"Workflow file {filename} does already exist. Delete it and try again")
        return
    if not args.add_task or not exists(filename):
        # just create an empty workflow
        dump_workflow([], filename)
    if args.add_task:
        # add a task skeleton for each requested name
        workflow = read_workflow(filename)
        for name in args.add_task:
            workflow.append(createTask(name=name))
        dump_workflow(workflow, filename)
45+
46+
47+
def find_task(workflow, task_name):
    """Return the first task dict whose "name" equals task_name, or None."""
    return next((task for task in workflow if task["name"] == task_name), None)
52+
53+
54+
def modify(args):
    """Modify a single named task of a workflow file in place.

    Looks up the task ``args.task`` and overwrites every attribute or
    resource field for which a command-line value was supplied.
    Exits with status 1 if the task cannot be found.
    """
    if args.task:
        workflow = read_workflow(args.file)
        # try to find the requested task
        task = find_task(workflow, args.task)
        if not task:
            print(f"Task with name {args.task} does not exist")
            # fixed: use sys.exit instead of the interactive-shell helper exit()
            sys.exit(1)
        # plain task attributes live at the top level of the task dict
        for attr in ("name", "needs", "timeframe", "cwd", "labels", "cmd"):
            if hasattr(args, attr) and getattr(args, attr) is not None:
                task[attr] = getattr(args, attr)
        # resource requirements live in the nested "resources" dict
        for attr in ("cpu", "relative_cpu", "mem"):
            if hasattr(args, attr) and getattr(args, attr) is not None:
                task["resources"][attr] = getattr(args, attr)

        dump_workflow(workflow, args.file)
71+
72+
73+
def nworkers(args):
    """Recompute relative CPU requirements for ``args.jobs`` workers and save."""
    tasks = read_workflow(args.file)
    update_workflow_resource_requirements(tasks, args.jobs)
    dump_workflow(tasks, args.file)
77+
78+
79+
def inspect(args):
    """Inspect a workflow file.

    This is at the moment more show-casing what one could do:
    sanity check, brief summary, and printing a single task.
    """
    workflow = read_workflow(args.file)
    if args.check:
        check_workflow(workflow)
    if args.summary:
        # NOTE: summary_workflow comes from o2dpg_workflow_utils
        summary_workflow(workflow)
    # fixed: the inspect sub-parser did not always define --task,
    # so guard the attribute access instead of crashing
    task_name = getattr(args, "task", None)
    if task_name:
        task = find_task(workflow, task_name)
        if not task:
            # fixed: the original message was truncated after the task name
            print(f"Task with name {task_name} does not exist")
            sys.exit(1)
        print("Here are the requested task information")
        print(task)
96+
97+
98+
def main():
    """Parse the command line and dispatch to the requested sub-command."""

    # fixed: the description was copy-pasted from the simulation-workflow
    # creator script and did not describe this tool
    parser = argparse.ArgumentParser(description='Manage ALICE (Run3) MC simulation workflow files')

    sub_parsers = parser.add_subparsers(dest="command")

    create_parser = sub_parsers.add_parser("create", help="manage a workflow")
    create_parser.set_defaults(func=create)
    create_parser.add_argument("file", help="workflow file to be created or modified")
    create_parser.add_argument("--add-task", dest="add_task", nargs="+", help="add named tasks to workflow file")

    # Append to (sim) workflow
    merge_parser = sub_parsers.add_parser("merge", help="append stages")
    merge_parser.set_defaults(func=extend)
    merge_parser.add_argument("orig_wf", help="original workflow")
    merge_parser.add_argument("extend_wf", help="workflow JSON to be merged to orig")
    merge_parser.add_argument("--output", "-o", help="extended workflow output file name", default="workflow_merged.json")

    nworker_parser = sub_parsers.add_parser("nworkers", help="update number of workers")
    nworker_parser.set_defaults(func=nworkers)
    nworker_parser.add_argument("file", help="the workflow file to be modified")
    nworker_parser.add_argument("jobs", type=int, help="number of workers to recompute relative cpu")

    modify_parser = sub_parsers.add_parser("modify", help="modify a task")
    modify_parser.set_defaults(func=modify)
    modify_parser.add_argument("file", help="the workflow file to be modified")
    modify_parser.add_argument("task", help="name of task to be modified")
    # not allowing for changing the name at the moment as this also goes into the log-file name
    #modify_parser.add_argument("--name", help="new name of this task")
    modify_parser.add_argument("--needs", nargs="+", help="required tasks to be executed before this one")
    modify_parser.add_argument("--timeframe", type=int, help="timeframe")
    modify_parser.add_argument("--cwd", help="current working directory of this task")
    modify_parser.add_argument("--labels", nargs="+", help="attached labels")
    modify_parser.add_argument("--cpu", type=int, help="absolute number of workers to be used for this task")
    modify_parser.add_argument("--relative-cpu", dest="relative_cpu", type=float, help="relative fraction of maximum number of available workers")
    modify_parser.add_argument("--mem", type=int, help="estimated memory")
    modify_parser.add_argument("--cmd", help="command line to be executed")

    inspect_parser = sub_parsers.add_parser("inspect", help="inspect a workflow")
    inspect_parser.set_defaults(func=inspect)
    inspect_parser.add_argument("file", help="Workflow file to inspect")
    inspect_parser.add_argument("--summary", action="store_true", help="print summary of workflow")
    inspect_parser.add_argument("--check", action="store_true", help="Check sanity of workflow")
    # fixed: inspect() reads args.task, but no such option was ever defined
    inspect_parser.add_argument("--task", help="print the full definition of this task")

    args = parser.parse_args()

    if not hasattr(args, "func"):
        # no sub-command given: print the help and exit cleanly
        parser.parse_args(["--help"])
        exit(0)

    args.func(args)
149+
150+
151+
if __name__ == "__main__":
    # provide this also stand-alone if called directly from the interpreter
    main()

MC/bin/o2dpg_sim_workflow.py

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@
1717
# -col pp -eA 2.510 -proc "ccbar" --embedding
1818
#
1919

20+
import sys
2021
import argparse
2122
from os import environ
23+
from os.path import join, dirname
2224
import json
2325
import array as arr
2426

27+
sys.path.append(join(dirname(__file__), '.', 'o2dpg_workflow_utils'))
28+
29+
from o2dpg_workflow_utils import createTask, dump_workflow
30+
2531
parser = argparse.ArgumentParser(description='Create an ALICE (Run3) MC simulation workflow')
2632

2733
parser.add_argument('-ns',help='number of signal events / timeframe', default=20)
@@ -96,20 +102,6 @@
96102
workflow={}
97103
workflow['stages'] = []
98104

99-
def relativeCPU(n_rel, n_workers=NWORKERS):
100-
# compute number of CPUs from a given number of workers
101-
# n_workers and a fraction n_rel
102-
# catch cases where n_rel > 1 or n_workers * n_rel
103-
return round(min(n_workers, n_workers * n_rel), 2)
104-
105-
taskcounter=0
106-
def createTask(name='', needs=[], tf=-1, cwd='./', lab=[], cpu=1, relative_cpu=None, mem=500):
107-
if relative_cpu is not None:
108-
# Re-compute, if relative number of CPUs requested
109-
cpu = relativeCPU(relative_cpu)
110-
global taskcounter
111-
taskcounter = taskcounter + 1
112-
return { 'name': name, 'cmd':'', 'needs': needs, 'resources': { 'cpu': cpu , 'mem': mem }, 'timeframe' : tf, 'labels' : lab, 'cwd' : cwd }
113105

114106
def getDPL_global_options(bigshm=False,nosmallrate=False):
115107
common="-b --run --fairmq-ipc-prefix ${FAIRMQ_IPC_PREFIX:-./} --driver-client-backend ws:// " + ('--rate 1000','')[nosmallrate]
@@ -535,21 +527,6 @@ def createRestDigiTask(name, det='ALLSMALLER'):
535527
workflow['stages'].append(TFcleanup);
536528

537529

538-
def trimString(cmd):
539-
return ' '.join(cmd.split())
540-
541-
# insert taskwrapper stuff
542-
for s in workflow['stages']:
543-
s['cmd']='. ${O2_ROOT}/share/scripts/jobutils.sh; taskwrapper ' + s['name']+'.log \'' + s['cmd'] + '\''
544-
545-
# remove whitespaces etc
546-
for s in workflow['stages']:
547-
s['cmd']=trimString(s['cmd'])
548-
549-
550-
# write workflow to json
551-
workflowfile=args.o
552-
with open(workflowfile, 'w') as outfile:
553-
json.dump(workflow, outfile, indent=2)
530+
dump_workflow(workflow["stages"], args.o)
554531

555532
exit (0)

0 commit comments

Comments
 (0)