1313#include < fairlogger/Logger.h>
1414#include < algorithm>
1515
16+ #include < tbb/concurrent_queue.h>
17+ #include < tbb/task_arena.h>
18+ #include < tbb/parallel_for.h>
19+
1620namespace o2
1721{
1822namespace eventgen
@@ -34,7 +38,7 @@ GeneratorHybrid::GeneratorHybrid(const std::string& inputgens)
3438 }
3539 }
3640 int index = 0 ;
37- if (!mRandomize ) {
41+ if (!( mRandomize || mGenerationMode == GenMode:: kParallel ) ) {
3842 if (mFractions .size () != mInputGens .size ()) {
3943 LOG (fatal) << " Number of fractions does not match the number of generators" ;
4044 return ;
@@ -159,54 +163,116 @@ Bool_t GeneratorHybrid::Init()
159163 } else {
160164 LOG (info) << " Generators will be used in sequence, following provided fractions" ;
161165 }
166+
167+ if (mGenerationMode == GenMode::kParallel ) {
168+ // in parallel mode we just use one queue --> collaboration
169+ mResultQueue .resize (1 );
170+ } else {
171+ // in sequential mode we have one queue per generator
172+ mResultQueue .resize (gens.size ());
173+ }
174+ // Create a task arena with a specified number of threads
175+ mTaskArena .initialize (GeneratorHybridParam::Instance ().num_workers );
176+
177+ // the process task function actually calls event generation
178+ // when it is done it notifies the outside world by pushing it's index into an appropriate queue
179+ // This should be a lambda, which can be given at TaskPool creation time
180+ auto process_generator_task = [this ](int task) {
181+ LOG (info) << " Starting eventgen for task " << task;
182+ auto & generator = gens[task];
183+ generator->clearParticles ();
184+ generator->generateEvent ();
185+ generator->importParticles ();
186+ if (mGenerationMode == GenMode::kParallel ) {
187+ mResultQueue [0 ].push (task);
188+ } else {
189+ mResultQueue [task].push (task);
190+ }
191+ };
192+
193+ // fundamental tbb thread-worker function
194+ auto worker_function = [this , process_generator_task]() {
195+ while (!mStopFlag ) {
196+ int task;
197+ if (mInputTaskQueue .try_pop (task)) {
198+ process_generator_task (task); // Process the task
199+ } else {
200+ std::this_thread::sleep_for (std::chrono::milliseconds (10 )); // Wait if no task
201+ }
202+ }
203+ std::cout << " Worker thread exiting on thread " << std::this_thread::get_id () << std::endl;
204+ };
205+
206+ // let the TBB task system run in it's own thread
207+ mTBBTaskPoolRunner = std::thread ([this , worker_function]() { mTaskArena .execute ([&]() { tbb::parallel_for (0 , mTaskArena .max_concurrency (), [&](int ) { worker_function (); }); }); });
208+
209+ mTBBTaskPoolRunner .detach ();
210+
211+ // let's initially push initial generation tasks for each event generator
212+ for (size_t genindex = 0 ; genindex < gens.size (); ++genindex) {
213+ mInputTaskQueue .push (genindex);
214+ }
215+ mIsInitialized = true ;
162216 return Generator::Init ();
163217}
164218
165- Bool_t GeneratorHybrid::generateEvent ()
219+ bool GeneratorHybrid::generateEvent ()
166220{
167- // Order randomisation or sequence of generators
168- // following provided fractions. If not available generators will be used sequentially
169- if (mRandomize ) {
170- if (mRngFractions .size () != 0 ) {
171- // Generate number between 0 and 1
172- float rnum = gRandom ->Rndm ();
173- // Find generator index
174- for (int k = 0 ; k < mRngFractions .size (); k++) {
175- if (rnum <= mRngFractions [k]) {
176- mIndex = k;
177- break ;
221+ if (!mIsInitialized ) {
222+ Init ();
223+ }
224+ if (mGenerationMode == GenMode::kParallel ) {
225+ mIndex = -1 ; // this means any index is welcome
226+ notifySubGenerator (0 ); // we shouldn't distinguish the sub-gen ids
227+ } else {
228+ // Order randomisation or sequence of generators
229+ // following provided fractions, if not generators are used in proper sequence
230+ // Order randomisation or sequence of generators
231+ // following provided fractions. If not available generators will be used sequentially
232+ if (mRandomize ) {
233+ if (mRngFractions .size () != 0 ) {
234+ // Generate number between 0 and 1
235+ float rnum = gRandom ->Rndm ();
236+ // Find generator index
237+ for (int k = 0 ; k < mRngFractions .size (); k++) {
238+ if (rnum <= mRngFractions [k]) {
239+ mIndex = k;
240+ break ;
241+ }
178242 }
243+ } else {
244+ mIndex = gRandom ->Integer (mGens .size ());
179245 }
180246 } else {
181- mIndex = gRandom ->Integer (mGens .size ());
182- }
183- } else {
184- while (mFractions [mCurrentFraction ] == 0 || mseqCounter == mFractions [mCurrentFraction ]) {
185- if (mFractions [mCurrentFraction ] != 0 ) {
186- mseqCounter = 0 ;
247+ while (mFractions [mCurrentFraction ] == 0 || mseqCounter == mFractions [mCurrentFraction ]) {
248+ if (mFractions [mCurrentFraction ] != 0 ) {
249+ mseqCounter = 0 ;
250+ }
251+ mCurrentFraction = (mCurrentFraction + 1 ) % mFractions .size ();
187252 }
188- mCurrentFraction = ( mCurrentFraction + 1 ) % mFractions . size () ;
253+ mIndex = mCurrentFraction ;
189254 }
190- mIndex = mCurrentFraction ;
255+ notifySubGenerator ( mIndex ) ;
191256 }
192- if (mConfigs [mIndex ].compare (" " ) == 0 ) {
193- LOG (info) << " GeneratorHybrid: generating event with generator " << mGens [mIndex ];
194- } else {
195- LOG (info) << " GeneratorHybrid: generating event with generator " << mConfigs [mIndex ];
196- }
197- gens[mIndex ]->clearParticles (); // clear container of this class
198- gens[mIndex ]->generateEvent ();
199- // notify the sub event generator
200- notifySubGenerator (mIndex );
201- mseqCounter++;
202257 return true ;
203258}
204259
205- Bool_t GeneratorHybrid::importParticles ()
260+ bool GeneratorHybrid::importParticles ()
206261{
207- mParticles .clear (); // clear container of mother class
208- gens[mIndex ]->importParticles ();
209- std::copy (gens[mIndex ]->getParticles ().begin (), gens[mIndex ]->getParticles ().end (), std::back_insert_iterator (mParticles ));
262+ int genIndex = -1 ;
263+ if (mIndex == -1 ) {
264+ // this means parallel mode ---> we have a common queue
265+ mResultQueue [0 ].pop (genIndex);
266+ } else {
267+ // need to pop from a particular queue
268+ mResultQueue [mIndex ].pop (genIndex);
269+ }
270+
271+ // at this moment the mIndex-th generator is ready to be used
272+ std::copy (gens[genIndex]->getParticles ().begin (), gens[genIndex]->getParticles ().end (), std::back_insert_iterator (mParticles ));
273+
274+ // we initiate the next async event generation stage for this generator
275+ mInputTaskQueue .push (genIndex);
210276
211277 // we need to fix particles statuses --> need to enforce this on the importParticles level of individual generators
212278 for (auto & p : mParticles ) {
@@ -215,6 +281,7 @@ Bool_t GeneratorHybrid::importParticles()
215281 p.SetBit (ParticleStatus::kToBeDone , true );
216282 }
217283
284+ mseqCounter++;
218285 return true ;
219286}
220287
@@ -244,6 +311,21 @@ Bool_t GeneratorHybrid::parseJSON(const std::string& path)
244311 return false ;
245312 }
246313
314+ // check if there is a mode field
315+ if (doc.HasMember (" mode" )) {
316+ const auto & mode = doc[" mode" ].GetString ();
317+ if (mode == " sequential" ) {
318+ // events are generated in the order given by fractions or random weight
319+ mGenerationMode = GenMode::kSeq ;
320+ }
321+ if (mode == std::string (" parallel" )) {
322+ // events are generated fully in parallel and the order will be random
323+ // this is mainly for event pool generation or mono-type generators
324+ mGenerationMode = GenMode::kParallel ;
325+ LOG (info) << " Setting mode to parallel" ;
326+ }
327+ }
328+
247329 // Put the generator names in mInputGens
248330 if (doc.HasMember (" generators" )) {
249331 const auto & gens = doc[" generators" ];
0 commit comments