Skip to content

Commit 662c586

Browse files
jackal1-66alcaliva
authored andcommitted
Basic implementation of event pooling in O2DPG workflow (#1760)
* Allow to generate events for event-pool usage (no vertex applied + kinematic merging) * Example script demonstrating simple event pool creation and reading events from pool https://its.cern.ch/jira/browse/O2-5216 (cherry picked from commit 15049b0)
1 parent 6b38d22 commit 662c586

File tree

3 files changed

+140
-23
lines changed

3 files changed

+140
-23
lines changed

MC/bin/o2dpg_sim_workflow.py

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import importlib.util
2222
import argparse
2323
from os import environ, mkdir
24-
from os.path import join, dirname, isdir, isabs
24+
from os.path import join, dirname, isdir, isabs, isfile
2525
import random
2626
import json
2727
import itertools
@@ -61,6 +61,7 @@
6161
parser.add_argument('-ini',help='generator init parameters file (full paths required), for example: ${O2DPG_ROOT}/MC/config/PWGHF/ini/GeneratorHF.ini', default='')
6262
parser.add_argument('-confKey',help='generator or trigger configuration key values, for example: "GeneratorPythia8.config=pythia8.cfg;A.x=y"', default='')
6363
parser.add_argument('--readoutDets',help='comma separated string of detectors readout (does not modify material budget - only hit creation)', default='all')
64+
parser.add_argument('--make-evtpool', help='Generate workflow for event pool creation.', action='store_true')
6465

6566
parser.add_argument('-interactionRate',help='Interaction rate, used in digitization', default=-1)
6667
parser.add_argument('-bcPatternFile',help='Bunch crossing pattern file, used in digitization (a file name or "ccdb")', default='')
@@ -688,9 +689,14 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
688689
# possible generator)
689690

690691
workflow['stages'].append(SGN_CONFIG_task)
692+
693+
# default flags for extkinO2 signal simulation (no transport)
694+
extkinO2Config = ''
695+
if GENERATOR == 'extkinO2':
696+
extkinO2Config = ';GeneratorFromO2Kine.randomize=true;GeneratorFromO2Kine.rngseed=' + str(TFSEED)
691697

692698
# determine final conf key for signal simulation
693-
CONFKEY = constructConfigKeyArg(create_geant_config(args, args.confKey))
699+
CONFKEY = constructConfigKeyArg(create_geant_config(args, args.confKey + extkinO2Config))
694700
# -----------------
695701
# transport signals
696702
# -----------------
@@ -730,7 +736,11 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
730736
# determine the skip number
731737
cmd = 'export HEPMCEVENTSKIP=$(${O2DPG_ROOT}/UTILS/ReadHepMCEventSkip.sh ../HepMCEventSkip.json ' + str(tf) + ');'
732738
SGNGENtask['cmd'] = cmd
733-
SGNGENtask['cmd'] +='${O2_ROOT}/bin/o2-sim --noGeant -j 1 --field ccdb --vertexMode kCCDB' \
739+
740+
# No vertexing for event pool generation
741+
vtxmode = 'kNoVertex' if args.make_evtpool else 'kCCDB'
742+
743+
SGNGENtask['cmd'] +='${O2_ROOT}/bin/o2-sim --noGeant -j 1 --field ccdb --vertexMode ' + vtxmode \
734744
+ ' --run ' + str(args.run) + ' ' + str(CONFKEY) + str(TRIGGER) \
735745
+ ' -g ' + str(GENERATOR) + ' ' + str(INIFILE) + ' -o genevents ' + embeddinto \
736746
+ ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] \
@@ -743,6 +753,11 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
743753
if sep_event_mode == True:
744754
workflow['stages'].append(SGNGENtask)
745755
signalneeds = signalneeds + [SGNGENtask['name']]
756+
if args.make_evtpool:
757+
continue
758+
759+
# GeneratorFromO2Kine parameters are needed only before the transport
760+
CONFKEY = re.sub(r'GeneratorFromO2Kine.*?;', '', CONFKEY)
746761

747762
sgnmem = 6000 if COLTYPE == 'PbPb' else 4000
748763
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"],
@@ -1509,26 +1524,32 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
15091524
TFcleanup['cmd'] += 'rm *cluster*.root'
15101525
workflow['stages'].append(TFcleanup)
15111526

