Skip to content

Commit 23cf1ea

Browse files
committed
Script to extract/upload BcTVX from EvtSelQA
1 parent c81c648 commit 23cf1ea

File tree

1 file changed

+312
-0
lines changed

1 file changed

+312
-0
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
#!/usr/bin/env python3
2+
"""
3+
PyROOT pipeline to:
4+
- parse file paths like ./LHC25as/cpass0/568664/AnalysisResults.root
5+
- choose highest-priority pass per run
6+
- extract histogram event-selection-qa-task/hBcTVX
7+
- hash histogram to prevent duplicates
8+
- write upload ROOT file
9+
- upload using o2-ccdb-upload (or optionally call o2::CcdbApi from C++)
10+
11+
12+
Usage:
13+
python3 upload_pipeline.py --file-list files.txt
14+
OR
15+
python3 upload_pipeline.py ./LHC25as/*/*/*/AnalysisResults.root
16+
"""
17+
18+
import os
19+
import sys
20+
import json
21+
import argparse
22+
import hashlib
23+
import tempfile
24+
import subprocess
25+
from collections import defaultdict
26+
27+
# PyROOT import
28+
import ROOT
29+
from ROOT import o2 # for O2 access
30+
31+
# -------- user config ----------
# Reconstruction-pass priority: earlier in the list -> higher priority.
# Used to pick the single best pass when a run appears under several passes;
# names not in this list rank below all of these (see pass_priority_rank).
PASS_PRIORITY = ["apass6", "apass5", "apass4", "apass3", "apass2", "apass1", "cpass0"]

# Path of the target histogram inside each AnalysisResults.root file.
HIST_PATH = "event-selection-qa-task/hBcTVX"

# Local JSON file storing hashes of already-processed histograms, to avoid
# uploading duplicates across invocations of this script.
PROCESSED_HASH_DB = "processed_hashes.json"
40+
41+
def getRunInformation(runnumber):
    """Fetch start/end-of-run timestamps for *runnumber* via O2's run registry.

    Returns a dict with keys "SOR" and "EOR" taken from the aggregated run
    info provided by the CCDB-backed BasicCCDBManager.
    """
    ccdb_manager = o2.ccdb.BasicCCDBManager.instance()
    info = o2.parameters.AggregatedRunInfo.buildAggregatedRunInfo(ccdb_manager, runnumber)
    return {"SOR": info.sor, "EOR": info.eor}
45+
46+
47+
def make_ccdb_upload_command(localfile, passname, runnumber, sor, eor, key="ccdb_object",
                             ccdb_path="GLO/CALIB/EVSELQA/HBCTVX"):
    """Build the argv list for an o2-ccdb-upload invocation.

    Args:
        localfile: path of the ROOT file to upload.
        passname: reconstruction pass name, stored in the object metadata.
        runnumber: run number, stored in the object metadata.
        sor, eor: start/end-of-run timestamps defining the validity interval.
        key: name under which the object is stored in CCDB.
        ccdb_path: destination path inside CCDB. Previously this was
            hardcoded (despite a "will be filled per-run" comment), so the
            per-run path computed by callers was silently ignored; it is now
            a parameter with the old value as backward-compatible default.

    Returns:
        list[str]: command + arguments, suitable for subprocess.run (no
        shell quoting needed since it is passed as a list).
    """
    return [
        "o2-ccdb-upload",
        "--host", "http://ccdb-test.cern.ch:8080",  # <-- adapt to your CCDB server
        "--path", f"{ccdb_path}",
        "--file", f"{localfile}",
        "-k", f"{key}",
        "-m", f"run_number={runnumber};pass={passname}",  # no extra quotes here (only needed on shell)
        "--starttimestamp", f"{sor}",
        "--endtimestamp", f"{eor}",
    ]
59+
60+
# -------------------------------
61+
def load_processed_db(path):
    """Load the processed-hashes JSON database from *path*.

    Returns an empty database ({"hashes": []}) when the file does not exist.
    """
    if not os.path.exists(path):
        return {"hashes": []}
    with open(path, "r") as fh:
        return json.load(fh)
