@@ -345,6 +345,46 @@ auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
345345 routeForwardedMessages (proxy, span, forwardedParts, copyByDefault, consume);
346346 }
347347 return forwardedParts;
348- };
348+ }
349+
350+ void DataProcessingHelpers::cleanForwardedMessageSet (std::vector<MessageSet>& currentSetOfInputs)
351+ {
352+ for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
353+ auto messages = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages );
354+ size_t pi = 0 ;
355+ while (pi < messages.size ()) {
356+ auto & header = messages[pi];
357+ auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
358+ auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
359+ auto dph = o2::header::get<DataProcessingHeader*>(header->GetData ());
360+ auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData ());
361+ if (header->GetData () == nullptr || sih || dih || dph == nullptr || dh == nullptr ) {
362+ pi += 2 ;
363+ continue ;
364+ }
365+
366+ // At least one payload.
367+ auto & payload = messages[pi + 1 ];
368+
369+ if (payload.get () == nullptr ) {
370+ // If the payload is not there, it means we already
371+ // processed it with ConsumeExisiting. Therefore we
372+ // need to do something only if this is the last consume.
373+ header.reset (nullptr );
374+ }
375+
376+ // Calculate the number of messages which should be handled together
377+ // all in one go.
378+ if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex ) {
379+ // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 2]) pairs belonging together.
380+ pi += dh->splitPayloadParts + 1 ;
381+ } else {
382+ // Sequence of splitPayloadParts (header, payload) pairs belonging together.
383+ // In case splitPayloadParts = 0, we consider this as a single message pair
384+ pi += (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1 ) * 2 ;
385+ }
386+ }
387+ }
388+ }
349389
350390} // namespace o2::framework
0 commit comments