Skip to content

Commit 2c43b1d

Browse files
committed
Refactor of anchor configuration logic
This commit refactors the transfer of anchoring configuration from async-reco scripts into the MC pipeline, addressing O2-5011 by replacing the previous whitelisting mechanism with a blacklisting approach. In addition, it introduces broader configuration/customization support: users can now inject arbitrary command-line options into any task via a `customize.json` file, reducing the need to edit workflow scripts. For example: { "ConfigParams": { "EMCSimParam": { "mBusyTime": "11.11" } }, "Executables": { "o2-ft0-reco-workflow": { "filtered": { "--my-custom-option": "foo" } }, "ft0fv0emcctp_digi": { "filtered": { "--another-custom-option": "baz" } } } } This allows for flexible injection of both configKeyValues and CLI options via `o2dpg_sim_workflow.py --overwrite-config customize.json`.
1 parent 506cec9 commit 2c43b1d

File tree

6 files changed

+779
-207
lines changed

6 files changed

+779
-207
lines changed

MC/bin/o2dpg_dpl_config_tools.py

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
#!/usr/bin/env python3
2+
3+
import json
4+
import re
5+
import sys
6+
from collections import defaultdict
7+
from copy import deepcopy
8+
import os
9+
from o2dpg_workflow_utils import merge_dicts
10+
11+
BUILTIN_BLACKLIST = {
12+
"--session", "--severity", "--shm-segment-id", "--shm-segment-size",
13+
"--resources-monitoring", "--resources-monitoring-dump-interval",
14+
"--delay", "--loop", "--early-forward-policy", "--fairmq-rate-logging",
15+
"--pipeline", "--disable-mc", "--disable-root-input", "--timeframes-rate-limit",
16+
"--timeframes-rate-limit-ipcid"
17+
}
18+
19+
def parse_command_string(cmd_str):
20+
"""
21+
Parse a DPL command string into structured config_base:
22+
{
23+
"executable": str,
24+
"options": {key: val, ...},
25+
"configKeyValues": {"Group": {subkey: val}}
26+
}
27+
"""
28+
tokens = re.findall(r'(?:[^\s"\'=]+|"[^"]*"|\'[^\']*\')+', cmd_str.strip())
29+
if not tokens:
30+
return {}
31+
32+
exe = tokens[0]
33+
opts = {}
34+
config_keyvals_raw = None
35+
36+
i = 1
37+
while i < len(tokens):
38+
token = tokens[i]
39+
if token.startswith('--') or (token.startswith('-') and len(token) == 2):
40+
key = token # preserve the dash prefix: "-b" or "--run-number"
41+
if i + 1 < len(tokens) and not tokens[i + 1].startswith('-'):
42+
value = tokens[i + 1].strip('"').strip("'")
43+
i += 1
44+
else:
45+
value = True
46+
if key == "--configKeyValues":
47+
config_keyvals_raw = value
48+
else:
49+
opts[key] = value
50+
i += 1
51+
52+
config_kv_parsed, config_groups = {}, set()
53+
if config_keyvals_raw:
54+
config_kv_parsed, config_groups = parse_configKeyValues_block(config_keyvals_raw)
55+
56+
return {
57+
"executable": exe,
58+
"options": opts,
59+
"configKeyValues": config_kv_parsed,
60+
"configKeyGroups": sorted(config_groups)
61+
}
62+
63+
def parse_command_string_symmetric(cmd_str, configname = None):
64+
"""
65+
Parses a DPL command string into the same structure as parse_workflow_config(...):
66+
{
67+
"ConfigParams": { group: {key: value, ...} },
68+
"Executables": {
69+
"o2-executable": {
70+
"full": {...},
71+
"filtered": {...},
72+
"blacklisted": [],
73+
"configKeyValues": [group, ...]
74+
}
75+
}
76+
}
77+
"""
78+
tokens = re.findall(r'(?:[^\s"\'=]+|"[^"]*"|\'[^\']*\')+', cmd_str.strip())
79+
if not tokens:
80+
return {}
81+
82+
exe = os.path.basename(tokens[0]) if configname == None else configname
83+
opts = {}
84+
config_kv_raw = None
85+
86+
i = 1
87+
while i < len(tokens):
88+
token = tokens[i]
89+
if token.startswith('--') or (token.startswith('-') and len(token) == 2):
90+
key = token # preserve the dash prefix: "-b" or "--run-number"
91+
if i + 1 < len(tokens) and not tokens[i + 1].startswith('-'):
92+
value = tokens[i + 1].strip('"').strip("'")
93+
i += 1
94+
else:
95+
value = True
96+
if key == "--configKeyValues":
97+
config_kv_raw = value
98+
else:
99+
opts[key] = value
100+
i += 1
101+
102+
config_params = {}
103+
config_key_groups = []
104+
105+
if config_kv_raw:
106+
parsed_kv, groups = parse_configKeyValues_block(config_kv_raw)
107+
config_params = parsed_kv
108+
config_key_groups = sorted(groups)
109+
110+
return {
111+
"ConfigParams": config_params,
112+
"Executables": {
113+
exe: {
114+
"full": opts,
115+
"filtered": dict(opts),
116+
"blacklisted": [],
117+
"configKeyValues": config_key_groups
118+
}
119+
}
120+
}
121+
122+
123+
def parse_configKeyValues_block(raw_value):
124+
result = defaultdict(dict)
125+
seen_groups = set()
126+
raw_value = raw_value.replace('\\"', '"').replace("\\'", "'")
127+
parts = raw_value.split(";")
128+
for part in parts:
129+
part = part.strip()
130+
if not part or "=" not in part:
131+
continue
132+
key, val = part.split("=", 1)
133+
key = key.strip()
134+
val = val.strip()
135+
if "." in key:
136+
group, subkey = key.split(".", 1)
137+
result[group][subkey] = val
138+
seen_groups.add(group)
139+
return dict(result), seen_groups
140+
141+
def log_line(logger, message):
142+
if logger is None or logger == sys.stdout:
143+
print(message)
144+
elif isinstance(logger, str):
145+
with open(logger, "a") as f:
146+
f.write(message + "\n")
147+
else:
148+
logger.write(message + "\n")
149+
150+
def modify_dpl_command(cmd_str, config_anchor, allow_overwrite=False, logger=None, configname=None):
151+
# check if cmd_str is given as list, in which case we transfrom to string
152+
if isinstance(cmd_str, list) == True:
153+
cmd_str = " ".join(filter(None, cmd_str))
154+
155+
base = parse_command_string(cmd_str)
156+
exe = base["executable"]
157+
existing_opts = base["options"]
158+
existing_kv = base["configKeyValues"]
159+
160+
# Start building new command
161+
new_cmd = [exe]
162+
added = []
163+
overwritten = []
164+
165+
exe_basename = os.path.basename(exe) if configname == None else configname
166+
anchor_exec = None
167+
if "Executables" in config_anchor:
168+
anchor_exec = config_anchor["Executables"].get(exe_basename, None)
169+
if anchor_exec == None:
170+
# try without the Executable section
171+
anchor_exec = config_anchor.get(exe_basename, None)
172+
173+
if anchor_exec == None:
174+
print(f"[WARN] No anchor config found for {exe}")
175+
return cmd_str
176+
177+
anchor_opts = anchor_exec.get("filtered", {})
178+
anchor_kv_groups = anchor_exec.get("configKeyValues", [])
179+
180+
# --- Step 1: Reconstruct executable and its CLI options
181+
new_cmd = [exe]
182+
added = []
183+
overwritten = []
184+
185+
# Step 1: Existing options (preserved or overwritten)
186+
for key, val in existing_opts.items():
187+
if allow_overwrite and key in anchor_opts:
188+
val = anchor_opts[key]
189+
overwritten.append(key)
190+
new_cmd.append(f"{key} {val}" if val is not True else f"{key}")
191+
192+
# Step 2: Add missing options from anchor
193+
for key, val in anchor_opts.items():
194+
if key not in existing_opts:
195+
new_cmd.append(f"{key} {val}" if val is not True else f"{key}")
196+
added.append(key)
197+
198+
# what about config-key values (should already be done) Merge configKeyValues
199+
merged_kv = deepcopy(existing_kv)
200+
# for group in anchor_kv_groups:
201+
# group_kvs = config_anchor.get("ConfigParams", {}).get(group, {})
202+
# if group not in merged_kv:
203+
# merged_kv[group] = group_kvs
204+
# elif allow_overwrite:
205+
# merged_kv[group].update(group_kvs)
206+
207+
if merged_kv:
208+
kv_flat = [f"{g}.{k}={v}" for g, kv in merged_kv.items() for k, v in kv.items()]
209+
new_cmd.append('--configKeyValues "' + ";".join(kv_flat) + '"')
210+
211+
# log changes
212+
log_line(logger, f"\n[Executable: {exe}]")
213+
if added:
214+
log_line(logger, f" Added options: {added}")
215+
if overwritten:
216+
log_line(logger, f" Overwritten options: {overwritten}")
217+
if not added and not overwritten:
218+
log_line(logger, f" No changes made to command.")
219+
220+
return " ".join(new_cmd)
221+
222+
# CLI: Parse log + blacklist into output.json
223+
def parse_configKeyValues(raw_value):
224+
return parse_configKeyValues_block(raw_value)
225+
226+
def parse_workflow_config(log_path):
227+
config_params = defaultdict(dict)
228+
executables = {}
229+
230+
with open(log_path) as f:
231+
for line in f:
232+
line = line.strip()
233+
if not line or line.startswith("#"):
234+
continue
235+
236+
parsed = parse_command_string(line)
237+
exe = parsed["executable"]
238+
config_groups_used = parsed["configKeyGroups"]
239+
full_opts = parsed["options"]
240+
241+
for group, kv in parsed["configKeyValues"].items():
242+
config_params[group].update(kv)
243+
244+
executables[exe] = {
245+
"configKeyValues": sorted(config_groups_used),
246+
"full": full_opts
247+
}
248+
249+
return config_params, executables
250+
251+
252+
def apply_blacklist(executables, blacklist_cfg):
253+
for exe, data in executables.items():
254+
full_opts = data["full"]
255+
exe_blacklist = set(blacklist_cfg.get(exe, []))
256+
total_blacklist = BUILTIN_BLACKLIST.union(exe_blacklist)
257+
258+
blacklisted = []
259+
filtered = {}
260+
261+
for key, val in full_opts.items():
262+
if key in total_blacklist:
263+
blacklisted.append(key)
264+
else:
265+
filtered[key] = val
266+
267+
data["blacklisted"] = sorted(blacklisted)
268+
data["filtered"] = filtered
269+
data["full"] = deepcopy(full_opts) # keep original
270+
return executables
271+
272+
def dpl_option_from_config(config, dpl_workflow, key, default_value = None):
273+
"""
274+
Utility to extract a DPL option for workflow dpl_workflow from
275+
the configuration dict "config". The configuration is:
276+
- either a flattish JSON produced by older tool parse-async-WorkflowConfig.py
277+
- more structured version produced by o2dpg_dpl_config_tools (this tool)
278+
"""
279+
if "Executables" in config:
280+
# new standard
281+
return config["Executables"].get(dpl_workflow,{}).get("filtered",{}).get(key, default_value)
282+
else:
283+
# backward compatible versions
284+
dpl_workflow_key = dpl_workflow + '-options'
285+
if dpl_workflow_key in config:
286+
return config.get(dpl_workflow_key, {}).get(key, default_value)
287+
dpl_workflow_key = dpl_workflow_key
288+
if dpl_workflow_key in config:
289+
return config.get(dpl_workflow_key, {}).get(key, default_value)
290+
return default_value
291+
292+
def main():
293+
if len(sys.argv) == 4:
294+
log_path = sys.argv[1]
295+
blacklist_path = sys.argv[2]
296+
output_path = sys.argv[3]
297+
298+
with open(blacklist_path) as f:
299+
blacklist_data = json.load(f)
300+
301+
config_params, executables = parse_workflow_config(log_path)
302+
executables = apply_blacklist(executables, blacklist_data)
303+
304+
result = {
305+
"ConfigParams": dict(config_params),
306+
"Executables": executables
307+
}
308+
309+
with open(output_path, "w") as out:
310+
json.dump(result, out, indent=2)
311+
312+
print(f"[INFO] Wrote structured config to: {output_path}")
313+
else:
314+
print("Usage:")
315+
print(" CLI parsing: python3 dpl_config_tools.py workflowconfig.log blacklist.json output.json")
316+
print(" Python usage: import and call parse_command_string() or modify_dpl_command()")
317+
318+
319+
class TaskFinalizer:
320+
def __init__(self, anchor_config, allow_overwrite=False, logger=None):
321+
self.anchor_config = anchor_config
322+
self.allow_overwrite = allow_overwrite
323+
self.logger = logger
324+
self.final_config = {
325+
"ConfigParams": {},
326+
"Executables": {}
327+
}
328+
329+
def __call__(self, cmd_str_or_list, configname = None):
330+
final_cmd = modify_dpl_command(cmd_str_or_list, self.anchor_config.get("Executables",{}), logger=self.logger, configname=configname)
331+
this_config_final = parse_command_string_symmetric(final_cmd, configname)
332+
print (this_config_final)
333+
merge_dicts (self.final_config, this_config_final)
334+
return final_cmd
335+
336+
def dump_collected_config(self, path):
337+
with open(path, "w") as f:
338+
json.dump(self.final_config, f, indent=2)
339+
340+
341+
if __name__ == "__main__":
342+
main()

0 commit comments

Comments
 (0)