@@ -221,102 +221,128 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221 }
222222}
223223
224- auto DataProcessingHelpers::routeForwardedMessages (FairMQDeviceProxy& proxy,
225- std::vector<MessageSet>& currentSetOfInputs,
226- const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
224+ void DataProcessingHelpers::routeForwardedMessages (FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& messages, std::vector<fair::mq::Parts>& forwardedParts,
225+ const bool copyByDefault, bool consume)
227226{
228- // we collect all messages per forward in a map and send them together
229- std::vector<fair::mq::Parts> forwardedParts;
230- forwardedParts.resize (proxy.getNumForwards ());
231- std::vector<ChannelIndex> forwardingChoices{};
232227 O2_SIGNPOST_ID_GENERATE (sid, forwarding);
228+ std::vector<ChannelIndex> forwardingChoices{};
229+ size_t pi = 0 ;
230+ while (pi < messages.size ()) {
231+ auto & header = messages[pi];
233232
234- for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
235- auto & messageSet = currentSetOfInputs[ii];
233+ // If is now possible that the record is not complete when
234+ // we forward it, because of a custom completion policy.
235+ // this means that we need to skip the empty entries in the
236+ // record for being forwarded.
237+ if (header->GetData () == nullptr ) {
238+ pi += 2 ;
239+ continue ;
240+ }
241+ auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
242+ if (dih) {
243+ pi += 2 ;
244+ continue ;
245+ }
246+ auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
247+ if (sih) {
248+ pi += 2 ;
249+ continue ;
250+ }
236251
237- for ( size_t pi = 0 ; pi < messageSet. size (); ++pi) {
238- auto & header = messageSet. header (pi );
252+ auto dph = o2::header::get<DataProcessingHeader*>(header-> GetData ());
253+ auto dh = o2:: header::get<o2::header::DataHeader*>(header-> GetData () );
239254
240- // If is now possible that the record is not complete when
241- // we forward it, because of a custom completion policy.
242- // this means that we need to skip the empty entries in the
243- // record for being forwarded.
244- if (header->GetData () == nullptr ) {
245- continue ;
246- }
247- auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
248- if (dih) {
249- continue ;
250- }
251- auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
252- if (sih) {
253- continue ;
254- }
255+ if (dph == nullptr || dh == nullptr ) {
256+ // Complain only if this is not an out-of-band message
257+ LOGP (error, " Data is missing {}{}{}" ,
258+ dph ? " DataProcessingHeader" : " " , dph || dh ? " and" : " " , dh ? " DataHeader" : " " );
259+ pi += 2 ;
260+ continue ;
261+ }
255262
256- auto dph = o2::header::get<DataProcessingHeader*>(header->GetData ());
257- auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData ());
263+ // At least one payload.
264+ auto & payload = messages[pi + 1 ];
265+ // Calculate the number of messages which should be handled together
266+ // all in one go.
267+ size_t numberOfMessages = 0 ;
268+ if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex ) {
269+ // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
270+ numberOfMessages = dh->splitPayloadParts ;
271+ } else {
272+ // Sequence of splitPayloadParts (header, payload) pairs belonging together.
273+ // In case splitPayloadParts = 0, we consider this as a single message pair
274+ numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1 ) * 2 ;
275+ }
258276
259- if (dph == nullptr || dh == nullptr ) {
260- // Complain only if this is not an out-of-band message
261- LOGP (error, " Data is missing {}{}{}" ,
262- dph ? " DataProcessingHeader" : " " , dph || dh ? " and" : " " , dh ? " DataHeader" : " " );
263- continue ;
264- }
277+ if (payload.get () == nullptr && consume == true ) {
278+ // If the payload is not there, it means we already
279+ // processed it with ConsumeExisiting. Therefore we
280+ // need to do something only if this is the last consume.
281+ header.reset (nullptr );
282+ pi += numberOfMessages;
283+ continue ;
284+ }
265285
266- auto & payload = messageSet.payload (pi);
286+ // We need to find the forward route only for the first
287+ // part of a split payload. All the others will use the same.
288+ // Therefore, we reset and recompute the forwarding choice:
289+ //
290+ // - If this is the first payload of a [header0][payload0][header0][payload1]... sequence,
291+ // which is actually always created and handled together. Notice that in this
292+ // case we have splitPayloadParts == splitPayloadIndex
293+ // - If this is the first payload of a [header0][payload0][header1][payload1]... sequence
294+ // belonging to the same multipart message (and therefore we are guaranteed that they
295+ // need to be routed together).
296+ // - If the message is not a multipart (splitPayloadParts 0) or has only one part
297+ // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
298+ // we will already use the same choice in the for loop below.
299+ //
267300
268- if (payload.get () == nullptr && consume == true ) {
269- // If the payload is not there, it means we already
270- // processed it with ConsumeExisiting. Therefore we
271- // need to do something only if this is the last consume.
272- header.reset (nullptr );
273- continue ;
274- }
301+ forwardingChoices.clear ();
302+ proxy.getMatchingForwardChannelIndexes (forwardingChoices, *dh, dph->startTime );
275303
276- // We need to find the forward route only for the first
277- // part of a split payload. All the others will use the same.
278- // Therefore, we reset and recompute the forwarding choice:
279- //
280- // - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
281- // which is actually always created and handled together
282- // - If the message is not a multipart (splitPayloadParts 0) or has only one part
283- // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
284- // we will already use the same choice in the for loop below.
285- if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads (pi) > 0 ) {
286- forwardingChoices.clear ();
287- proxy.getMatchingForwardChannelIndexes (forwardingChoices, *dh, dph->startTime );
288- }
304+ if (forwardingChoices.empty ()) {
305+ // Nothing to forward go to the next messageset
306+ pi += numberOfMessages;
307+ continue ;
308+ }
289309
290- if (forwardingChoices.empty ()) {
291- // Nothing to forward go to the next messageset
292- continue ;
293- }
310+ // In case of more than one forward route, we need to copy the message.
311+ // This will eventually use the same memory if running with the same backend.
312+ if (copyByDefault || forwardingChoices.size () > 1 ) {
313+ for (auto & choice : forwardingChoices) {
314+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding a copy of %{public}s to route %d." ,
315+ fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), choice.value );
294316
295- // In case of more than one forward route, we need to copy the message.
296- // This will eventually use the same memory if running with the same backend.
297- if (copyByDefault || forwardingChoices.size () > 1 ) {
298- for (auto & choice : forwardingChoices) {
299- auto && newHeader = header->GetTransport ()->CreateMessage ();
300- O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding a copy of %{public}s to route %d." ,
301- fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), choice.value );
302- newHeader->Copy (*header);
303- forwardedParts[choice.value ].AddPart (std::move (newHeader));
304-
305- for (size_t payloadIndex = 0 ; payloadIndex < messageSet.getNumberOfPayloads (pi); ++payloadIndex) {
306- auto && newPayload = header->GetTransport ()->CreateMessage ();
307- newPayload->Copy (*messageSet.payload (pi, payloadIndex));
308- forwardedParts[choice.value ].AddPart (std::move (newPayload));
309- }
310- }
311- } else {
312- O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding %{public}s to route %d." ,
313- fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), forwardingChoices.back ().value );
314- forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messageSet.header (pi)));
315- for (size_t payloadIndex = 0 ; payloadIndex < messageSet.getNumberOfPayloads (pi); ++payloadIndex) {
316- forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messageSet.payload (pi, payloadIndex)));
317+ for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
318+ auto && newMsg = header->GetTransport ()->CreateMessage ();
319+ newMsg->Copy (*messages[ppi]);
320+ forwardedParts[choice.value ].AddPart (std::move (newMsg));
317321 }
318322 }
323+ } else {
324+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding %{public}s to route %d." ,
325+ fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), forwardingChoices.back ().value );
326+ for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
327+ forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messages[ppi]));
328+ }
319329 }
330+ pi += numberOfMessages;
331+ }
332+ }
333+
334+ auto DataProcessingHelpers::routeForwardedMessageSet (FairMQDeviceProxy& proxy,
335+ std::vector<MessageSet>& currentSetOfInputs,
336+ const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
337+ {
338+ // we collect all messages per forward in a map and send them together
339+ std::vector<fair::mq::Parts> forwardedParts;
340+ forwardedParts.resize (proxy.getNumForwards ());
341+ std::vector<ChannelIndex> forwardingChoices{};
342+
343+ for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
344+ auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages );
345+ routeForwardedMessages (proxy, span, forwardedParts, copyByDefault, consume);
320346 }
321347 return forwardedParts;
322348};
0 commit comments