Skip to content

Commit 60de896

Browse files
committed
Publish MCProdInfo
This commit provides code that publishes MonteCarlo MetaData describing a production. The information is stored into CCDB, into folders specific to the executing user as well as the LPM production tag. We now also write the username into AO2D.root, so that clients can retrieve the username next to the LPM production tag. By default upload of MCProdInfo is attempted for the first few JobIDs within a production. This is decided based on the split-id.
1 parent 6fe590d commit 60de896

File tree

5 files changed

+220
-3
lines changed

5 files changed

+220
-3
lines changed

MC/bin/o2dpg_sim_workflow.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg, option_if_available, overwrite_config
4646
from o2dpg_dpl_config_tools import parse_command_string, modify_dpl_command, dpl_option_from_config, TaskFinalizer
4747

48+
# for some JAliEn interaction
49+
from alienpy.alien import JAlien
50+
4851
parser = argparse.ArgumentParser(description='Create an ALICE (Run3) MC simulation workflow')
4952

5053
# the run-number of data taking or default if unanchored
@@ -1581,6 +1584,18 @@ def getDigiTaskName(det):
15811584

15821585
aod_df_id = '{0:03}'.format(tf)
15831586

1587+
import os
1588+
aod_creator = os.getenv("JALIEN_USER")
1589+
if aod_creator == None:
1590+
# we use JAliEn to determine the user and capture it's output into a variable via redirect_stdout
1591+
import io
1592+
from contextlib import redirect_stdout
1593+
f = io.StringIO()
1594+
with redirect_stdout(f):
1595+
if JAlien(['whoami']) == 0:
1596+
aod_creator = f.getvalue().strip()
1597+
print (f"Determined GRID username {aod_creator}")
1598+
15841599
AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1')
15851600
AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding]
15861601
AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; '
@@ -1596,6 +1611,7 @@ def getDigiTaskName(det):
15961611
"--lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}",
15971612
"--anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}",
15981613
"--anchor-prod ${ALIEN_JDL_LPMANCHORPRODUCTION:-unknown}",
1614+
f"--created-by {aod_creator}",
15991615
"--combine-source-devices" if not args.no_combine_dpl_devices else "",
16001616
"--disable-mc" if args.no_mc_labels else "",
16011617
"--enable-truncation 0" if environ.get("O2DPG_AOD_NOTRUNCATE") or environ.get("ALIEN_JDL_O2DPG_AOD_NOTRUNCATE") else "",

MC/bin/o2dpg_sim_workflow_anchored.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717
import subprocess
1818
import shlex
1919

20+
# hack to find the script for meta upload
21+
o2dpg_root = os.environ.get("O2DPG_ROOT")
22+
if o2dpg_root is None:
23+
raise EnvironmentError("O2DPG_ROOT is not set in the environment.")
24+
mc_prodinfo_path = os.path.abspath(os.path.join(o2dpg_root, "MC", "prodinfo"))
25+
sys.path.append(mc_prodinfo_path)
26+
from mcprodinfo_ccdb_upload import MCProdInfo, upload_mcprodinfo_meta, query_mcprodinfo
27+
import dataclasses
28+
2029
# Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc)
2130

2231
# Example:
@@ -417,6 +426,16 @@ def parse_file(filename):
417426
print(f"This run as globally {total_excluded_fraction} of it's data marked to be exluded")
418427
return excluded
419428

429+
def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False):
430+
print("Publishing MCProdInfo")
431+
432+
# see if this already has meta-data uploaded, otherwise do nothing
433+
mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
434+
if mc_prod_info_q == None:
435+
# could make this depend on hash values in future
436+
upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info))
437+
438+
420439
def main():
421440
parser = argparse.ArgumentParser(description='Creates an O2DPG simulation workflow, anchored to a given LHC run. The workflows are time anchored at regular positions within a run as a function of production size, split-id and cycle.')
422441

