Skip to content

Commit 750fb32

Browse files
committed
Refactor of anchor configuration logic
This commit refactors the transfer of anchoring configuration from async-reco scripts into the MC pipeline, addressing O2-5011 by replacing the previous whitelisting mechanism with a blacklisting approach. In addition, it introduces broader configuration/customization support: users can now inject arbitrary command-line options into any task via a `customize.json` file, reducing the need to edit workflow scripts. For example: { "ConfigParams": { "EMCSimParam": { "mBusyTime": "11.11" } }, "Executables": { "o2-ft0-reco-workflow": { "filtered": { "--my-custom-option": "foo" } }, "ft0fv0emcctp_digi": { "filtered": { "--another-custom-option": "baz" } } } } This allows for flexible injection of both configKeyValues and CLI options via `o2dpg_sim_workflow.py --overwrite-config customize.json`.
1 parent e227d82 commit 750fb32

File tree

6 files changed

+816
-215
lines changed

6 files changed

+816
-215
lines changed

MC/bin/o2dpg_dpl_config_tools.py

Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
#!/usr/bin/env python3
2+
3+
import json
4+
import re
5+
import sys
6+
from collections import defaultdict
7+
from copy import deepcopy
8+
import os
9+
from o2dpg_workflow_utils import merge_dicts
10+
import shlex
11+
12+
# Options that apply_blacklist() always removes from the "filtered" view of an
# executable, in addition to any per-executable blacklist supplied by the user.
BUILTIN_BLACKLIST = {
    "--session",
    "--severity",
    "--shm-segment-id",
    "--shm-segment-size",
    "--resources-monitoring",
    "--resources-monitoring-dump-interval",
    "--delay",
    "--loop",
    "--early-forward-policy",
    "--fairmq-rate-logging",
    "--pipeline",
    "--disable-mc",
    "--disable-root-input",
    "--timeframes-rate-limit",
    "--timeframes-rate-limit-ipcid",
    # TPC corrections are treated separately in o2dpg_sim_workflow
    "--lumi-type",
    "--corrmap-lumi-mode",
    # TPC correction option not needed in MC
    "--enable-M-shape-correction",
}
22+
23+
# A token such as "-1", "-10", "-0.5", "-.5" or "-1e-3" is a negative numeric
# value and must never be mistaken for an option name.
_NEGATIVE_NUMBER = re.compile(r"-(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?")


def _is_option_name(token):
    """Return True if token is an option name ("--long" or "-x") and not a negative number."""
    if _NEGATIVE_NUMBER.fullmatch(token):
        return False
    return token.startswith('--') or (token.startswith('-') and len(token) == 2)


def _is_option_value(token):
    """Return True if token can serve as the value of the preceding option."""
    return not token.startswith('-') or _NEGATIVE_NUMBER.fullmatch(token) is not None


def _split_dpl_command(cmd_str):
    """
    Tokenize a DPL command string.

    Returns None for an empty command, otherwise the tuple
    (first_token, options, raw_config_key_values) where
    - options maps each option name (dash prefix preserved) to its value
      string, or to True when the option is a bare flag;
    - raw_config_key_values is the value given to --configKeyValues
      (None if the option is absent, True if it was given without a value).

    Tokens that are neither an option name nor the value of one are ignored.
    Raises ValueError (after printing an error) if shlex cannot split the string.
    """
    try:
        # posix=False keeps the quotes on the tokens; they are stripped below
        tokens = shlex.split(cmd_str, posix=False)
    except ValueError:
        print(f"[ERROR] Failed to parse command string: {cmd_str}")
        raise

    if not tokens:
        return None

    opts = {}
    config_kv_raw = None

    i = 1
    while i < len(tokens):
        token = tokens[i]
        if _is_option_name(token):
            value = True
            if i + 1 < len(tokens) and _is_option_value(tokens[i + 1]):
                value = tokens[i + 1].strip('"').strip("'")
                i += 1  # the value token is consumed together with its option
            if token == "--configKeyValues":
                config_kv_raw = value
            else:
                opts[token] = value  # preserve the dash prefix: "-b" or "--run-number"
        i += 1

    return tokens[0], opts, config_kv_raw


def _parse_config_key_values(raw):
    """Parse a raw --configKeyValues payload; yields ({}, set()) if absent or given without a value."""
    if isinstance(raw, str) and raw:
        return parse_configKeyValues_block(raw)
    return {}, set()


