Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg, option_if_available, overwrite_config
from o2dpg_dpl_config_tools import parse_command_string, modify_dpl_command, dpl_option_from_config, TaskFinalizer

# for some JAliEn interaction
from alienpy.alien import JAlien

parser = argparse.ArgumentParser(description='Create an ALICE (Run3) MC simulation workflow')

# the run-number of data taking or default if unanchored
Expand Down Expand Up @@ -1581,6 +1584,18 @@ def getDigiTaskName(det):

aod_df_id = '{0:03}'.format(tf)

import os
aod_creator = os.getenv("JALIEN_USER")
if aod_creator == None:
# we use JAliEn to determine the user and capture it's output into a variable via redirect_stdout
import io
from contextlib import redirect_stdout
f = io.StringIO()
with redirect_stdout(f):
if JAlien(['whoami']) == 0:
aod_creator = f.getvalue().strip()
print (f"Determined GRID username {aod_creator}")

AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1')
AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding]
AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; '
Expand All @@ -1596,6 +1611,7 @@ def getDigiTaskName(det):
"--lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}",
"--anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}",
"--anchor-prod ${ALIEN_JDL_LPMANCHORPRODUCTION:-unknown}",
f"--created-by {aod_creator}",
"--combine-source-devices" if not args.no_combine_dpl_devices else "",
"--disable-mc" if args.no_mc_labels else "",
"--enable-truncation 0" if environ.get("O2DPG_AOD_NOTRUNCATE") or environ.get("ALIEN_JDL_O2DPG_AOD_NOTRUNCATE") else "",
Expand Down
39 changes: 38 additions & 1 deletion MC/bin/o2dpg_sim_workflow_anchored.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
import subprocess
import shlex

# hack to find the script for meta upload
o2dpg_root = os.environ.get("O2DPG_ROOT")
if o2dpg_root is None:
raise EnvironmentError("O2DPG_ROOT is not set in the environment.")
mc_prodinfo_path = os.path.abspath(os.path.join(o2dpg_root, "MC", "prodinfo"))
sys.path.append(mc_prodinfo_path)
from mcprodinfo_ccdb_upload import MCProdInfo, upload_mcprodinfo_meta, query_mcprodinfo
import dataclasses

# Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc)

# Example:
Expand Down Expand Up @@ -417,6 +426,16 @@ def parse_file(filename):
print(f"This run as globally {total_excluded_fraction} of it's data marked to be exluded")
return excluded

def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False):
    """
    Publish MCProdInfo meta data to CCDB, unless an entry for the same
    run number and production tag already exists there.

    Parameters:
    - mc_prod_info: MCProdInfo dataclass instance to publish
    - ccdb_url (str): CCDB server URL
    - username (str): GRID user name determining the CCDB upload path
    - include_meta_into_aod (bool): currently unused in this function -- TODO confirm intent
    """
    print("Publishing MCProdInfo")

    # see if this already has meta-data uploaded, otherwise do nothing
    mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
    if mc_prod_info_q is None:
        # could make this depend on hash values in future
        upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info))


def main():
parser = argparse.ArgumentParser(description='Creates an O2DPG simulation workflow, anchored to a given LHC run. The workflows are time anchored at regular positions within a run as a function of production size, split-id and cycle.')

Expand All @@ -431,6 +450,7 @@ def main():
parser.add_argument("--run-time-span-file", type=str, dest="run_span_file", help="Run-time-span-file for exclusions of timestamps (bad data periods etc.)", default="")
parser.add_argument("--invert-irframe-selection", action='store_true', help="Inverts the logic of --run-time-span-file")
parser.add_argument("--orbitsPerTF", type=str, help="Force a certain orbits-per-timeframe number; Automatically taken from CCDB if not given.", default="")
parser.add_argument('--publish-mcprodinfo', action='store_true', default=False, help="Publish MCProdInfo metadata to CCDB")
parser.add_argument('forward', nargs=argparse.REMAINDER) # forward args passed to actual workflow creation
args = parser.parse_args()
print (args)
Expand Down Expand Up @@ -547,11 +567,28 @@ def main():
else:
print ("Creating time-anchored workflow...")
print ("Executing: " + cmd)
# os.system(cmd)
try:
cmd_list = shlex.split(os.path.expandvars(cmd))
output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120)
print (output)