1512-
# AOD merging as one global final step
1513-
aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
1514-
AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1')
1515-
AOD_merge_task['cmd'] = ' set -e ; [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; '
1516-
AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; '
1517-
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
1518-
# produce MonaLisa event stat file
1519-
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
1520-
workflow['stages'].append(AOD_merge_task)
1521-
1522-
job_merging = False
1523-
if includeFullQC:
1524-
workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag, conditionDB=args.conditionDB, qcdbHost=args.qcdbHost))
1525-
1526-
1527-
if includeAnalysis:
1528-
# include analyses and potentially final QC upload tasks
1529-
add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True, collision_system=COLTYPE)
1530-
if QUALITYCONTROL_ROOT:
1531-
add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC")
1527+
if not args.make_evtpool:
1528+
# AOD merging as one global final step
1529+
aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
1530+
AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1')
1531+
AOD_merge_task['cmd'] = ' set -e ; [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; '
1532+
AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; '
1533+
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
1534+
# produce MonaLisa event stat file
1535+
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
1536+
workflow['stages'].append(AOD_merge_task)
1537+
1538+
job_merging = False
1539+
if includeFullQC:
1540+
workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag, conditionDB=args.conditionDB, qcdbHost=args.qcdbHost))
1541+
1542+
if includeAnalysis:
1543+
# include analyses and potentially final QC upload tasks
1544+
add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True, collision_system=COLTYPE)
1545+
if QUALITYCONTROL_ROOT:
1546+
add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC")
1547+
else:
1548+
wfneeds=['sgngen_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
1549+
tfpool=['tf' + str(tf) + '/genevents_Kine.root' for tf in range(1, NTIMEFRAMES + 1)]
1550+
POOL_merge_task = createTask(name='poolmerge', needs=wfneeds, lab=["POOL"], mem='2000', cpu='1')
1551+
POOL_merge_task['cmd'] = '${O2DPG_ROOT}/UTILS/root_merger.py -o evtpool.root -i ' + ','.join(tfpool)
1552+
workflow['stages'].append(POOL_merge_task)
15321553

15331554
# adjust for alternate (RECO) software environments
15341555
adjust_RECO_environment(workflow, args.alternative_reco_software)

MC/run/examples/event_pool.sh

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env bash
2+
3+
# Example on how to produce an event pool and how to feed it
4+
# to the O2DPG simulation workflow
5+
6+
# make sure O2DPG + O2 is loaded
7+
[ ! "${O2DPG_ROOT}" ] && echo "Error: This needs O2DPG loaded" && exit 1
8+
[ ! "${O2_ROOT}" ] && echo "Error: This needs O2 loaded" && exit 1
9+
10+
# Parse arguments
11+
MAKE=false
12+
INPUT=""
13+
14+
help() {
15+
echo "Usage: $0 [--make] [-i|--input <input_file>]"
16+
echo " --make: Create the event pool"
17+
echo " -i|--input: Input event pool file to be used in the simulation workflow. Alien paths are supported."
18+
echo " A full path must be provided (use of environment variables allowed), otherwise generation will fail."
19+
echo " -h|--help: Display this help message"
20+
exit 0
21+
}
22+
23+
while [[ "$#" -gt 0 ]]; do
24+
case $1 in
25+
--make) MAKE=true ;;
26+
-i|--input) INPUT="$2"; shift ;;
27+
-h|--help) help ;;
28+
*) echo "Unknown operation requested: $1"; help ;;
29+
esac
30+
shift
31+
done
32+
33+
if $MAKE; then
34+
echo "Started generation of event pool"
35+
# Workflow creation. All the parameters are used as examples
36+
# No transport will be executed. The workflow will stop at the event generation and will conclude with the merging of all the
37+
# kinematic root files of the timeframes in a file called evtpool.root in the current working directory
38+
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 14000 -col pp -gen pythia8 -proc cdiff -tf 2 -ns 5000 --make-evtpool -seed 546 -interactionRate 500000 -productionTag "evtpoolcreation" -o evtpool
39+
# Workflow runner
40+
${O2DPG_ROOT}/MC/bin/o2dpg_workflow_runner.py -f evtpool.json -tt pool
41+
elif [[ -n "$INPUT" ]]; then
42+
echo "Input file provided: $INPUT"
43+
if [[ -f "$INPUT" && -s "$INPUT" ]] || [[ "$INPUT" == alien://* ]]; then
44+
# Workflow creation. Phi Rotation is set manually, while the event randomisation of the pool is set by default
45+
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 14000 -confKey "GeneratorFromO2Kine.randomphi=true;GeneratorFromO2Kine.fileName=$INPUT" -gen extkinO2 -tf 2 -ns 10 -e TGeant4 -j 4 -interactionRate 500000 -seed 546 -productionTag "evtpooltest"
46+
# Workflow runner. The rerun option is set in case you will run directly the script in the same folder (no need to manually delete files)
47+
${O2DPG_ROOT}/MC/bin/o2dpg_workflow_runner.py -f workflow.json -tt aod --rerun-from grpcreate
48+
else
49+
echo "Error: File does not exist or is empty: $INPUT"
50+
exit 1
51+
fi
52+
else
53+
echo "Usage: $0 [--make] [-i|--input <input_file>]"
54+
exit 1
55+
fi

UTILS/root_merger.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#!/usr/bin/env python3
2+
3+
# Simple ROOT files merger
4+
5+
from ROOT import TFile, TFileMerger
6+
import sys
7+
import os
8+
import argparse
9+
10+
output_file = ''
11+
input_files = []
12+
# defining command line options
13+
14+
parser = argparse.ArgumentParser(description='Simple ROOT files merger',
15+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
16+
17+
parser.add_argument('-o','--output', help='Output ROOT filename', required=True)
18+
parser.add_argument('-i','--input', help='Input ROOT files to be merged, separated by a comma', required=True)
19+
20+
args = parser.parse_args()
21+
22+
output_file = args.output
23+
input_files = args.input.split(',')
24+
25+
merger = TFileMerger(False)
26+
merger.OutputFile(output_file)
27+
28+
for input_file in input_files:
29+
if os.path.exists(input_file):
30+
merger.AddFile(input_file)
31+
else:
32+
print(f"Fatal: {input_file} does not exist.")
33+
sys.exit(1)
34+
35+
if not merger.Merge():
36+
print("Error: Merging failed.")
37+
sys.exit(2)
38+
else:
39+
print(f"Successfully merged files into {output_file}")
40+
41+
sys.exit(0)

0 commit comments

Comments
 (0)