@@ -2108,28 +2108,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21082108 // should work just fine.
21092109 std::vector<MessageSet> currentSetOfInputs;
21102110
2111- // For the moment we have a simple "immediately dispatch" policy for stuff
2112- // in the cache. This could be controlled from the outside e.g. by waiting
2113- // for a few sets of inputs to arrive before we actually dispatch the
2114- // computation, however this can be defined at a later stage.
2115- auto canDispatchSomeComputation = [&completed, ref]() -> bool {
2116- ref.get <DataRelayer>().getReadyToProcess (completed);
2117- return completed.empty () == false ;
2118- };
2119-
2120- // We use this to get a list with the actual indexes in the cache which
2121- // indicate a complete set of inputs. Notice how I fill the completed
2122- // vector and return it, so that I can have a nice for loop iteration later
2123- // on.
2124- auto getReadyActions = [&completed, ref]() -> std::vector<DataRelayer::RecordAction> {
2125- auto & stats = ref.get <DataProcessingStats>();
2126- auto & relayer = ref.get <DataRelayer>();
2127- using namespace o2 ::framework;
2128- stats.updateStats ({(int )ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast <int64_t >(relayer.getParallelTimeslices () - completed.size ())});
2129- stats.updateStats ({(int )ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty () ? 1 : 0 });
2130- return completed;
2131- };
2132-
21332111 //
21342112 auto getInputSpan = [ref, ¤tSetOfInputs](TimesliceSlot slot, bool consume = true ) {
21352113 auto & relayer = ref.get <DataRelayer>();
@@ -2264,7 +2242,8 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22642242 control.notifyStreamingState (state.streaming );
22652243 };
22662244
2267- if (canDispatchSomeComputation () == false ) {
2245+ ref.get <DataRelayer>().getReadyToProcess (completed);
2246+ if (completed.empty () == true ) {
22682247 LOGP (debug, " No computations available for dispatching." );
22692248 return false ;
22702249 }
@@ -2322,7 +2301,14 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
23222301 auto & streamContext = ref.get <StreamContext>();
23232302 O2_SIGNPOST_ID_GENERATE (sid, device);
23242303 O2_SIGNPOST_START (device, sid, " device" , " Start processing ready actions" );
2325- for (auto action : getReadyActions ()) {
2304+
2305+ auto & stats = ref.get <DataProcessingStats>();
2306+ auto & relayer = ref.get <DataRelayer>();
2307+ using namespace o2 ::framework;
2308+ stats.updateStats ({(int )ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast <int64_t >(relayer.getParallelTimeslices () - completed.size ())});
2309+ stats.updateStats ({(int )ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty () ? 1 : 0 });
2310+
2311+ for (auto action : completed) {
23262312 O2_SIGNPOST_ID_GENERATE (aid, device);
23272313 O2_SIGNPOST_START (device, aid, " device" , " Processing action on slot %lu for action %{public}s" , action.slot .index , fmt::format (" {}" , action.op ).c_str ());
23282314 if (action.op == CompletionPolicy::CompletionOp::Wait) {
0 commit comments