10 changes: 1 addition & 9 deletions MC/bin/o2dpg_sim_workflow_anchored.py
@@ -23,7 +23,7 @@
    raise EnvironmentError("O2DPG_ROOT is not set in the environment.")
mc_prodinfo_path = os.path.abspath(os.path.join(o2dpg_root, "MC", "prodinfo"))
sys.path.append(mc_prodinfo_path)
-from mcprodinfo_ccdb_upload import MCProdInfo, upload_mcprodinfo_meta, query_mcprodinfo
+from mcprodinfo_ccdb_upload import MCProdInfo, publish_MCProdInfo
import dataclasses

# Creates a time-anchored MC workflow, positioned within a given run number (as a function of production size etc.)
@@ -450,14 +450,6 @@ def parse_file(filename):
print(f"This run as globally {total_excluded_fraction} of it's data marked to be exluded")
return excluded

-def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False):
-    print("Publishing MCProdInfo")
-
-    # see if this already has meta-data uploaded, otherwise do nothing
-    mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
-    if mc_prod_info_q == None:
-        # could make this depend on hash values in future
-        upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info))


def main():
11 changes: 11 additions & 0 deletions MC/prodinfo/mcprodinfo_ccdb_upload.py
@@ -20,6 +20,8 @@ class MCProdInfo:
    OrbitsPerTF: int
    # max_events_per_tf: Optional[int] = -1
    Comment: Optional[str] = None
+    McTag: Optional[str] = None    # main software tag used
+    RecoTag: Optional[str] = None  # reconstruction software tag (if any)
    Hash: Optional[str] = field(default=None)

    def __post_init__(self):
@@ -142,3 +144,12 @@ def upload_mcprodinfo_meta(base_url, user, run_number, lpm_prod_tag, keys, cert_
    os.remove(empty_file)

    return response
+
+def publish_MCProdInfo(mc_prod_info, ccdb_url="https://alice-ccdb.cern.ch", username="aliprod", include_meta_into_aod=False):
+    print("Publishing MCProdInfo")
+
+    # see if this already has meta-data uploaded, otherwise do nothing
+    mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
+    if mc_prod_info_q is None:
+        # could make this depend on hash values in the future
+        upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info))
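+
+# Minimal usage sketch (all values below are illustrative, not from a real production):
+#   info = MCProdInfo(LPMProductionTag="LHC25a1", Col="pp", IntRate=500000,
+#                     RunNumber=123456, OrbitsPerTF=32)
+#   publish_MCProdInfo(info)   # uploads only if no meta-data exists yet for this run/tag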
189 changes: 189 additions & 0 deletions MC/prodinfo/mcprodinfo_harvester.py
@@ -0,0 +1,189 @@
#!/usr/bin/env python3

# A Python tool that fills the CCDB MCProdInfo by harvesting
# data from production log files on the GRID.
# This is useful when the information was not filled directly
# by the MC job itself.
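
# Minimal usage sketch (the production tag is a made-up example):
#   python3 mcprodinfo_harvester.py --prod_tag LHC25a1 --year 2025 \
#       --ccdb https://alice-ccdb.cern.ch --username aliprod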

import json
import subprocess
import sys
from collections import defaultdict
from zipfile import ZipFile
import re
import os
import argparse

from mcprodinfo_ccdb_upload import MCProdInfo, publish_MCProdInfo


def alien_find(path, pattern="*"):
cmd = ["alien.py", "find", path, pattern]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return [line.strip() for line in result.stdout.splitlines() if line.strip()]


def alien_cp(alien_path, local_path):
cmd = ["alien.py", "cp", f"alien://{alien_path}", f"file://{local_path}"]
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
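
# Both helpers shell out to the alien.py CLI; the commands built above are, e.g.:
#   alien.py find /alice/sim/<year>/<prod_tag> log_archive.zip
#   alien.py cp alien://<grid_path> file://<local_path>
# (placeholders shown instead of concrete GRID paths)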


def parse_workflow_path(path, prod_tag):
parts = path.strip("/").split("/")
try:
idx = parts.index(prod_tag)
except ValueError:
return None

after = parts[idx + 1 :]
if len(after) < 2:
return None

    try:
        if after[0].isdigit() and len(after[0]) == 1:
            # a single-digit directory right after the tag is a production cycle
            cycle = int(after[0])
            run_number = int(after[1])
            split = after[2]
        else:
            cycle = None
            run_number = int(after[0])
            split = after[1]
    except (ValueError, IndexError):
        # unexpected path layout (non-numeric run number or missing split)
        return None

    return cycle, run_number, split
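
# Sketch of the two path layouts parse_workflow_path accepts (inferred from
# the parsing above, not confirmed examples):
#   .../<prod_tag>/<run_number>/<split>/...          -> (None, run_number, split)
#   .../<prod_tag>/<cycle>/<run_number>/<split>/...  -> (cycle, run_number, split)
# where <cycle> is a single-digit directory.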


def extract_from_zip(local_zip_path):
"""Extract workflow.json and stdout from log_archive.zip."""
wf_data = None
env_vars = {}
try:
with ZipFile(local_zip_path, "r") as zf:
# workflow.json
if "workflow.json" in zf.namelist():
with zf.open("workflow.json") as wf_file:
wf_data = json.load(wf_file)

# stdout (could be named stdout or stdout.log)
candidates = [n for n in zf.namelist() if n.startswith("stdout")]
if candidates:
with zf.open(candidates[0]) as so:
text = so.read().decode(errors="ignore")
for key in [
"ALIEN_JDL_PACKAGES",
"ALIEN_JDL_O2DPG_ASYNC_RECO_TAG",
"ALIEN_MASTERJOB",
]:
m = re.search(rf"{key}=(.*)", text)
if m:
env_vars[key] = m.group(1).strip()
except Exception as e:
print(f"⚠️ Failed to extract from {local_zip_path}: {e}")
return wf_data, env_vars


def build_info(prod_tag, run_number, wf_data, env_vars):
    # guard against a missing "meta" section so the .get() calls below are safe
    meta = wf_data.get("meta") or {}
    int_rate = meta.get("interactionRate")
    col_system = meta.get("col")
    orbits_per_tf = meta.get("orbitsPerTF")

    return MCProdInfo(
        LPMProductionTag=prod_tag,
        Col=col_system,
        IntRate=int_rate,
        RunNumber=run_number,
        OrbitsPerTF=orbits_per_tf,
        McTag=env_vars.get("ALIEN_JDL_PACKAGES"),
        RecoTag=env_vars.get("ALIEN_JDL_O2DPG_ASYNC_RECO_TAG"),
    )
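
# The slice of workflow.json that build_info relies on looks roughly like this
# (key names taken from the .get() calls above; the values are made up):
#   {"meta": {"interactionRate": 500000, "col": "pp", "orbitsPerTF": 32}}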


def pick_split(prod_tag, run_number, candidates, ascending=True):
"""Pick the first valid split (min if ascending, max if not)."""
def split_key(entry):
_, split, _ = entry
try:
return int(split)
except ValueError:
return float("inf")

candidates_sorted = sorted(candidates, key=split_key, reverse=not ascending)

for cycle, split, zip_path in candidates_sorted:
print (f"Trying to analyse {zip_path}")
local_zip = f"/tmp/log_archive_{run_number}_{cycle or 0}_{split}.zip"
try:
alien_cp(zip_path, local_zip)
except subprocess.CalledProcessError:
continue

wf_data, env_vars = extract_from_zip(local_zip)

try:
os.remove(local_zip) # cleanup downloaded file
except OSError:
pass

if wf_data:
info = build_info(prod_tag, run_number, wf_data, env_vars)
return info, cycle, split, zip_path
print (f"Failed")

return None, None, None, None


def process_prod_tag(prod_tag, year="2025", ccdb_url=None, username=None):
base_path = f"/alice/sim/{year}/{prod_tag}"

# Step 1: find all log_archive.zip files
print (f"Querying AliEn for all directories with zip files")
zip_files = alien_find(base_path, "log_archive.zip")

# exclude some unnecessary paths
zip_files = [
zf for zf in zip_files
if "/AOD/" not in zf and "/QC/" not in zf and "/TimeseriesTPCmerging/" not in zf and "/Stage" not in zf
]

# Step 2: group by run_number
runs = defaultdict(list)
for zf in zip_files:
parsed = parse_workflow_path(zf, prod_tag)
if parsed is None:
continue
cycle, run_number, split = parsed
runs[run_number].append((cycle, split, zf))

print(f"Found {len(runs)} run numbers")

# Step 3: for each run_number, handle smallest and largest valid split
for run_number, candidates in sorted(runs.items()):
print (f"Analysing run {run_number}")
info_min, cycle_min, split_min, _ = pick_split(prod_tag, run_number, candidates, ascending=True)
info_max, cycle_max, split_max, _ = pick_split(prod_tag, run_number, candidates, ascending=False)

# some consistency checks
if info_min and info_max:
if info_min.Col != info_max.Col:
print(f"❌ ColSystem mismatch for run {run_number}")
if info_min.OrbitsPerTF != info_max.OrbitsPerTF:
print(f"❌ OrbitsPerTF mismatch for run {run_number}")

        # skip runs where no valid split could be analysed
        if info_min:
            publish_MCProdInfo(info_min, username=username, ccdb_url=ccdb_url)
            print(info_min)


def main():
parser = argparse.ArgumentParser(
description="Harvest MC production metadata from AlienGRID and publish to CCDB"
)
parser.add_argument("--prod_tag", required=True, help="Production tag (e.g. prod2025a)")
parser.add_argument("--ccdb", required=False, default="https://alice-ccdb.cern.ch", help="CCDB server URL")
parser.add_argument("--username", required=False, help="GRID username (needs appropriate AliEn token initialized)")
parser.add_argument("--year", default="2025", help="Production year (default: 2025)")
args = parser.parse_args()

process_prod_tag(args.prod_tag, year=args.year, ccdb_url=args.ccdb, username=args.username)

if __name__ == "__main__":
main()