Skip to content

Commit 039c587

Browse files
committed
Improvements for MCProdInfo; MCProdInfo harvester tool
A tool which can extract MCProdInfo from logs on the GRID. Can be used to push MCProdInfo info retroactively to the CCDB. Slight extension of MCProdInfo to contain software tags used.
1 parent fef755c commit 039c587

File tree

3 files changed

+201
-9
lines changed

3 files changed

+201
-9
lines changed

MC/bin/o2dpg_sim_workflow_anchored.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
raise EnvironmentError("O2DPG_ROOT is not set in the environment.")
2424
mc_prodinfo_path = os.path.abspath(os.path.join(o2dpg_root, "MC", "prodinfo"))
2525
sys.path.append(mc_prodinfo_path)
26-
from mcprodinfo_ccdb_upload import MCProdInfo, upload_mcprodinfo_meta, query_mcprodinfo
26+
from mcprodinfo_ccdb_upload import MCProdInfo, publish_MCProdInfo
2727
import dataclasses
2828

2929
# Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc)
@@ -450,14 +450,6 @@ def parse_file(filename):
450450
print(f"This run as globally {total_excluded_fraction} of it's data marked to be exluded")
451451
return excluded
452452

453-
def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False):
454-
print("Publishing MCProdInfo")
455-
456-
# see if this already has meta-data uploaded, otherwise do nothing
457-
mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
458-
if mc_prod_info_q == None:
459-
# could make this depend on hash values in future
460-
upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info))
461453

462454

463455
def main():

MC/prodinfo/mcprodinfo_ccdb_upload.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ class MCProdInfo:
2020
OrbitsPerTF: int
2121
# max_events_per_tf: Optional[int] = -1
2222
Comment: Optional[str] = None
23+
McTag: Optional[str] = None # main software tag used
24+
RecoTag: Optional[str] = None # RecoTag (if any)
2325
Hash: Optional[str] = field(default=None)
2426

2527
def __post_init__(self):
@@ -142,3 +144,12 @@ def upload_mcprodinfo_meta(base_url, user, run_number, lpm_prod_tag, keys, cert_
142144
os.remove(empty_file)
143145

144146
return response
147+
148+
def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False):
    """Publish an MCProdInfo record to CCDB, unless one is already there.

    Queries CCDB for an existing record for (RunNumber, LPMProductionTag);
    only when nothing is found is the record uploaded, so re-running is a
    no-op for already-published productions.

    :param mc_prod_info: MCProdInfo dataclass instance to publish
    :param ccdb_url: base URL of the CCDB server
    :param username: GRID user used for the upload (needs a valid token)
    :param include_meta_into_aod: currently unused placeholder flag
    """
    print("Publishing MCProdInfo")

    # see if this already has meta-data uploaded, otherwise do nothing
    mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
    # fixed: compare to None with "is" (identity), not "==" (PEP 8)
    if mc_prod_info_q is None:
        # could make this depend on hash values in future
        upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info))
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
#!/usr/bin/env python3
2+
3+
# A python tool, that fills CCDB MCProdInfo by
4+
# harvesting the data from production log files from the GRID.
5+
# This is useful, when the information was not directly filled
6+
# by the MC job itself.
7+
8+
import json
9+
import subprocess
10+
import sys
11+
from collections import defaultdict
12+
from zipfile import ZipFile
13+
import re
14+
import os
15+
import argparse
16+
17+
from mcprodinfo_ccdb_upload import MCProdInfo, publish_MCProdInfo
18+
19+
20+
def alien_find(path, pattern="*"):
    """Run `alien.py find` and return the matching paths as a list.

    Empty lines in the command output are dropped; all entries are
    whitespace-stripped. Raises CalledProcessError when alien.py fails.
    """
    proc = subprocess.run(
        ["alien.py", "find", path, pattern],
        capture_output=True,
        text=True,
        check=True,
    )
    return [entry for entry in map(str.strip, proc.stdout.splitlines()) if entry]