# when we get here, we can publish info about the production (optionally)
if args.publish_mcprodinfo == True or os.getenv("PUBLISH_MCPRODINFO") != None:
prod_tag = os.getenv("ALIEN_JDL_LPMPRODUCTIONTAG")
grid_user_name = os.getenv("JALIEN_USER")
mcprod_ccdb_server = os.getenv("PUBLISH_MCPRODINFO_CCDBSERVER")
if mcprod_ccdb_server == None:
mcprod_ccdb_server = "https://alice-ccdb.cern.ch"
if prod_tag != None and grid_user_name != None:
info = MCProdInfo(LPMProductionTag = prod_tag,
Col = ColSystem,
IntRate =rate,
RunNumber = args.run_number,
OrbitsPerTF = GLOparams["OrbitsPerTF"])
publish_MCProdInfo(info, username = grid_user_name, ccdb_url = mcprod_ccdb_server)
else:
print("No production tag or GRID user name known. Not publishing MCProdInfo")

except subprocess.CalledProcessError as e:
print(f"Command failed with return code {e.returncode}")
print("Output:")
Expand Down
9 changes: 9 additions & 0 deletions MC/prodinfo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
This directory contains scripts and functions to collect, define, and upload
CCDB metadata objects for (official) MC productions.

This metadata can be queried at other stages, such as analysis, for further data processing.

TODO:

- include cycle number in data
- include software versions (2tag or not)
144 changes: 144 additions & 0 deletions MC/prodinfo/mcprodinfo_ccdb_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import json
import os
import requests
import subprocess

import dataclasses # to define the MCProdInfo data layout and convert it to dict
from dataclasses import dataclass, field, asdict, fields
from typing import Optional
import hashlib

@dataclass(frozen=True)
class MCProdInfo:
    """
    Immutable record describing a MonteCarlo production.

    On construction, a SHA-256 digest over all meaningful fields is computed
    and stored in the `Hash` field, unless a hash value is supplied explicitly.
    """
    LPMProductionTag: str
    Col: int
    IntRate: float # only indicative of some interaction rate (could vary within the run)
    RunNumber: int
    OrbitsPerTF: int
    # max_events_per_tf: Optional[int] = -1
    Comment: Optional[str] = None
    Hash: Optional[str] = field(default=None)

    def __post_init__(self):
        if self.Hash is None:
            # Hash only the meaningful fields: exclude the (still empty)
            # 'Hash' field itself so the digest is reproducible.
            # (Original code filtered/assigned lowercase 'hash', which is not a
            # field of this dataclass, so self.Hash silently stayed None.)
            data_to_hash = {
                k: v for k, v in asdict(self).items()
                if k != 'Hash'
            }
            hash_str = hashlib.sha256(
                json.dumps(data_to_hash, sort_keys=True).encode()
            ).hexdigest()
            # frozen dataclass: must bypass the normal (disabled) __setattr__
            object.__setattr__(self, 'Hash', hash_str)


import re

def extract_metadata_blocks_from_CCDB(text: str):
    """
    Parse the plain-text CCDB browse output and return a list of dicts,
    one per 'Metadata:' block, with values coerced to None/int/float/str.
    """
    kv_pattern = re.compile(r'\s*(\w+)\s*=\s*(.+)')

    def _coerce(raw):
        # textual value encoding used by CCDB: None, integers, floats, strings
        if raw == "None":
            return None
        if raw.isdigit() or (raw.startswith('-') and raw[1:].isdigit()):
            return int(raw)
        try:
            return float(raw)
        except ValueError:
            return raw.strip()

    parsed_blocks = []
    # skip everything before the first 'Metadata:' marker
    for chunk in text.split('Metadata:\n')[1:]:
        entries = {}
        for row in chunk.splitlines():
            if not row.strip():
                # a blank line terminates the metadata block
                break
            hit = kv_pattern.match(row)
            if hit:
                entries[hit.group(1)] = _coerce(hit.group(2))
        if entries:
            parsed_blocks.append(entries)
    return parsed_blocks



def query_mcprodinfo(base_url, user, run_number, lpm_prod_tag, cert_dir="/tmp"):
    """
    Queries MCProdInfo from CCDB.

    Parameters:
    - base_url (str): CCDB server URL, e.g. "https://alice-ccdb.cern.ch"
    - user (str): GRID user name; determines the "Users/<initial>/<user>/..." path
    - run_number (int): searched validity interval is [run_number, run_number + 1)
    - lpm_prod_tag (str): LPM production tag (part of the CCDB path)
    - cert_dir (str): directory holding the JAliEn token .pem files (default: /tmp)

    Returns:
    - MCProdInfo instance reconstructed from the CCDB metadata, or None if not found
    """
    # token files from the environment, else the conventional per-uid files
    key_path = os.environ.get("JALIEN_TOKEN_KEY")
    cert_path = os.environ.get("JALIEN_TOKEN_CERT")
    # NOTE(review): falls back only when BOTH env vars are unset; a single
    # missing one leaves an unusable (path, None) cert pair -- confirm intended
    if key_path is None and cert_path is None:
        uid = os.getuid()
        cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem")
        key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem")

    # Build full browse URL for the user-specific MCProdInfo location
    user_path = 'Users/' + user[0] + '/' + user
    start = run_number
    stop = run_number + 1
    url = f"{base_url}/browse/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}"

    # NOTE(review): verify=False disables TLS certificate verification
    response = requests.get(url, cert=(cert_path, key_path), verify=False)
    if response.status_code != 404:
        meta = extract_metadata_blocks_from_CCDB(response.content.decode('utf-8'))
        if len(meta) > 0:
            # keep only keys that correspond to actual MCProdInfo fields
            valid_keys = {f.name for f in fields(MCProdInfo)}
            clean_meta = {k: v for k, v in meta[0].items() if k in valid_keys}
            return MCProdInfo(**clean_meta)

    return None


