Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,44 @@ def update_resource_estimates(workflow, resource_json):
# a python dictionary
def get_alienv_software_environment(packagestring):
"""
packagestring is something like O2::v202298081-1,O2Physics::xxx
packagestring is something like O2::v202298081-1,O2Physics::xxx representing packages
published on CVMFS ... or ... a file containing directly the software environment to apply
"""

# the trivial cases do nothing
if packagestring == None or packagestring == "" or packagestring == "None":
return {}

def load_env_file(env_file):
"""Transform an environment file generated with 'export > env.txt' into a python dictionary."""
env_vars = {}
with open(env_file, "r") as f:
for line in f:
line = line.strip()

# Ignore empty lines or comments
if not line or line.startswith("#"):
continue

# Remove 'declare -x ' if present
if line.startswith("declare -x "):
line = line.replace("declare -x ", "", 1)

# Handle case: "FOO" without "=" (assign empty string)
if "=" not in line:
key, value = line.strip(), ""
else:
key, value = line.split("=", 1)
value = value.strip('"') # Remove surrounding quotes if present

env_vars[key.strip()] = value
return env_vars

# see if this is a file
if os.path.exists(packagestring) and os.path.isfile(packagestring):
actionlogger.info("Taking software environment from file " + packagestring)
return load_env_file(packagestring)

# alienv printenv packagestring --> dictionary
# for the moment this works with CVMFS only
cmd="/cvmfs/alice.cern.ch/bin/alienv printenv " + packagestring
Expand Down Expand Up @@ -1089,19 +1125,28 @@ def submit(self, tid, nice):
return subprocess.Popen(['/bin/bash','-c',drycommand], cwd=workdir)

taskenv = os.environ.copy()
# add task specific environment
if self.workflowspec['stages'][tid].get('env')!=None:
taskenv.update(self.workflowspec['stages'][tid]['env'])

# apply specific (non-default) software version, if any
# (this was setup earlier)
alternative_env = self.alternative_envs.get(tid, None)
if alternative_env != None:
if alternative_env != None and len(alternative_env) > 0:
actionlogger.info('Applying alternative software environment to task ' + self.idtotask[tid])
for entry in alternative_env:
if alternative_env.get('TERM') != None:
# the environment is a complete environment
taskenv = {}
taskenv = alternative_env
else:
for entry in alternative_env:
# overwrite what is present in default
taskenv[entry] = alternative_env[entry]

# add task specific environment
if self.workflowspec['stages'][tid].get('env')!=None:
taskenv.update(self.workflowspec['stages'][tid]['env'])

# envfilename = "taskenv_" + str(tid) + ".json"
# with open(envfilename, "w") as file:
# json.dump(taskenv, file, indent=2)

p = psutil.Popen(['/bin/bash','-c',c], cwd=workdir, env=taskenv)
try:
p.nice(nice)
Expand Down Expand Up @@ -1406,7 +1451,7 @@ def get_tar_command(dir='./', flags='cf', findtype='f', filename='checkpoint.tar

def init_alternative_software_environments(self):
"""
Initiatialises alternative software environments for specific tasks, if there
Initialises alternative software environments for specific tasks, if there
is an annotation in the workflow specificiation.
"""

Expand Down
7 changes: 3 additions & 4 deletions MC/bin/o2dpg_sim_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def parse_dpl_help_output(executable, envfile):

# the DEVNULL is important for o2-dpl workflows not to hang on non-interactive missing tty environments
# it is cleaner that the echo | trick
output = subprocess.check_output([executable, "--help", "full"], env=env, text=True, stdin=subprocess.DEVNULL, timeout = 10)
output = subprocess.check_output([executable, "--help", "full"], env=env, text=True, stdin=subprocess.DEVNULL, timeout = 100)
except subprocess.CalledProcessError:
return {}, {}

Expand Down Expand Up @@ -211,6 +211,5 @@ def get_dpl_options_for_executable(executable, envfile):

def option_if_available(executable, option, envfile = None):
"""Checks if an option is available for a given executable and returns it as a string. Otherwise empty string"""
# _, inverse_lookup = get_dpl_options_for_executable(executable, envfile)
# return ' ' + option if option in inverse_lookup else ''
return option
_, inverse_lookup = get_dpl_options_for_executable(executable, envfile)
return ' ' + option if option in inverse_lookup else ''
1 change: 1 addition & 0 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
# produce MonaLisa event stat file
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
AOD_merge_task['alternative_alienv_package'] = "None" # we want latest software for this step
workflow['stages'].append(AOD_merge_task)

job_merging = False
Expand Down
11 changes: 10 additions & 1 deletion MC/bin/o2dpg_sim_workflow_anchored.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import json
import math
import pandas as pd
import subprocess
import shlex

# Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc)

Expand Down Expand Up @@ -542,7 +544,14 @@ def main():
print ("TIMESTAMP IS EXCLUDED IN RUN")
else:
print ("Creating time-anchored workflow...")
os.system(cmd)
print ("Executing: " + cmd)
# os.system(cmd)
try:
cmd_list = shlex.split(os.path.expandvars(cmd))
output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120)
except subprocess.CalledProcessError:
return {}, {}


if __name__ == "__main__":
sys.exit(main())
3 changes: 2 additions & 1 deletion MC/bin/o2dpg_workflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ def matches_or_inherits_label(taskid, label, cache):
for taskid in range(len(workflowspec['stages'])):
if (matches_or_inherits_label(taskid, "RECO", matches_label)):
# now we do the final adjust (as annotation) in the workflow itself
workflowspec['stages'][taskid]["alternative_alienv_package"] = package
if workflowspec['stages'][taskid].get("alternative_alienv_package") == None:
workflowspec['stages'][taskid]["alternative_alienv_package"] = package

def merge_dicts(dict1, dict2):
"""
Expand Down
4 changes: 3 additions & 1 deletion MC/run/ANCHOR/anchorMC.sh
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ remainingargs="${remainingargs} -productionTag ${ALIEN_JDL_LPMPRODUCTIONTAG:-ali
# since the last passed argument wins, e.g. -productionTag cannot be overwritten by the user
remainingargs="${ALIEN_JDL_ANCHOR_SIM_OPTIONS} ${remainingargs} --anchor-config config-json.json"
# apply software tagging choice
remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}"
# remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}"
remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${PWD}/env_async.env}"


echo_info "baseargs passed to o2dpg_sim_workflow_anchored.py: ${baseargs}"
echo_info "remainingargs forwarded to o2dpg_sim_workflow.py: ${remainingargs}"
Expand Down