24+
25+
26+
def alien_cp(alien_path, local_path):
    """Copy a single file from AliEn to local disk via `alien.py cp`.

    All command output is discarded; a non-zero exit status raises
    CalledProcessError (check=True).
    """
    copy_cmd = [
        "alien.py",
        "cp",
        f"alien://{alien_path}",
        f"file://{local_path}",
    ]
    subprocess.run(copy_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
29+
30+
31+
def parse_workflow_path(path, prod_tag):
    """Decode (cycle, run_number, split) from an AliEn production path.

    Two layouts are recognized after the production tag:
      .../<prod_tag>/<cycle>/<run_number>/<split>/...  (cycle is a single digit)
      .../<prod_tag>/<run_number>/<split>/...          (cycle reported as None)

    Returns the tuple (cycle, run_number, split) with run_number as int,
    or None when the path does not match either layout.
    """
    parts = path.strip("/").split("/")
    try:
        idx = parts.index(prod_tag)
    except ValueError:
        # production tag not present in this path
        return None

    after = parts[idx + 1 :]
    if len(after) < 2:
        return None

    try:
        if after[0].isdigit() and len(after[0]) == 1:
            # fixed: the cycle layout needs at least cycle/run/split;
            # previously a 2-element tail raised IndexError on after[2]
            if len(after) < 3:
                return None
            cycle = int(after[0])
            run_number = int(after[1])
            split = after[2]
        else:
            cycle = None
            run_number = int(after[0])
            split = after[1]
    except ValueError:
        # fixed: non-numeric run-number component (e.g. a stray directory)
        # now yields None instead of an uncaught ValueError
        return None

    return cycle, run_number, split
52+
53+
54+
def extract_from_zip(local_zip_path):
    """Extract workflow.json and stdout from log_archive.zip."""
    workflow = None
    environment = {}
    wanted_keys = (
        "ALIEN_JDL_PACKAGES",
        "ALIEN_JDL_O2DPG_ASYNC_RECO_TAG",
        "ALIEN_MASTERJOB",
    )
    try:
        with ZipFile(local_zip_path, "r") as archive:
            members = archive.namelist()

            # workflow.json
            if "workflow.json" in members:
                with archive.open("workflow.json") as handle:
                    workflow = json.load(handle)

            # stdout (could be named stdout or stdout.log)
            stdout_members = [name for name in members if name.startswith("stdout")]
            if stdout_members:
                with archive.open(stdout_members[0]) as handle:
                    content = handle.read().decode(errors="ignore")
                for key in wanted_keys:
                    match = re.search(rf"{key}=(.*)", content)
                    if match:
                        environment[key] = match.group(1).strip()
    except Exception as e:
        print(f"⚠️ Failed to extract from {local_zip_path}: {e}")
    return workflow, environment
81+
82+
83+
def build_info(prod_tag, run_number, wf_data, env_vars):
    """Assemble an MCProdInfo record from workflow metadata and env vars.

    :param prod_tag: LPM production tag
    :param run_number: run number (int)
    :param wf_data: parsed workflow.json dict (may lack a "meta" section)
    :param env_vars: dict of ALIEN_JDL_* values harvested from stdout
    """
    # fixed: previously the meta fields were only assigned inside an
    # "if meta != None" branch but used unconditionally afterwards, so a
    # workflow.json without a "meta" section raised NameError. Defaulting
    # to an empty dict makes the missing fields simply come out as None.
    meta = wf_data.get("meta") or {}

    return MCProdInfo(
        LPMProductionTag=prod_tag,
        Col=meta.get("col"),
        IntRate=meta.get("interactionRate"),
        RunNumber=run_number,
        OrbitsPerTF=meta.get("orbitsPerTF"),
        McTag=env_vars.get("ALIEN_JDL_PACKAGES"),
        RecoTag=env_vars.get("ALIEN_JDL_O2DPG_ASYNC_RECO_TAG")
    )
99+
100+
101+
def pick_split(prod_tag, run_number, candidates, ascending=True):
    """Pick the first valid split (min if ascending, max if not)."""

    def numeric_split(entry):
        _, split, _ = entry
        try:
            return int(split)
        except ValueError:
            # non-numeric split identifiers sort to the far end
            return float("inf")

    ordered = sorted(candidates, key=numeric_split, reverse=not ascending)

    for cycle, split, zip_path in ordered:
        print(f"Trying to analyse {zip_path}")
        local_zip = f"/tmp/log_archive_{run_number}_{cycle or 0}_{split}.zip"

        try:
            alien_cp(zip_path, local_zip)
        except subprocess.CalledProcessError:
            # download failed; move on to the next candidate split
            continue

        wf_data, env_vars = extract_from_zip(local_zip)

        try:
            os.remove(local_zip)  # cleanup downloaded file
        except OSError:
            pass

        if wf_data:
            return build_info(prod_tag, run_number, wf_data, env_vars), cycle, split, zip_path

    # no candidate produced a usable workflow.json
    print(f"Failed")
    return None, None, None, None
133+
134+
135+
def process_prod_tag(prod_tag, year="2025", ccdb_url=None, username=None):
    """Harvest MCProdInfo for every run of one production tag and publish it.

    Walks /alice/sim/<year>/<prod_tag> on AliEn, groups log archives by run
    number, extracts metadata from the smallest and largest splits (as a
    consistency cross-check) and publishes the result to CCDB.

    :param prod_tag: LPM production tag to harvest
    :param year: production year used in the AliEn base path
    :param ccdb_url: CCDB server URL (publish default used when falsy)
    :param username: GRID username (publish default used when falsy)
    """
    base_path = f"/alice/sim/{year}/{prod_tag}"

    # Step 1: find all log_archive.zip files
    print(f"Querying AliEn for all directories with zip files")
    zip_files = alien_find(base_path, "log_archive.zip")

    # exclude some unnecessary paths
    zip_files = [
        zf for zf in zip_files
        if "/AOD/" not in zf and "/QC/" not in zf and "/TimeseriesTPCmerging/" not in zf and "/Stage" not in zf
    ]

    # Step 2: group by run_number
    runs = defaultdict(list)
    for zf in zip_files:
        parsed = parse_workflow_path(zf, prod_tag)
        if parsed is None:
            continue
        cycle, run_number, split = parsed
        runs[run_number].append((cycle, split, zf))

    print(f"Found {len(runs)} run numbers")

    # Step 3: for each run_number, handle smallest and largest valid split
    for run_number, candidates in sorted(runs.items()):
        print(f"Analysing run {run_number}")
        info_min, cycle_min, split_min, _ = pick_split(prod_tag, run_number, candidates, ascending=True)
        info_max, cycle_max, split_max, _ = pick_split(prod_tag, run_number, candidates, ascending=False)

        # some consistency checks
        if info_min and info_max:
            if info_min.Col != info_max.Col:
                print(f"❌ ColSystem mismatch for run {run_number}")
            if info_min.OrbitsPerTF != info_max.OrbitsPerTF:
                print(f"❌ OrbitsPerTF mismatch for run {run_number}")

        # fixed: when every candidate split failed, info_min is None and
        # publish_MCProdInfo would crash on mc_prod_info.RunNumber
        if info_min is None:
            print(f"❌ No usable log archive found for run {run_number}; skipping")
            continue

        # fixed: passing username=None / ccdb_url=None explicitly would
        # override publish_MCProdInfo's own defaults; fall back to them
        publish_MCProdInfo(info_min,
                           username=username if username else "aliprod",
                           ccdb_url=ccdb_url if ccdb_url else "https://alice-ccdb.cern.ch")
        print(info_min)
174+
175+
176+
def main():
    """Command-line entry point for the MCProdInfo harvester."""
    arg_parser = argparse.ArgumentParser(
        description="Harvest MC production metadata from AlienGRID and publish to CCDB"
    )
    arg_parser.add_argument("--prod_tag", required=True,
                            help="Production tag (e.g. prod2025a)")
    arg_parser.add_argument("--ccdb", required=False,
                            default="https://alice-ccdb.cern.ch",
                            help="CCDB server URL")
    arg_parser.add_argument("--username", required=False,
                            help="GRID username (needs appropriate AliEn token initialized)")
    arg_parser.add_argument("--year", default="2025",
                            help="Production year (default: 2025)")
    cli = arg_parser.parse_args()

    process_prod_tag(cli.prod_tag, year=cli.year, ccdb_url=cli.ccdb, username=cli.username)


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)