@@ -431,6 +450,7 @@ def main():
431450
parser.add_argument("--run-time-span-file", type=str, dest="run_span_file", help="Run-time-span-file for exclusions of timestamps (bad data periods etc.)", default="")
432451
parser.add_argument("--invert-irframe-selection", action='store_true', help="Inverts the logic of --run-time-span-file")
433452
parser.add_argument("--orbitsPerTF", type=str, help="Force a certain orbits-per-timeframe number; Automatically taken from CCDB if not given.", default="")
453+
parser.add_argument('--publish-mcprodinfo', action='store_true', default=False, help="Publish MCProdInfo metadata to CCDB")
434454
parser.add_argument('forward', nargs=argparse.REMAINDER) # forward args passed to actual workflow creation
435455
args = parser.parse_args()
436456
print (args)
@@ -547,11 +567,28 @@ def main():
547567
else:
548568
print ("Creating time-anchored workflow...")
549569
print ("Executing: " + cmd)
550-
# os.system(cmd)
551570
try:
552571
cmd_list = shlex.split(os.path.expandvars(cmd))
553572
output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120)
554573
print (output)
574+
575+
# when we get here, we can publish info about the production (optionally)
576+
if args.publish_mcprodinfo == True or os.getenv("PUBLISH_MCPRODINFO") != None:
577+
prod_tag = os.getenv("ALIEN_JDL_LPMPRODUCTIONTAG")
578+
grid_user_name = os.getenv("JALIEN_USER")
579+
mcprod_ccdb_server = os.getenv("PUBLISH_MCPRODINFO_CCDBSERVER")
580+
if mcprod_ccdb_server == None:
581+
mcprod_ccdb_server = "https://alice-ccdb.cern.ch"
582+
if prod_tag != None and grid_user_name != None:
583+
info = MCProdInfo(LPMProductionTag = prod_tag,
584+
Col = ColSystem,
585+
IntRate =rate,
586+
RunNumber = args.run_number,
587+
OrbitsPerTF = GLOparams["OrbitsPerTF"])
588+
publish_MCProdInfo(info, username = grid_user_name, ccdb_url = mcprod_ccdb_server)
589+
else:
590+
print("No production tag or GRID user name known. Not publishing MCProdInfo")
591+
555592
except subprocess.CalledProcessError as e:
556593
print(f"Command failed with return code {e.returncode}")
557594
print("Output:")

MC/prodinfo/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
This directory contains scripts and function to collect, define and upload
2+
CCDB meta data objects for (official) MC productions.
3+
4+
This meta data can be queried in other stages, such as analysis, for the purpose of further data processing.
5+
6+
TODO:
7+
8+
- include cycle number in data
9+
- include software versions (2tag or not)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import json
2+
import os
3+
import requests
4+
import subprocess
5+
6+
import dataclasses # to define the MCProdInfo data layout and convert it to dict
7+
from dataclasses import dataclass, field, asdict, fields
8+
from typing import Optional
9+
import hashlib
10+
11+
@dataclass(frozen=True)
12+
class MCProdInfo:
13+
"""
14+
struct for MonteCarlo production info
15+
"""
16+
LPMProductionTag: str
17+
Col: int
18+
IntRate: float # only indicative of some interaction rate (could vary within the run)
19+
RunNumber: int
20+
OrbitsPerTF: int
21+
# max_events_per_tf: Optional[int] = -1
22+
Comment: Optional[str] = None
23+
Hash: Optional[str] = field(default=None)
24+
25+
def __post_init__(self):
26+
if self.Hash == None:
27+
# Hash only the meaningful fields
28+
data_to_hash = {
29+
k: v for k, v in asdict(self).items()
30+
if k != 'hash'
31+
}
32+
hash_str = hashlib.sha256(
33+
json.dumps(data_to_hash, sort_keys=True).encode()
34+
).hexdigest()
35+
object.__setattr__(self, 'hash', hash_str)
36+
37+
38+
import re
39+
40+
def extract_metadata_blocks_from_CCDB(text: str):
41+
blocks = []
42+
# Split on 'Metadata:\n' and iterate over each block
43+
sections = text.split('Metadata:\n')
44+
for section in sections[1:]: # skip the first chunk (before any Metadata:)
45+
metadata = {}
46+
for line in section.splitlines():
47+
if not line.strip(): # stop at first blank line
48+
break
49+
match = re.match(r'\s*(\w+)\s*=\s*(.+)', line)
50+
if match:
51+
key, val = match.groups()
52+
# Type conversion
53+
if val == "None":
54+
val = None
55+
elif val.isdigit() or (val.startswith('-') and val[1:].isdigit()):
56+
val = int(val)
57+
else:
58+
try:
59+
val = float(val)
60+
except ValueError:
61+
val = val.strip()
62+
metadata[key] = val
63+
if metadata:
64+
blocks.append(metadata)
65+
return blocks
66+
67+
68+
69+
def query_mcprodinfo(base_url, user, run_number, lpm_prod_tag, cert_dir="/tmp"):
70+
"""
71+
Queries MCProdInfo from CCDB. Returns object or None
72+
"""
73+
# check if the tokenfiles are there
74+
key_path = os.environ.get("JALIEN_TOKEN_KEY")
75+
cert_path = os.environ.get("JALIEN_TOKEN_CERT")
76+
if key_path == None and cert_path == None:
77+
uid = os.getuid()
78+
cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem")
79+
key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem")
80+
81+
# Build full URL
82+
user_path = 'Users/' + user[0] + '/' + user
83+
start = run_number
84+
stop = run_number + 1
85+
url = f"{base_url}/browse/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}"
86+
87+
response = requests.get(url, cert=(cert_path, key_path), verify=False)
88+
if response.status_code != 404:
89+
meta = extract_metadata_blocks_from_CCDB(response.content.decode('utf-8'))
90+
if (len(meta) > 0):
91+
def filter_known_fields(cls, data: dict) -> dict:
92+
valid_keys = {f.name for f in fields(cls)}
93+
return {k: v for k, v in data.items() if k in valid_keys}
94+
95+
clean_meta = filter_known_fields(MCProdInfo, meta[0])
96+
return MCProdInfo(**clean_meta)
97+
98+
return None
99+
100+
101+
def upload_mcprodinfo_meta(base_url, user, run_number, lpm_prod_tag, keys, cert_dir="/tmp"):
102+
"""
103+
Uploads an empty .dat file using client certificates.
104+
105+
Parameters:
106+
- base_url (str): The base HTTPS URL, e.g., "https://URL"
107+
- user (str): The uploader --> Determines location "Users/f/foo_bar/MCProdInfo/..."
108+
- keys (dict): Dictionary with meta information to upload, e.g., {"key1": "var1", "key2": "var2"}
109+
- cert_dir (str): Directory where the .pem files are located (default: /tmp)
110+
111+
Returns:
112+
- Response object from the POST request
113+
"""
114+
# Create an empty file
115+
empty_file = "empty.dat"
116+
with open(empty_file, "w") as f:
117+
f.write("0")
118+
119+
# Construct user ID-specific cert and key paths
120+
key_path = os.environ.get("JALIEN_TOKEN_KEY")
121+
cert_path = os.environ.get("JALIEN_TOKEN_CERT")
122+
if key_path == None and cert_path == None:
123+
uid = os.getuid()
124+
cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem")
125+
key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem")
126+
127+
# Build full URL
128+
query = "/".join(f"{k}={v}" for k, v in keys.items())
129+
user_path = 'Users/' + user[0] + '/' + user
130+
start = run_number
131+
stop = run_number + 1
132+
url = f"{base_url}/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}/{query}"
133+
134+
print (f"Full {url}")
135+
136+
# Prepare request
137+
with open(empty_file, 'rb') as f:
138+
files = {'blob': f}
139+
response = requests.post(url, files=files, cert=(cert_path, key_path), verify=False)
140+
141+
# Optional: remove the temporary file
142+
os.remove(empty_file)
143+
144+
return response

