Skip to content

Commit 4325909

Browse files
committed
DPL: add helper to clean consumed headers
1 parent 5e90876 commit 4325909

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ struct DataProcessingHelpers {
5959
/// Helper to route messages for forwarding
6060
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
6161
bool copy, bool consume);
62+
/// clean the headers when finally consuming a slot
63+
static void cleanForwardedMessageSet(std::vector<MessageSet>& currentSetOfInputs);
6264
};
6365
} // namespace o2::framework
6466
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,46 @@ auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
345345
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
346346
}
347347
return forwardedParts;
348-
};
348+
}
349+
350+
void DataProcessingHelpers::cleanForwardedMessageSet(std::vector<MessageSet>& currentSetOfInputs)
351+
{
352+
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
353+
auto messages = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
354+
size_t pi = 0;
355+
while (pi < messages.size()) {
356+
auto& header = messages[pi];
357+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
358+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
359+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
360+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
361+
if (header->GetData() == nullptr || sih || dih || dph == nullptr || dh == nullptr) {
362+
pi += 2;
363+
continue;
364+
}
365+
366+
// At least one payload.
367+
auto& payload = messages[pi + 1];
368+
369+
if (payload.get() == nullptr) {
370+
// If the payload is not there, it means we already
371+
// processed it with ConsumeExisiting. Therefore we
372+
// need to do something only if this is the last consume.
373+
header.reset(nullptr);
374+
}
375+
376+
// Calculate the number of messages which should be handled together
377+
// all in one go.
378+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
379+
// Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
380+
pi += dh->splitPayloadParts;
381+
} else {
382+
// Sequence of splitPayloadParts (header, payload) pairs belonging together.
383+
// In case splitPayloadParts = 0, we consider this as a single message pair
384+
pi += (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
385+
}
386+
}
387+
}
388+
}
349389

350390
} // namespace o2::framework

0 commit comments

Comments
 (0)