Skip to content

Commit 15049b0

Browse files
authored
Basic implementation of event pooling in O2DPG workflow (#1760)
* Allow to generate events for event-pool usage (no vertex applied + kinematic merging) * Example script demonstrating simple event pool creation and reading events from pool https://its.cern.ch/jira/browse/O2-5216
1 parent f26dfd2 commit 15049b0

File tree

3 files changed

+140
-23
lines changed

3 files changed

+140
-23
lines changed

MC/bin/o2dpg_sim_workflow.py

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import importlib.util
2222
import argparse
2323
from os import environ, mkdir
24-
from os.path import join, dirname, isdir, isabs
24+
from os.path import join, dirname, isdir, isabs, isfile
2525
import random
2626
import json
2727
import itertools
@@ -61,6 +61,7 @@
6161
parser.add_argument('-ini',help='generator init parameters file (full paths required), for example: ${O2DPG_ROOT}/MC/config/PWGHF/ini/GeneratorHF.ini', default='')
6262
parser.add_argument('-confKey',help='generator or trigger configuration key values, for example: "GeneratorPythia8.config=pythia8.cfg;A.x=y"', default='')
6363
parser.add_argument('--readoutDets',help='comma separated string of detectors readout (does not modify material budget - only hit creation)', default='all')
64+
parser.add_argument('--make-evtpool', help='Generate workflow for event pool creation.', action='store_true')
6465

6566
parser.add_argument('-interactionRate',help='Interaction rate, used in digitization', default=-1)
6667
parser.add_argument('-bcPatternFile',help='Bunch crossing pattern file, used in digitization (a file name or "ccdb")', default='')
@@ -699,9 +700,14 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
699700
# possible generator)
700701

701702
workflow['stages'].append(SGN_CONFIG_task)
703+
704+
# default flags for extkinO2 signal simulation (no transport)
705+
extkinO2Config = ''
706+
if GENERATOR == 'extkinO2':
707+
extkinO2Config = ';GeneratorFromO2Kine.randomize=true;GeneratorFromO2Kine.rngseed=' + str(TFSEED)
702708

703709
# determine final conf key for signal simulation
704-
CONFKEY = constructConfigKeyArg(create_geant_config(args, args.confKey))
710+
CONFKEY = constructConfigKeyArg(create_geant_config(args, args.confKey + extkinO2Config))
705711
# -----------------
706712
# transport signals
707713
# -----------------
@@ -741,7 +747,11 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
741747
# determine the skip number
742748
cmd = 'export HEPMCEVENTSKIP=$(${O2DPG_ROOT}/UTILS/ReadHepMCEventSkip.sh ../HepMCEventSkip.json ' + str(tf) + ');'
743749
SGNGENtask['cmd'] = cmd
744-
SGNGENtask['cmd'] +='${O2_ROOT}/bin/o2-sim --noGeant -j 1 --field ccdb --vertexMode kCCDB' \
750+
751+
# No vertexing for event pool generation
752+
vtxmode = 'kNoVertex' if args.make_evtpool else 'kCCDB'
753+
754+
SGNGENtask['cmd'] +='${O2_ROOT}/bin/o2-sim --noGeant -j 1 --field ccdb --vertexMode ' + vtxmode \
745755
+ ' --run ' + str(args.run) + ' ' + str(CONFKEY) + str(TRIGGER) \
746756
+ ' -g ' + str(GENERATOR) + ' ' + str(INIFILE) + ' -o genevents ' + embeddinto \
747757
+ ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] \
@@ -754,6 +764,11 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
754764
if sep_event_mode == True:
755765
workflow['stages'].append(SGNGENtask)
756766
signalneeds = signalneeds + [SGNGENtask['name']]
767+
if args.make_evtpool:
768+
continue
769+
770+
# GeneratorFromO2Kine parameters are needed only before the transport
771+
CONFKEY = re.sub(r'GeneratorFromO2Kine.*?;', '', CONFKEY)
757772

758773
sgnmem = 6000 if COLTYPE == 'PbPb' else 4000
759774
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"],
@@ -1520,26 +1535,32 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
15201535
TFcleanup['cmd'] += 'rm *cluster*.root'
15211536
workflow['stages'].append(TFcleanup)
15221537