def parse_command_string(cmd_str):
    """
    Parse a DPL command string into structured config_base:
    {
      "executable": str,
      "options": {key: val, ...},
      "configKeyValues": {"Group": {subkey: val}},
      "configKeyGroups": [sorted group names]
    }

    Option keys keep their dash prefix; flags without a value map to True.
    Negative numbers ("--offset -1") are treated as option values.
    Returns {} when the command string contains no tokens.
    Raises ValueError if the string cannot be tokenized.
    """
    split = _split_dpl_command(cmd_str)
    if split is None:
        return {}

    exe, opts, config_keyvals_raw = split
    config_kv_parsed, config_groups = _parse_config_key_values(config_keyvals_raw)

    return {
        "executable": exe,
        "options": opts,
        "configKeyValues": config_kv_parsed,
        "configKeyGroups": sorted(config_groups)
    }


def parse_command_string_symmetric(cmd_str, configname = None):
    """
    Parses a DPL command string into the same structure as parse_workflow_config(...):
    {
      "ConfigParams": { group: {key: value, ...} },
      "Executables": {
        "o2-executable": {
          "full": {...},
          "filtered": {...},
          "blacklisted": [],
          "configKeyValues": [group, ...]
        }
      }
    }

    The executable entry is keyed by configname when given, otherwise by the
    basename of the first token. "filtered" is a shallow copy of "full"
    (no blacklist is applied here). Returns {} for an empty command string.
    Raises ValueError if the string cannot be tokenized.
    """
    split = _split_dpl_command(cmd_str)
    if split is None:
        return {}

    first_token, opts, config_kv_raw = split
    exe = os.path.basename(first_token) if configname is None else configname
    config_params, groups = _parse_config_key_values(config_kv_raw)

    return {
        "ConfigParams": config_params,
        "Executables": {
            exe: {
                "full": opts,
                "filtered": dict(opts),
                "blacklisted": [],
                "configKeyValues": sorted(groups)
            }
        }
    }
135+
136+
137+
def parse_configKeyValues_block(raw_value):
    """
    Split a "Group.key=value;Group.other=value" string into nested dicts.

    Escaped quotes (\\" and \\') are unescaped first. Entries are separated
    by ";"; an entry without "=" or whose key has no "." is ignored.

    Returns a tuple (params, groups) where params is
    {group: {subkey: value}} and groups is the set of group names seen.
    """
    grouped = defaultdict(dict)
    groups = set()

    unescaped = raw_value.replace('\\"', '"').replace("\\'", "'")
    for entry in (chunk.strip() for chunk in unescaped.split(";")):
        name, equals, value = entry.partition("=")
        if not equals:
            # empty entry or entry without "="
            continue
        group, dot, subkey = name.strip().partition(".")
        if not dot:
            # key is not of the form Group.subkey
            continue
        grouped[group][subkey] = value.strip()
        groups.add(group)

    return dict(grouped), groups
154+
155+
def log_line(logger, message):
    """
    Emit one message line to the given destination.

    logger may be None or sys.stdout (the message is printed), a string
    (taken as a file path that is opened in append mode), or any object
    with a write() method.
    """
    if logger is None or logger == sys.stdout:
        print(message)
        return

    line = message + "\n"
    if isinstance(logger, str):
        with open(logger, "a") as handle:
            handle.write(line)
        return

    logger.write(line)