67+
68+
69+
def save_processed_db(path, db):
    """Persist *db* to *path* as indented JSON, overwriting any existing file."""
    payload = json.dumps(db, indent=2)
    with open(path, "w") as fh:
        fh.write(payload)
72+
73+
74+
def parse_path_meta(filepath):
    """Locate a */<period>/<pass>/<run>/AnalysisResults.root pattern in *filepath*.

    Example accepted paths:
        ./LHC25as/cpass0/568664/AnalysisResults.root
        /tmp/foo/2023/LHC23zzh/cpass0/544095/AnalysisResults.root

    Returns:
        dict with keys "period", "pass", "run" (all strings).

    Raises:
        ValueError: no AnalysisResults.root component, too few leading
            directories, or a non-numeric run directory.
    """
    components = os.path.normpath(filepath).split(os.sep)

    # Prefer an exact component match; fall back to a case-insensitive scan
    # (e.g. analysisresults.root) only when the exact spelling is absent.
    try:
        idx = components.index("AnalysisResults.root")
    except ValueError:
        idx = next(
            (i for i, comp in enumerate(components)
             if comp.lower() == "analysisresults.root"),
            None,
        )
        if idx is None:
            raise ValueError(f"File does not contain AnalysisResults.root: {filepath}")

    # period, pass and run must all sit directly in front of the file name
    if idx < 3:
        raise ValueError(f"Cannot extract period/pass/run from short path: {filepath}")

    period, passname, run = components[idx - 3:idx]

    # Sanity check: the run directory must be a plain number
    if not run.isdigit():
        raise ValueError(f"Run number is not numeric: '{run}' in path {filepath}")

    return {"period": period, "pass": passname, "run": run}
113+
114+
115+
def pass_priority_rank(pass_name):
    """Return the rank of *pass_name* in PASS_PRIORITY (0 = highest priority).

    Pass names not present in PASS_PRIORITY rank below every known pass
    (they get rank len(PASS_PRIORITY)).
    """
    if pass_name in PASS_PRIORITY:
        return PASS_PRIORITY.index(pass_name)
    # unknown pass name -> lowest priority
    return len(PASS_PRIORITY)
121+
122+
123+
def pick_best_pass_file(files_for_run):
    """Pick the highest-priority entry from *files_for_run*.

    files_for_run: list of dicts with keys {pass, path, period}.
    Returns the dict whose pass ranks best (lowest pass_priority_rank),
    or None for an empty list. Ties keep the first occurrence, matching
    the original stable-sort behavior.
    """
    if not files_for_run:
        return None
    return min(files_for_run, key=lambda entry: pass_priority_rank(entry["pass"]))
131+
132+
133+
def histogram_hash(hist):
    """Compute a deterministic SHA-256 fingerprint of a TH1's content.

    The digest mixes in the x-axis binning (nbins, xmin, xmax), the
    histogram name and title, and every bin's content and error --
    including the under- and overflow bins. Returns the hex digest string.
    """
    nbins = hist.GetNbinsX()
    axis = hist.GetXaxis()
    digest = hashlib.sha256()
    header = f"{nbins}|{axis.GetXmin()}|{axis.GetXmax()}|{hist.GetName()}|{hist.GetTitle()}"
    digest.update(header.encode("utf-8"))
    # bins 0 and nbins+1 are the under-/overflow bins
    for b in range(nbins + 2):
        content = float(hist.GetBinContent(b))
        error = float(hist.GetBinError(b))
        digest.update(f"{b}:{content:.17g}:{error:.17g};".encode("utf-8"))
    return digest.hexdigest()