1523-
# AOD merging as one global final step
1524-
aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
1525-
AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1')
1526-
AOD_merge_task['cmd'] = ' set -e ; [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; '
1527-
AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; '
1528-
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
1529-
# produce MonaLisa event stat file
1530-
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
1531-
workflow['stages'].append(AOD_merge_task)
1532-
1533-
job_merging = False
1534-
if includeFullQC:
1535-
workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag, conditionDB=args.conditionDB, qcdbHost=args.qcdbHost))
1536-
1537-
1538-
if includeAnalysis:
1539-
# include analyses and potentially final QC upload tasks
1540-
add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True, collision_system=COLTYPE)
1541-
if QUALITYCONTROL_ROOT:
1542-
add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC")
1538+
if not args.make_evtpool:
1539+
# AOD merging as one global final step
1540+
aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
1541+
AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1')
1542+
AOD_merge_task['cmd'] = ' set -e ; [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; '
1543+
AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; '
1544+
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
1545+
# produce MonaLisa event stat file
1546+
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
1547+
workflow['stages'].append(AOD_merge_task)
1548+
1549+
job_merging = False
1550+
if includeFullQC:
1551+
workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag, conditionDB=args.conditionDB, qcdbHost=args.qcdbHost))
1552+
1553+
if includeAnalysis:
1554+
# include analyses and potentially final QC upload tasks
1555+
add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True, collision_system=COLTYPE)
1556+
if QUALITYCONTROL_ROOT:
1557+
add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC")
1558+
else:
1559+
wfneeds=['sgngen_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
1560+
tfpool=['tf' + str(tf) + '/genevents_Kine.root' for tf in range(1, NTIMEFRAMES + 1)]
1561+
POOL_merge_task = createTask(name='poolmerge', needs=wfneeds, lab=["POOL"], mem='2000', cpu='1')
1562+
POOL_merge_task['cmd'] = '${O2DPG_ROOT}/UTILS/root_merger.py -o evtpool.root -i ' + ','.join(tfpool)
1563+
workflow['stages'].append(POOL_merge_task)
15431564

15441565
# adjust for alternate (RECO) software environments
15451566
adjust_RECO_environment(workflow, args.alternative_reco_software)

MC/run/examples/event_pool.sh

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env bash
2+
3+
# Example on how to produce an event pool and how to feed it
4+
# to the O2DPG simulation workflow
5+
6+
# make sure O2DPG + O2 is loaded
7+
[ ! "${O2DPG_ROOT}" ] && echo "Error: This needs O2DPG loaded" && exit 1
8+
[ ! "${O2_ROOT}" ] && echo "Error: This needs O2 loaded" && exit 1
9+
10+
# Parse arguments
11+
MAKE=false
12+
INPUT=""
13+
14+
help() {
15+
echo "Usage: $0 [--make] [-i|--input <input_file>]"
16+
echo " --make: Create the event pool"
17+
echo " -i|--input: Input event pool file to be used in the simulation workflow. Alien paths are supported."
18+
echo " A full path must be provided (use of environment variables allowed), otherwise generation will fail."
19+
echo " -h|--help: Display this help message"
20+
exit 0
21+
}
22+
23+
while [[ "$#" -gt 0 ]]; do
24+
case $1 in
25+
--make) MAKE=true ;;
26+
-i|--input) INPUT="$2"; shift ;;
27+
-h|--help) help ;;
28+
*) echo "Unknown operation requested: $1"; help ;;
29+
esac
30+
shift
31+
done
32+
33+
if $MAKE; then
34+
echo "Started generation of event pool"
35+
# Workflow creation. All the parameters are used as examples
36+
# No transport will be executed. The workflow will stop at the event generation and will conclude with the merging of all the
37+
# kinematic root files of the timeframes in a file called evtpool.root in the current working directory
38+
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 14000 -col pp -gen pythia8 -proc cdiff -tf 2 -ns 5000 --make-evtpool -seed 546 -interactionRate 500000 -productionTag "evtpoolcreation" -o evtpool
39+
# Workflow runner
40+
${O2DPG_ROOT}/MC/bin/o2dpg_workflow_runner.py -f evtpool.json -tt pool
41+
elif [[ -n "$INPUT" ]]; then
42+
echo "Input file provided: $INPUT"
43+
if [[ -f "$INPUT" && -s "$INPUT" ]] || [[ "$INPUT" == alien://* ]]; then
44+
# Workflow creation. Phi Rotation is set manually, while the event randomisation of the pool is set by default
45+
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 14000 -confKey "GeneratorFromO2Kine.randomphi=true;GeneratorFromO2Kine.fileName=$INPUT" -gen extkinO2 -tf 2 -ns 10 -e TGeant4 -j 4 -interactionRate 500000 -seed 546 -productionTag "evtpooltest"
46+
# Workflow runner. The rerun option is set in case you will run directly the script in the same folder (no need to manually delete files)
47+
${O2DPG_ROOT}/MC/bin/o2dpg_workflow_runner.py -f workflow.json -tt aod --rerun-from grpcreate
48+
else
49+
echo "Error: File does not exist or is empty: $INPUT"
50+
exit 1
51+
fi
52+
else
53+
echo "Usage: $0 [--make] [-i|--input <input_file>]"
54+
exit 1
55+
fi

UTILS/root_merger.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#!/usr/bin/env python3
2+
3+
# Simple ROOT files merger
4+
5+
from ROOT import TFile, TFileMerger
6+
import sys
7+
import os
8+
import argparse
9+
10+
output_file = ''
11+
input_files = []
12+
# defining command line options
13+
14+
parser = argparse.ArgumentParser(description='Simple ROOT files merger',
15+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
16+
17+
parser.add_argument('-o','--output', help='Output ROOT filename', required=True)
18+
parser.add_argument('-i','--input', help='Input ROOT files to be merged, separated by a comma', required=True)
19+
20+
args = parser.parse_args()
21+
22+
output_file = args.output
23+
input_files = args.input.split(',')
24+
25+
merger = TFileMerger(False)
26+
merger.OutputFile(output_file)
27+
28+
for input_file in input_files:
29+
if os.path.exists(input_file):
30+
merger.AddFile(input_file)
31+
else:
32+
print(f"Fatal: {input_file} does not exist.")
33+
sys.exit(1)
34+
35+
if not merger.Merge():
36+
print("Error: Merging failed.")
37+
sys.exit(2)
38+
else:
39+
print(f"Successfully merged files into {output_file}")
40+
41+
sys.exit(0)

0 commit comments

Comments
 (0)