163+
164+
def modify_dpl_command(cmd_str, config_anchor, allow_overwrite=False, logger=None, configname=None):
    """
    Rebuild a DPL command with the options of the anchor configuration injected.

    Parameters:
      cmd_str:         the command, as a string or as a list of fragments
                       (empty/None fragments are dropped and the rest joined
                       with spaces).
      config_anchor:   dict holding the anchor entries, either under an
                       "Executables" section or directly keyed by executable
                       name. Only the "filtered" dict of the matching entry
                       is used.
      allow_overwrite: if True, an option already present in the command
                       takes the value found in the anchor entry.
      logger:          destination handed to log_line().
      configname:      key used to look up the anchor entry instead of the
                       basename of the executable.

    Returns the rebuilt command string: the executable, the existing options
    (possibly overwritten), the options only present in the anchor entry, and
    finally the --configKeyValues of the original command (re-emitted as
    parsed; anchor ConfigParams are not merged here).
    The (joined) input is returned unchanged if the command is empty or if no
    anchor entry exists for the executable.
    """
    # the command may be given as a list, in which case we transform it to a string
    if isinstance(cmd_str, list):
        cmd_str = " ".join(filter(None, cmd_str))

    base = parse_command_string(cmd_str)
    if not base:
        # empty command: parse_command_string returned {} and there is nothing to modify
        return cmd_str

    exe = base["executable"]
    existing_opts = base["options"]
    existing_kv = base["configKeyValues"]

    exe_basename = os.path.basename(exe) if configname is None else configname
    anchor_exec = None
    if "Executables" in config_anchor:
        anchor_exec = config_anchor["Executables"].get(exe_basename, None)
    if anchor_exec is None:
        # try without the Executable section
        anchor_exec = config_anchor.get(exe_basename, None)

    if anchor_exec is None:
        print(f"[WARN] No anchor config found for {exe}")
        return cmd_str

    anchor_opts = anchor_exec.get("filtered", {})

    def quote_if_needed(val):
        # values containing spaces must be quoted to survive re-tokenization
        s = str(val)
        if " " in s and not (s.startswith('"') and s.endswith('"')):
            return f'"{s}"'
        return s

    def render(key, val):
        # a value of True marks a bare flag without argument
        return f"{key}" if val is True else f"{key} {quote_if_needed(val)}"

    new_cmd = [exe]
    added = []
    overwritten = []

    # Step 1: Existing options (preserved or overwritten)
    for key, val in existing_opts.items():
        if allow_overwrite and key in anchor_opts:
            val = anchor_opts[key]
            overwritten.append(key)
        new_cmd.append(render(key, val))

    # Step 2: Add missing options from anchor
    for key, val in anchor_opts.items():
        if key not in existing_opts:
            new_cmd.append(render(key, val))
            added.append(key)

    # Step 3: Re-emit the configKeyValues of the original command unchanged
    if existing_kv:
        kv_flat = [f"{g}.{k}={v}" for g, kv in existing_kv.items() for k, v in kv.items()]
        new_cmd.append('--configKeyValues "' + ";".join(kv_flat) + '"')

    # log changes
    log_line(logger, f"\n[Executable: {exe}]")
    if added:
        log_line(logger, f" Added options: {added}")
    if overwritten:
        log_line(logger, f" Overwritten options: {overwritten}")
    if not added and not overwritten:
        log_line(logger, f" No changes made to command.")

    return " ".join(new_cmd)
241+
242+
# CLI: Parse log + blacklist into output.json
243+
def parse_configKeyValues(raw_value):
    """Alias of parse_configKeyValues_block(): returns its (params, groups) tuple."""
    params, groups = parse_configKeyValues_block(raw_value)
    return params, groups
245+
246+
def parse_workflow_config(log_path):
    """
    Read a workflow log with one DPL command per line and collect its configuration.

    Blank lines and lines starting with "#" are skipped. Every other line is
    parsed with parse_command_string().

    Returns a tuple (config_params, executables):
      config_params: defaultdict {group: {key: value}} accumulated over all lines
      executables:   {executable: {"configKeyValues": [groups], "full": {options}}};
                     a later line for the same executable replaces the earlier one.
    """
    params_by_group = defaultdict(dict)
    per_executable = {}

    with open(log_path) as log_file:
        for raw_line in log_file:
            command = raw_line.strip()
            if command == "" or command.startswith("#"):
                continue

            info = parse_command_string(command)
            exe_name = info["executable"]
            groups_used = info["configKeyGroups"]
            options = info["options"]

            for group_name, group_values in info["configKeyValues"].items():
                params_by_group[group_name].update(group_values)

            per_executable[exe_name] = {
                "configKeyValues": sorted(groups_used),
                "full": options,
            }

    return params_by_group, per_executable
270+
271+
272+
def apply_blacklist(executables, blacklist_cfg):
    """
    Split the options of every executable into kept and blacklisted ones.

    For each entry of executables (modified in place) the "full" options are
    checked against BUILTIN_BLACKLIST plus the list found under the
    executable's name in blacklist_cfg. The entry receives:
      "blacklisted": sorted list of the removed option names
      "filtered":    dict of the remaining options
      "full":        a deep copy of the original options

    Returns the same executables dict.
    """
    for exe_name, entry in executables.items():
        options = entry["full"]
        forbidden = BUILTIN_BLACKLIST | set(blacklist_cfg.get(exe_name, []))

        entry["blacklisted"] = sorted(opt for opt in options if opt in forbidden)
        entry["filtered"] = {opt: val for opt, val in options.items() if opt not in forbidden}
        entry["full"] = deepcopy(options)  # keep original

    return executables