152+
153+
154+
def extract_histogram_from_file(root_path, hist_path):
    """Open *root_path* and return a detached clone of the TH1 at *hist_path*.

    Raises:
        IOError: the file cannot be opened.
        KeyError: no object exists at hist_path.
        TypeError: the object at hist_path is not a TH1.

    The returned clone has SetDirectory(0), so it survives closing the file.
    Fix over the original: the file is now closed in a finally block, so it
    is not leaked when Clone()/SetDirectory() raise; this also removes the
    duplicated f.Close() calls on each error path.
    """
    f = ROOT.TFile.Open(root_path, "READ")
    if not f or f.IsZombie():
        raise IOError(f"Cannot open file {root_path}")
    try:
        obj = f.Get(hist_path)
        if not obj:
            raise KeyError(f"Histogram {hist_path} not found in {root_path}")
        if not isinstance(obj, ROOT.TH1):
            raise TypeError(f"Object at {hist_path} is not a TH1 (found {type(obj)}) in {root_path}")
        # clone to decouple from the file, then let finally close it
        clone = obj.Clone(obj.GetName())
        clone.SetDirectory(0)
        return clone
    finally:
        f.Close()
173+
174+
175+
def write_upload_root(hist, meta, outpath):
    """Write *hist* plus JSON metadata into a fresh ROOT file for uploading.

    Args:
        hist: TH1 to store (a clone is written, the input is untouched).
        meta: dict of metadata (period, pass, run, runinfo, hash); serialized
            to JSON and stored as a TObjString under the key "metadata".
        outpath: output ROOT file path (recreated/overwritten).

    Fix over the original: the output file is now closed in a finally block,
    so it is not leaked half-written-and-open when Clone()/Write() raise.
    """
    f = ROOT.TFile(outpath, "RECREATE")
    try:
        f.cd()
        # clone into the output file so Write() stores it there
        hist_copy = hist.Clone(hist.GetName())
        hist_copy.SetDirectory(f)
        hist_copy.Write()
        # metadata travels alongside the histogram as a JSON TObjString
        sobj = ROOT.TObjString(json.dumps(meta))
        sobj.Write("metadata")
    finally:
        f.Close()
191+
192+
193+
def upload_ccdb_via_cli(upload_file, ccdb_path, passname, runnumber, sor, eor):
    """
    Upload *upload_file* to CCDB by invoking the o2-ccdb-upload CLI.

    Builds the command via make_ccdb_upload_command (key fixed to "hBcTVX"),
    runs it, and prints stdout/stderr either way.

    Returns:
        True when the CLI exits with code 0, False otherwise (failures are
        printed, not raised, so the caller can continue with other runs).

    NOTE(review): the ccdb_path argument is currently unused -- the upload
    goes to the path hardcoded inside make_ccdb_upload_command; confirm
    whether the per-run path computed by the caller should be forwarded.
    """
    cmd = make_ccdb_upload_command(upload_file, passname, runnumber, sor, eor, key="hBcTVX")
    print("Running upload command:", " ".join(cmd))
    # capture both streams as text so success/failure can be reported in one message
    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if res.returncode != 0:
        # raise RuntimeError(f"o2-ccdb-upload failed: {res.returncode}\nstdout:{res.stdout}\nstderr:{res.stderr}")
        print (f"o2-ccdb-upload failed: {res.returncode}\nstdout:{res.stdout}\nstderr:{res.stderr}")
        return False

    print (f"o2-ccdb-upload succeeded: {res.returncode}\nstdout:{res.stdout}\nstderr:{res.stderr}")
    return True
