#!/usr/bin/env python3

# A Python tool that fills CCDB MCProdInfo by harvesting data from
# production log files on the GRID. This is useful when the information
# was not directly filled by the MC job itself.
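#
# Example invocation (assumes alien.py is in PATH and an AliEn token has
# been initialized; the script filename here is illustrative):
#   ./harvest_mcprodinfo.py --prod_tag prod2025a --year 2025 --username myuser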

import argparse
import json
import os
import re
import subprocess
from collections import defaultdict
from zipfile import ZipFile

from mcprodinfo_ccdb_upload import MCProdInfo, publish_MCProdInfo


def alien_find(path, pattern="*"):
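    """Return all GRID paths below `path` matching `pattern`."""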
    cmd = ["alien.py", "find", path, pattern]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]


def alien_cp(alien_path, local_path):
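    """Copy a single file from AliEn to the local filesystem."""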
    cmd = ["alien.py", "cp", f"alien://{alien_path}", f"file://{local_path}"]
    subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)


def parse_workflow_path(path, prod_tag):
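    """Extract (cycle, run_number, split) from an AliEn output path.

    Two directory layouts are handled:
      .../<prod_tag>/<cycle>/<run_number>/<split>/...  (cycle: single digit)
      .../<prod_tag>/<run_number>/<split>/...
    Returns None if the path follows neither layout.
    """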
    parts = path.strip("/").split("/")
    try:
        idx = parts.index(prod_tag)
    except ValueError:
        return None

    after = parts[idx + 1 :]
    if len(after) < 2:
        return None

    try:
        if after[0].isdigit() and len(after[0]) == 1:
            if len(after) < 3:
                return None
            cycle = int(after[0])
            run_number = int(after[1])
            split = after[2]
        else:
            cycle = None
            run_number = int(after[0])
            split = after[1]
    except ValueError:
        return None

    return cycle, run_number, split


def extract_from_zip(local_zip_path):
    """Extract workflow.json and selected environment variables
    (scraped from the job's stdout) from a log_archive.zip."""
    wf_data = None
    env_vars = {}
    try:
        with ZipFile(local_zip_path, "r") as zf:
            # workflow.json
            if "workflow.json" in zf.namelist():
                with zf.open("workflow.json") as wf_file:
                    wf_data = json.load(wf_file)

            # stdout (could be named stdout or stdout.log)
            candidates = [n for n in zf.namelist() if n.startswith("stdout")]
            if candidates:
                with zf.open(candidates[0]) as so:
                    text = so.read().decode(errors="ignore")
                    for key in [
                        "ALIEN_JDL_PACKAGES",
                        "ALIEN_JDL_O2DPG_ASYNC_RECO_TAG",
                        "ALIEN_MASTERJOB",
                    ]:
                        m = re.search(rf"{key}=(.*)", text)
                        if m:
                            env_vars[key] = m.group(1).strip()
    except Exception as e:
        print(f"⚠️ Failed to extract from {local_zip_path}: {e}")
    return wf_data, env_vars


def build_info(prod_tag, run_number, wf_data, env_vars):
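    """Assemble an MCProdInfo object from workflow metadata and job environment variables."""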
    meta = wf_data.get("meta") or {}
    int_rate = meta.get("interactionRate")
    col_system = meta.get("col")
    orbits_per_tf = meta.get("orbitsPerTF")

    return MCProdInfo(
        LPMProductionTag=prod_tag,
        Col=col_system,
        IntRate=int_rate,
        RunNumber=run_number,
        OrbitsPerTF=orbits_per_tf,
        McTag=env_vars.get("ALIEN_JDL_PACKAGES"),
        RecoTag=env_vars.get("ALIEN_JDL_O2DPG_ASYNC_RECO_TAG"),
    )


def pick_split(prod_tag, run_number, candidates, ascending=True):
    """Pick the first valid split (min if ascending, max if not)."""
    def split_key(entry):
        _, split, _ = entry
        try:
            return int(split)
        except ValueError:
            return float("inf")

    candidates_sorted = sorted(candidates, key=split_key, reverse=not ascending)

    for cycle, split, zip_path in candidates_sorted:
        print(f"Trying to analyse {zip_path}")
        local_zip = f"/tmp/log_archive_{run_number}_{cycle or 0}_{split}.zip"
        try:
            alien_cp(zip_path, local_zip)
        except subprocess.CalledProcessError:
            continue

        wf_data, env_vars = extract_from_zip(local_zip)

        try:
            os.remove(local_zip)  # clean up the downloaded file
        except OSError:
            pass

        if wf_data:
            info = build_info(prod_tag, run_number, wf_data, env_vars)
            return info, cycle, split, zip_path
        print(f"Failed to extract a workflow from {zip_path}")

    return None, None, None, None


def process_prod_tag(prod_tag, year="2025", ccdb_url=None, username=None):
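    """Harvest MCProdInfo for each run of a production and publish it to CCDB."""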
    base_path = f"/alice/sim/{year}/{prod_tag}"

    # Step 1: find all log_archive.zip files
    print("Querying AliEn for all directories with zip files")
    zip_files = alien_find(base_path, "log_archive.zip")

    # exclude paths that do not belong to the primary MC jobs
    excluded = ("/AOD/", "/QC/", "/TimeseriesTPCmerging/", "/Stage")
    zip_files = [zf for zf in zip_files if not any(token in zf for token in excluded)]

    # Step 2: group by run_number
    runs = defaultdict(list)
    for zf in zip_files:
        parsed = parse_workflow_path(zf, prod_tag)
        if parsed is None:
            continue
        cycle, run_number, split = parsed
        runs[run_number].append((cycle, split, zf))

    print(f"Found {len(runs)} run numbers")

    # Step 3: for each run_number, analyse the smallest and largest valid split
    for run_number, candidates in sorted(runs.items()):
        print(f"Analysing run {run_number}")
        info_min, cycle_min, split_min, _ = pick_split(prod_tag, run_number, candidates, ascending=True)
        info_max, cycle_max, split_max, _ = pick_split(prod_tag, run_number, candidates, ascending=False)

        # some consistency checks between the two splits
        if info_min and info_max:
            if info_min.Col != info_max.Col:
                print(f"❌ ColSystem mismatch for run {run_number}")
            if info_min.OrbitsPerTF != info_max.OrbitsPerTF:
                print(f"❌ OrbitsPerTF mismatch for run {run_number}")

        if info_min is None:
            print(f"❌ No usable log archive found for run {run_number}")
            continue

        publish_MCProdInfo(info_min, username=username, ccdb_url=ccdb_url)
        print(info_min)


def main():
    parser = argparse.ArgumentParser(
        description="Harvest MC production metadata from the AliEn GRID and publish to CCDB"
    )
    parser.add_argument("--prod_tag", required=True, help="Production tag (e.g. prod2025a)")
    parser.add_argument("--ccdb", required=False, default="https://alice-ccdb.cern.ch", help="CCDB server URL")
    parser.add_argument("--username", required=False, help="GRID username (needs appropriate AliEn token initialized)")
    parser.add_argument("--year", default="2025", help="Production year (default: 2025)")
    args = parser.parse_args()

    process_prod_tag(args.prod_tag, year=args.year, ccdb_url=args.ccdb, username=args.username)


if __name__ == "__main__":
    main()