Skip to content

Commit 0b4e8b1

Browse files
committed
DPL: do not do unneeded forwards / copies
The forwarding destination depends only on the current header in an (header, payloads...), not on the ones coming before it because wildcard could aggregate messages that then need to be forwarded to different consumers. Each multipart (header, payloads, ...) in MessageSet should be copied / forwarded as such depending in isolation.
1 parent 72b9f51 commit 0b4e8b1

File tree

1 file changed

+6
-7
lines changed

1 file changed

+6
-7
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,7 @@ static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwar
615615
// part of a split payload. All the others will use the same.
616616
// but always check if we have a sequence of multiple payloads
617617
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
618+
cachedForwardingChoices.clear();
618619
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
619620
}
620621
return cachedForwardingChoices.empty() == false;
@@ -658,15 +659,15 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
658659
// to the next one in the daisy chain.
659660
// FIXME: do it in a smarter way than O(N^2)
660661
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
661-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
662+
TimesliceIndex::OldestOutputInfo oldestTimeslice, const bool copyByDefault, bool consume = true) {
662663
auto& proxy = registry.get<FairMQDeviceProxy>();
663664
// we collect all messages per forward in a map and send them together
664665
std::vector<fair::mq::Parts> forwardedParts;
665666
forwardedParts.resize(proxy.getNumForwards());
666667
std::vector<ChannelIndex> cachedForwardingChoices{};
667668
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
668669
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
669-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
670+
slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");
670671

671672
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
672673
auto& messageSet = currentSetOfInputs[ii];
@@ -679,8 +680,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
679680
}
680681
cachedForwardingChoices.clear();
681682

682-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
683-
auto& messageSet = currentSetOfInputs[ii];
683+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
684684
auto& header = messageSet.header(pi);
685685
auto& payload = messageSet.payload(pi);
686686
auto total = messageSet.getNumberOfPayloads(pi);
@@ -691,9 +691,8 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
691691

692692
// In case of more than one forward route, we need to copy the message.
693693
// This will eventually use the same mamory if running with the same backend.
694-
if (cachedForwardingChoices.size() > 1) {
695-
copy = true;
696-
}
694+
bool copy = copyByDefault || cachedForwardingChoices.size();
695+
697696
auto* dh = o2::header::get<DataHeader*>(header->GetData());
698697
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
699698

0 commit comments

Comments
 (0)