Skip to content

Commit 22cc6ac

Browse files
authored
Support for 2tag anchoredMC (#1924)
With this commit, we achieve support for 2tag operation in anchoredMC.
1 parent 03eb3cc commit 22cc6ac

File tree

6 files changed

+72
-15
lines changed

6 files changed

+72
-15
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,44 @@ def update_resource_estimates(workflow, resource_json):
470470
# a python dictionary
471471
def get_alienv_software_environment(packagestring):
472472
"""
473-
packagestring is something like O2::v202298081-1,O2Physics::xxx
473+
packagestring is something like O2::v202298081-1,O2Physics::xxx representing packages
474+
published on CVMFS ... or ... a file containing directly the software environment to apply
474475
"""
476+
477+
# the trivial cases do nothing
478+
if packagestring == None or packagestring == "" or packagestring == "None":
479+
return {}
480+
481+
def load_env_file(env_file):
482+
"""Transform an environment file generated with 'export > env.txt' into a python dictionary."""
483+
env_vars = {}
484+
with open(env_file, "r") as f:
485+
for line in f:
486+
line = line.strip()
487+
488+
# Ignore empty lines or comments
489+
if not line or line.startswith("#"):
490+
continue
491+
492+
# Remove 'declare -x ' if present
493+
if line.startswith("declare -x "):
494+
line = line.replace("declare -x ", "", 1)
495+
496+
# Handle case: "FOO" without "=" (assign empty string)
497+
if "=" not in line:
498+
key, value = line.strip(), ""
499+
else:
500+
key, value = line.split("=", 1)
501+
value = value.strip('"') # Remove surrounding quotes if present
502+
503+
env_vars[key.strip()] = value
504+
return env_vars
505+
506+
# see if this is a file
507+
if os.path.exists(packagestring) and os.path.isfile(packagestring):
508+
actionlogger.info("Taking software environment from file " + packagestring)
509+
return load_env_file(packagestring)
510+
475511
# alienv printenv packagestring --> dictionary
476512
# for the moment this works with CVMFS only
477513
cmd="/cvmfs/alice.cern.ch/bin/alienv printenv " + packagestring
@@ -1089,19 +1125,28 @@ def submit(self, tid, nice):
10891125
return subprocess.Popen(['/bin/bash','-c',drycommand], cwd=workdir)
10901126

10911127
taskenv = os.environ.copy()
1092-
# add task specific environment
1093-
if self.workflowspec['stages'][tid].get('env')!=None:
1094-
taskenv.update(self.workflowspec['stages'][tid]['env'])
1095-
10961128
# apply specific (non-default) software version, if any
10971129
# (this was setup earlier)
10981130
alternative_env = self.alternative_envs.get(tid, None)
1099-
if alternative_env != None:
1131+
if alternative_env != None and len(alternative_env) > 0:
11001132
actionlogger.info('Applying alternative software environment to task ' + self.idtotask[tid])
1101-
for entry in alternative_env:
1133+
if alternative_env.get('TERM') != None:
1134+
# the environment is a complete environment
1135+
taskenv = {}
1136+
taskenv = alternative_env
1137+
else:
1138+
for entry in alternative_env:
11021139
# overwrite what is present in default
11031140
taskenv[entry] = alternative_env[entry]
11041141

1142+
# add task specific environment
1143+
if self.workflowspec['stages'][tid].get('env')!=None:
1144+
taskenv.update(self.workflowspec['stages'][tid]['env'])
1145+
1146+
# envfilename = "taskenv_" + str(tid) + ".json"
1147+
# with open(envfilename, "w") as file:
1148+
# json.dump(taskenv, file, indent=2)
1149+
11051150
p = psutil.Popen(['/bin/bash','-c',c], cwd=workdir, env=taskenv)
11061151
try:
11071152
p.nice(nice)
@@ -1406,7 +1451,7 @@ def get_tar_command(dir='./', flags='cf', findtype='f', filename='checkpoint.tar
14061451

14071452
def init_alternative_software_environments(self):
14081453
"""
1409-
Initiatialises alternative software environments for specific tasks, if there
1454+
Initialises alternative software environments for specific tasks, if there
14101455
is an annotation in the workflow specificiation.
14111456
"""
14121457

MC/bin/o2dpg_sim_config.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def parse_dpl_help_output(executable, envfile):
180180

181181
# the DEVNULL is important for o2-dpl workflows not to hang on non-interactive missing tty environments
182182
# it is cleaner that the echo | trick
183-
output = subprocess.check_output([executable, "--help", "full"], env=env, text=True, stdin=subprocess.DEVNULL, timeout = 10)
183+
output = subprocess.check_output([executable, "--help", "full"], env=env, text=True, stdin=subprocess.DEVNULL, timeout = 100)
184184
except subprocess.CalledProcessError:
185185
return {}, {}
186186

@@ -211,6 +211,5 @@ def get_dpl_options_for_executable(executable, envfile):
211211

212212
def option_if_available(executable, option, envfile = None):
213213
"""Checks if an option is available for a given executable and returns it as a string. Otherwise empty string"""
214-
# _, inverse_lookup = get_dpl_options_for_executable(executable, envfile)
215-
# return ' ' + option if option in inverse_lookup else ''
216-
return option
214+
_, inverse_lookup = get_dpl_options_for_executable(executable, envfile)
215+
return ' ' + option if option in inverse_lookup else ''

MC/bin/o2dpg_sim_workflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
16051605
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
16061606
# produce MonaLisa event stat file
16071607
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
1608+
AOD_merge_task['alternative_alienv_package'] = "None" # we want latest software for this step
16081609
workflow['stages'].append(AOD_merge_task)
16091610

16101611
job_merging = False

MC/bin/o2dpg_sim_workflow_anchored.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import json
1515
import math
1616
import pandas as pd
17+
import subprocess
18+
import shlex
1719

1820
# Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc)
1921

@@ -542,7 +544,14 @@ def main():
542544
print ("TIMESTAMP IS EXCLUDED IN RUN")
543545
else:
544546
print ("Creating time-anchored workflow...")
545-
os.system(cmd)
547+
print ("Executing: " + cmd)
548+
# os.system(cmd)
549+
try:
550+
cmd_list = shlex.split(os.path.expandvars(cmd))
551+
output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120)
552+
except subprocess.CalledProcessError:
553+
return {}, {}
554+
546555

547556
if __name__ == "__main__":
548557
sys.exit(main())

MC/bin/o2dpg_workflow_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ def matches_or_inherits_label(taskid, label, cache):
307307
for taskid in range(len(workflowspec['stages'])):
308308
if (matches_or_inherits_label(taskid, "RECO", matches_label)):
309309
# now we do the final adjust (as annotation) in the workflow itself
310-
workflowspec['stages'][taskid]["alternative_alienv_package"] = package
310+
if workflowspec['stages'][taskid].get("alternative_alienv_package") == None:
311+
workflowspec['stages'][taskid]["alternative_alienv_package"] = package
311312

312313
def merge_dicts(dict1, dict2):
313314
"""

MC/run/ANCHOR/anchorMC.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ remainingargs="${remainingargs} -productionTag ${ALIEN_JDL_LPMPRODUCTIONTAG:-ali
282282
# since the last passed argument wins, e.g. -productionTag cannot be overwritten by the user
283283
remainingargs="${ALIEN_JDL_ANCHOR_SIM_OPTIONS} ${remainingargs} --anchor-config config-json.json"
284284
# apply software tagging choice
285-
remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}"
285+
# remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}"
286+
remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${PWD}/env_async.env}"
287+
286288

287289
echo_info "baseargs passed to o2dpg_sim_workflow_anchored.py: ${baseargs}"
288290
echo_info "remainingargs forwarded to o2dpg_sim_workflow.py: ${remainingargs}"

0 commit comments

Comments
 (0)