@@ -174,7 +174,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
174174 callbacks.set <CallbackService::Id::Start>(producerChannelInit);
175175 }
176176 // the compute callback of the producer
177- auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared<size_t >()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds) {
177+ auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared<size_t >()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds, MessageContext& messageContext ) {
178178 int data = *counter;
179179 // outputs.make<int>(OutputRef{"data", 0}) = data;
180180
@@ -233,6 +233,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
233233 }
234234 // using utility from ExternalFairMQDeviceProxy
235235 o2::framework::sendOnChannel (device, messages, *channelName, (size_t )-1 );
236+ messageContext.fakeDispatch ();
236237
237238 if (++(*counter) >= nRolls) {
238239 // send the end of stream signal, this is transferred by the proxies
@@ -256,6 +257,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
256257 // add empty payload message
257258 out.AddPart (std::move (device.NewMessageFor (*channelName, 0 , 0 )));
258259 o2::framework::sendOnChannel (device, out, *channelName, (size_t )-1 );
260+ messageContext.fakeDispatch ();
259261 }
260262 }
261263 };
@@ -401,7 +403,9 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
401403 output.AddPart (std::move (inputs.At (msgidx)));
402404 }
403405 }
406+ auto &messageContext = services.get <MessageContext>();
404407 o2::framework::sendOnChannel (*device, output, channelName, (size_t )-1 );
408+ messageContext.fakeDispatch ();
405409 return output.Size () != 0 ;
406410 };
407411
0 commit comments