@@ -1854,11 +1854,56 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18541854 VariableContextHelpers::getTimeslice (variables);
18551855 forwardInputs (ref, slot, dropped, oldestOutputInfo, false , true );
18561856 };
1857+
1858+ auto onInsertion = [](ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) {
1859+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
1860+
1861+ auto & spec = ref.get <DeviceSpec const >();
1862+ bool hasForwards = spec.forwards .empty () == false ;
1863+ auto & context = ref.get <DataProcessorContext>();
1864+ if (context.canForwardEarly && hasForwards) {
1865+ O2_SIGNPOST_EVENT_EMIT (device, sid, " device" , " Early forwardinding before injecting data into relayer." );
1866+ auto & timesliceIndex = ref.get <TimesliceIndex>();
1867+ auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput ();
1868+
1869+ auto & proxy = ref.get <FairMQDeviceProxy>();
1870+
1871+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
1872+ O2_SIGNPOST_START (forwarding, sid, " forwardInputs" , " Starting forwarding for incoming messages with oldestTimeslice %zu with copy" ,
1873+ oldestTimeslice.timeslice .value );
1874+ std::vector<fair::mq::Parts> forwardedParts;
1875+ forwardedParts.resize (proxy.getNumForwards ());
1876+ DataProcessingHelpers::routeForwardedMessages (proxy, messages, forwardedParts, true , false );
1877+
1878+ for (int fi = 0 ; fi < proxy.getNumForwardChannels (); fi++) {
1879+ if (forwardedParts[fi].Size () == 0 ) {
1880+ continue ;
1881+ }
1882+ ForwardChannelInfo info = proxy.getForwardChannelInfo (ChannelIndex{fi});
1883+ auto & parts = forwardedParts[fi];
1884+ if (info.policy == nullptr ) {
1885+ O2_SIGNPOST_EVENT_EMIT_ERROR (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d has no policy." , info.name .c_str (), fi);
1886+ continue ;
1887+ }
1888+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding to %{public}s %d" , info.name .c_str (), fi);
1889+ info.policy ->forward (parts, ChannelIndex{fi}, ref);
1890+ }
1891+ auto & asyncQueue = ref.get <AsyncQueue>();
1892+ auto & decongestion = ref.get <DecongestionService>();
1893+ O2_SIGNPOST_ID_GENERATE (aid, async_queue);
1894+ O2_SIGNPOST_EVENT_EMIT (async_queue, aid, " forwardInputs" , " Queuing forwarding oldestPossible %zu" , oldestTimeslice.timeslice .value );
1895+ AsyncQueueHelpers::post (asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice , .id = decongestion.oldestPossibleTimesliceTask , .debounce = -1 , .callback = decongestionCallbackLate}
1896+ .user <DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1897+ O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
1898+ }
1899+ };
1900+
18571901 auto relayed = relayer.relay (parts.At (headerIndex)->GetData (),
18581902 &parts.At (headerIndex),
18591903 input,
18601904 nMessages,
18611905 nPayloadsPerHeader,
1906+ onInsertion,
18621907 onDrop);
18631908 switch (relayed.type ) {
18641909 case DataRelayer::RelayChoice::Type::Backpressured:
@@ -2273,9 +2318,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22732318 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
22742319
22752320 if (context.canForwardEarly && hasForwards && consumeSomething) {
2276- O2_SIGNPOST_EVENT_EMIT (device, aid, " device" , " Early forwainding: %{public}s." , fmt::format (" {}" , action.op ).c_str ());
2277- auto & timesliceIndex = ref.get <TimesliceIndex>();
2278- forwardInputs (ref, action.slot , currentSetOfInputs, timesliceIndex.getOldestPossibleOutput (), true , action.op == CompletionPolicy::CompletionOp::Consume);
2321+ // We used to do fowarding here, however we now do it much earlier.
2322+ // We still need to clean the inputs which were already consumed
2323+ // via ConsumeExisting and which still have an header to hold the slot.
2324+ DataProcessingHelpers::cleanForwardedMessageSet (currentSetOfInputs);
22792325 }
22802326 markInputsAsDone (action.slot );
22812327
0 commit comments