291+
292+
def dpl_option_from_config(config, dpl_workflow, key, section = "filtered", default_value = None):
    """
    Utility to extract a DPL option for workflow dpl_workflow from
    the configuration dict "config". The configuration is:
    - either a flattish JSON produced by older tool parse-async-WorkflowConfig.py
    - more structured version produced by o2dpg_dpl_config_tools (this tool)

    Parameters:
      config:        the configuration dict
      dpl_workflow:  name of the workflow/executable
      key:           option name to look up
      section:       sub-dict of the executable entry to read (structured
                     format only)
      default_value: returned when the option cannot be found

    Lookup order for the flat format: "<dpl_workflow>-options" first, then the
    bare "<dpl_workflow>" key.
    """
    if "Executables" in config:
        # new standard
        return config["Executables"].get(dpl_workflow, {}).get(section, {}).get(key, default_value)

    # backward compatible versions: the first matching top-level key decides
    for legacy_key in (dpl_workflow + '-options', dpl_workflow):
        if legacy_key in config:
            return config[legacy_key].get(key, default_value)
    return default_value
311+
312+
def main():
    """
    Command-line entry point.

    Expects exactly three arguments: the workflow log to parse, a JSON file
    with per-executable blacklists, and the output path. Writes a JSON
    document with the sections "ConfigParams" and "Executables". With any
    other number of arguments a usage message is printed instead.
    """
    if len(sys.argv) != 4:
        print("Usage:")
        print(" CLI parsing: python3 dpl_config_tools.py workflowconfig.log blacklist.json output.json")
        print(" Python usage: import and call parse_command_string() or modify_dpl_command()")
        return

    log_path, blacklist_path, output_path = sys.argv[1:4]

    with open(blacklist_path) as blacklist_file:
        blacklist_data = json.load(blacklist_file)

    config_params, executables = parse_workflow_config(log_path)
    executables = apply_blacklist(executables, blacklist_data)

    structured = {
        "ConfigParams": dict(config_params),
        "Executables": executables,
    }

    with open(output_path, "w") as out:
        json.dump(structured, out, indent=2)

    print(f"[INFO] Wrote structured config to: {output_path}")
337+
338+
339+
class TaskFinalizer:
    """
    Callable that injects anchor options into task commands and records the result.

    Each call rewrites one command via modify_dpl_command() and merges the
    configuration parsed back from the final command into self.final_config,
    which can be written out with dump_collected_config().
    """

    def __init__(self, anchor_config, allow_overwrite=False, logger=None):
        """
        anchor_config:   dict whose "Executables" section holds the per-executable anchor entries
        allow_overwrite: forwarded to modify_dpl_command() on every call
        logger:          destination handed to modify_dpl_command() / log_line()
        """
        self.anchor_config = anchor_config
        self.allow_overwrite = allow_overwrite
        self.logger = logger
        # accumulated configuration of all finalized commands
        self.final_config = {
            "ConfigParams": {},
            "Executables": {}
        }

    def __call__(self, cmd_str_or_list, configname = None):
        """
        Finalize one command (string or list of fragments) and return the
        resulting command string. configname, if given, replaces the
        executable basename as lookup/record key.
        """
        final_cmd = modify_dpl_command(
            cmd_str_or_list,
            self.anchor_config.get("Executables", {}),
            # honour the setting given at construction (previously stored but ignored)
            allow_overwrite=self.allow_overwrite,
            logger=self.logger,
            configname=configname,
        )
        this_config_final = parse_command_string_symmetric(final_cmd, configname)
        # report through the configured logger; with logger=None this still prints to stdout
        log_line(self.logger, str(this_config_final))
        merge_dicts(self.final_config, this_config_final)
        return final_cmd

    def dump_collected_config(self, path):
        """Write the configuration accumulated so far to path as indented JSON."""
        with open(path, "w") as f:
            json.dump(self.final_config, f, indent=2)
359+
360+
361+
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)