208+
209+
210+
def main(argv):
    """Drive the extract-and-upload pipeline.

    Steps: collect input files (from --file-list and/or positional globs),
    group them by run, pick the highest-priority pass per run, extract the
    event-selection-QA histogram, skip content already seen (SHA-256 hash
    database), write a per-run upload ROOT file, and optionally upload it
    via o2-ccdb-upload.

    Args:
        argv: command-line arguments (without the program name).

    Returns:
        Process exit code: 0 on completion, 1 when no input files were given.
    """
    parser = argparse.ArgumentParser(description="Extract histogram from AnalysisResults.root and upload to CCDB")
    parser.add_argument("--file-list", help="Text file with one file path per line (or '-')", default=None)
    parser.add_argument("paths", nargs="*", help="globs or paths to AnalysisResults.root files")
    parser.add_argument("--skip-upload", action="store_true", help="Only create upload ROOT files, do not call o2-ccdb-upload")
    parser.add_argument("--out-dir", default="ccdb_uploads", help="Where to put temporary upload ROOT files")
    parser.add_argument("--processed-db", default=PROCESSED_HASH_DB, help="JSON file to keep processed-hashes")
    parser.add_argument("--ccdb-base-path", default="/calibration/hBcTVX", help="Base path inside CCDB where to upload")
    args = parser.parse_args(argv)

    # ---- collect candidate files
    file_paths = []
    if args.file_list:
        if args.file_list == "-":
            lines = sys.stdin.read().splitlines()
        else:
            with open(args.file_list, "r") as f:
                lines = [ln.strip() for ln in f if ln.strip()]
        file_paths.extend(lines)
    if args.paths:
        # expand globs (local import: only needed on this path)
        import glob
        for p in args.paths:
            file_paths.extend(sorted(glob.glob(p)))
    if not file_paths:
        print("No files provided. Exiting.")
        return 1

    # ---- group candidate files by run number
    runs = defaultdict(list)
    for p in file_paths:
        try:
            meta = parse_path_meta(p)
        except Exception as e:
            print(f"Skipping {p}: cannot parse path: {e}")
            continue
        runs[meta["run"]].append({"path": p, "pass": meta["pass"], "period": meta["period"]})

    # ---- load dedup database of already-processed histogram hashes
    db = load_processed_db(args.processed_db)
    processed_hashes = set(db.get("hashes", []))

    os.makedirs(args.out_dir, exist_ok=True)

    for run, filelist in runs.items():
        selected = pick_best_pass_file(filelist)
        if not selected:
            print(f"No candidate for run {run}, skipping.")
            continue
        path = selected["path"]
        period = selected["period"]
        pass_name = selected["pass"]
        print(f"Selected for run {run}: {path} (period={period}, pass={pass_name})")

        try:
            hist = extract_histogram_from_file(path, HIST_PATH)
        except Exception as e:
            print(f"Failed to extract histogram from {path}: {e}")
            continue

        # skip identical content seen in a previous invocation
        hsh = histogram_hash(hist)
        if hsh in processed_hashes:
            print(f"Histogram hash {hsh} for run {run} already processed -> skipping upload.")
            continue

        # SOR/EOR timestamps define the CCDB validity interval
        runinfo = getRunInformation(int(run))

        # metadata stored inside the upload file (see write_upload_root)
        meta = {
            "period": period,
            "pass": pass_name,
            "run": run,
            "runinfo": runinfo,
            "hist_name": hist.GetName(),
            "hist_title": hist.GetTitle(),
            "hash": hsh
        }

        # write temporary upload file
        out_fname = os.path.join(args.out_dir, f"upload_{period}_{pass_name}_{run}.root")
        write_upload_root(hist, meta, out_fname)
        print(f"Wrote upload file: {out_fname}")

        # perform upload (with --skip-upload, writing the file counts as done)
        uploaded_ok = True
        if not args.skip_upload:
            # build ccdb path (customize to your conventions)
            ccdb_path = os.path.join(args.ccdb_base_path, period, pass_name, run)
            uploaded_ok = upload_ccdb_via_cli(out_fname, ccdb_path, pass_name, run,
                                              runinfo["SOR"], runinfo["EOR"])

        # BUGFIX: mark as processed only after a successful upload (or when
        # uploads are deliberately skipped). The original ignored the return
        # value of upload_ccdb_via_cli and recorded failed uploads as
        # processed, so they were never retried on a re-run.
        if uploaded_ok:
            processed_hashes.add(hsh)
            db["hashes"] = list(processed_hashes)
            save_processed_db(args.processed_db, db)
            print(f"Marked hash {hsh} as processed.")

    print("Done.")
    return 0
309+
310+
311+
# Entry point: forward CLI arguments (without the program name) and
# propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))

0 commit comments

Comments
 (0)