MC/run/ANCHOR/anchorMC.sh

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ SEED=${ALIEN_PROC_ID:-${SEED:-1}}
162162
ONCVMFS=0
163163

164164
if [ "${ALIEN_JDL_O2DPG_OVERWRITE}" ]; then
165-
echo "Setting O2DPG_ROOT to overwritten path"
165+
echo "Setting O2DPG_ROOT to overwritten path ${ALIEN_JDL_O2DPG_OVERWRITE}"
166166
export O2DPG_ROOT=${ALIEN_JDL_O2DPG_OVERWRITE}
167167
fi
168168

@@ -287,10 +287,21 @@ MODULES="--skipModules ZDC"
287287
# Since this is used, set it explicitly
288288
ALICEO2_CCDB_LOCALCACHE=${ALICEO2_CCDB_LOCALCACHE:-$(pwd)/ccdb}
289289

290+
# publish MCPRODINFO for first few jobs of a production
291+
# if external script exported PUBLISH_MCPRODINFO, it will be published anyways
292+
if [ -z "$PUBLISH_MCPRODINFO" ] && [ "$SPLITID" -lt 20 ]; then
293+
PUBLISH_MCPRODINFO_OPTION="--publish-mcprodinfo"
294+
echo "Will publish MCProdInfo"
295+
export AOD_ADDITIONAL_METADATA_FILE="mc-prod-meta-file.json"
296+
297+
else
298+
echo "Will not publish MCProdInfo"
299+
fi
300+
290301
# these arguments will be digested by o2dpg_sim_workflow_anchored.py
291302
baseargs="-tf ${NTIMEFRAMES} --split-id ${SPLITID} --prod-split ${PRODSPLIT} --cycle ${CYCLE} --run-number ${ALIEN_JDL_LPMRUNNUMBER} \
292303
${ALIEN_JDL_RUN_TIME_SPAN_FILE:+--run-time-span-file ${ALIEN_JDL_RUN_TIME_SPAN_FILE} ${ALIEN_JDL_INVERT_IRFRAME_SELECTION:+--invert-irframe-selection}} \
293-
${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}}"
304+
${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}} ${PUBLISH_MCPRODINFO_OPTION}"
294305

295306
# these arguments will be passed as well but only eventually be digested by o2dpg_sim_workflow.py which is called from o2dpg_sim_workflow_anchored.py
296307
remainingargs="-seed ${SEED} -ns ${NSIGEVENTS} --include-local-qc --pregenCollContext"

0 commit comments

Comments
 (0)