def upload_mcprodinfo_meta(base_url, user, run_number, lpm_prod_tag, keys, cert_dir="/tmp"):
    """
    Uploads an empty .dat file using client certificates.
    The actual payload is the meta information, encoded as key=value path
    segments in the upload URL.

    Parameters:
    - base_url (str): The base HTTPS URL, e.g., "https://URL"
    - user (str): The uploader --> Determines location "Users/f/foo_bar/MCProdInfo/..."
    - run_number (int): object validity interval is [run_number, run_number + 1)
    - lpm_prod_tag (str): LPM production tag (part of the CCDB path)
    - keys (dict): Dictionary with meta information to upload, e.g., {"key1": "var1", "key2": "var2"}
    - cert_dir (str): Directory where the .pem files are located (default: /tmp)

    Returns:
    - Response object from the POST request
    """
    # CCDB expects a file payload; its content is irrelevant here
    empty_file = "empty.dat"
    with open(empty_file, "w") as f:
        f.write("0")

    # token files from the environment, else the conventional per-uid files
    key_path = os.environ.get("JALIEN_TOKEN_KEY")
    cert_path = os.environ.get("JALIEN_TOKEN_CERT")
    if key_path is None and cert_path is None:
        uid = os.getuid()
        cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem")
        key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem")

    # Build full URL; meta data travels as key=value path segments
    query = "/".join(f"{k}={v}" for k, v in keys.items())
    user_path = 'Users/' + user[0] + '/' + user
    start = run_number
    stop = run_number + 1
    url = f"{base_url}/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}/{query}"

    print (f"Full {url}")

    try:
        # NOTE(review): verify=False disables TLS certificate verification
        with open(empty_file, 'rb') as f:
            files = {'blob': f}
            response = requests.post(url, files=files, cert=(cert_path, key_path), verify=False)
    finally:
        # remove the temporary file even if the POST raises
        os.remove(empty_file)

    return response
13 changes: 11 additions & 2 deletions MC/run/ANCHOR/anchorMC.sh
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ SEED=${ALIEN_PROC_ID:-${SEED:-1}}
ONCVMFS=0

if [ "${ALIEN_JDL_O2DPG_OVERWRITE}" ]; then
echo "Setting O2DPG_ROOT to overwritten path"
echo "Setting O2DPG_ROOT to overwritten path ${ALIEN_JDL_O2DPG_OVERWRITE}"
export O2DPG_ROOT=${ALIEN_JDL_O2DPG_OVERWRITE}
fi

Expand Down Expand Up @@ -287,10 +287,19 @@ MODULES="--skipModules ZDC"
# Since this is used, set it explicitly
ALICEO2_CCDB_LOCALCACHE=${ALICEO2_CCDB_LOCALCACHE:-$(pwd)/ccdb}

# publish MCPRODINFO for first few jobs of a production
# if external script exported PUBLISH_MCPRODINFO, it will be published anyways
if [ -z "$PUBLISH_MCPRODINFO" ] && [ "$SPLITID" -lt 20 ]; then
PUBLISH_MCPRODINFO_OPTION="--publish-mcprodinfo"
echo "Will publish MCProdInfo"
else
echo "Will not publish MCProdInfo"
fi

# these arguments will be digested by o2dpg_sim_workflow_anchored.py
baseargs="-tf ${NTIMEFRAMES} --split-id ${SPLITID} --prod-split ${PRODSPLIT} --cycle ${CYCLE} --run-number ${ALIEN_JDL_LPMRUNNUMBER} \
${ALIEN_JDL_RUN_TIME_SPAN_FILE:+--run-time-span-file ${ALIEN_JDL_RUN_TIME_SPAN_FILE} ${ALIEN_JDL_INVERT_IRFRAME_SELECTION:+--invert-irframe-selection}} \
${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}}"
${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}} ${PUBLISH_MCPRODINFO_OPTION}"

# these arguments will be passed as well but only eventually be digested by o2dpg_sim_workflow.py which is called from o2dpg_sim_workflow_anchored.py
remainingargs="-seed ${SEED} -ns ${NSIGEVENTS} --include-local-qc --pregenCollContext"
Expand Down