Skip to content

Commit 57e7546

Browse files
committed
DPL: fix a number of inconsistencies in the forwarding logic
If a message came together with a DomainInfoHeader or a SourceInfoHeader, it would have not been forwarded. If one (header, payload, ...) tuple in a MessageSet was to be copied, all the subsequent ones would have been copied. If one (header, payload, ...) tuple got redirected to more than one destination, all the subsequent ones would have been redirected there.
1 parent abe259d commit 57e7546

File tree

1 file changed

+60
-99
lines changed

1 file changed

+60
-99
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 60 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -550,76 +550,6 @@ void on_signal_callback(uv_signal_t* handle, int signum)
550550
O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
551551
}
552552

553-
static auto toBeForwardedHeader = [](void* header) -> bool {
554-
// If is now possible that the record is not complete when
555-
// we forward it, because of a custom completion policy.
556-
// this means that we need to skip the empty entries in the
557-
// record for being forwarded.
558-
if (header == nullptr) {
559-
return false;
560-
}
561-
auto sih = o2::header::get<SourceInfoHeader*>(header);
562-
if (sih) {
563-
return false;
564-
}
565-
566-
auto dih = o2::header::get<DomainInfoHeader*>(header);
567-
if (dih) {
568-
return false;
569-
}
570-
571-
auto dh = o2::header::get<DataHeader*>(header);
572-
if (!dh) {
573-
return false;
574-
}
575-
auto dph = o2::header::get<DataProcessingHeader*>(header);
576-
if (!dph) {
577-
return false;
578-
}
579-
return true;
580-
};
581-
582-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
583-
FairMQDeviceProxy& proxy,
584-
std::unique_ptr<fair::mq::Message>& header,
585-
std::unique_ptr<fair::mq::Message>& payload,
586-
size_t total,
587-
bool consume) {
588-
if (header.get() == nullptr) {
589-
// Missing an header is not an error anymore.
590-
// it simply means that we did not receive the
591-
// given input, but we were asked to
592-
// consume existing, so we skip it.
593-
return false;
594-
}
595-
if (payload.get() == nullptr && consume == true) {
596-
// If the payload is not there, it means we already
597-
// processed it with ConsumeExisiting. Therefore we
598-
// need to do something only if this is the last consume.
599-
header.reset(nullptr);
600-
return false;
601-
}
602-
603-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
604-
if (fdph == nullptr) {
605-
LOG(error) << "Data is missing DataProcessingHeader";
606-
return false;
607-
}
608-
auto fdh = o2::header::get<DataHeader*>(header->GetData());
609-
if (fdh == nullptr) {
610-
LOG(error) << "Data is missing DataHeader";
611-
return false;
612-
}
613-
614-
// We need to find the forward route only for the first
615-
// part of a split payload. All the others will use the same.
616-
// but always check if we have a sequence of multiple payloads
617-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
618-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
619-
}
620-
return cachedForwardingChoices.empty() == false;
621-
};
622-
623553
struct DecongestionContext {
624554
ServiceRegistryRef ref;
625555
TimesliceIndex::OldestOutputInfo oldestTimeslice;
@@ -658,69 +588,100 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
658588
// to the next one in the daisy chain.
659589
// FIXME: do it in a smarter way than O(N^2)
660590
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
661-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
591+
TimesliceIndex::OldestOutputInfo oldestTimeslice, const bool copyByDefault, bool consume = true) {
662592
auto& proxy = registry.get<FairMQDeviceProxy>();
663593
// we collect all messages per forward in a map and send them together
664594
std::vector<fair::mq::Parts> forwardedParts;
665595
forwardedParts.resize(proxy.getNumForwards());
666-
std::vector<ChannelIndex> cachedForwardingChoices{};
596+
std::vector<ChannelIndex> forwardingChoices{};
667597
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
668598
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
669-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
599+
slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");
670600

671601
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
672602
auto& messageSet = currentSetOfInputs[ii];
673-
// In case the messageSet is empty, there is nothing to be done.
674-
if (messageSet.size() == 0) {
675-
continue;
676-
}
677-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
678-
continue;
679-
}
680-
cachedForwardingChoices.clear();
681603

682-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
683-
auto& messageSet = currentSetOfInputs[ii];
604+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
684605
auto& header = messageSet.header(pi);
606+
607+
// If is now possible that the record is not complete when
608+
// we forward it, because of a custom completion policy.
609+
// this means that we need to skip the empty entries in the
610+
// record for being forwarded.
611+
if (header->GetData() == nullptr) {
612+
continue;
613+
}
614+
615+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
616+
auto dh = o2::header::get<DataHeader*>(header->GetData());
617+
618+
if (dph == nullptr || dh == nullptr) {
619+
// Complain only if this is not an out-of-band message
620+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
621+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
622+
if (dih == nullptr || sih == nullptr) {
623+
LOGP(error, "Data is missing {}{}{}",
624+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
625+
}
626+
continue;
627+
}
628+
685629
auto& payload = messageSet.payload(pi);
686-
auto total = messageSet.getNumberOfPayloads(pi);
687630

688-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
631+
if (payload.get() == nullptr && consume == true) {
632+
// If the payload is not there, it means we already
633+
// processed it with ConsumeExisiting. Therefore we
634+
// need to do something only if this is the last consume.
635+
header.reset(nullptr);
689636
continue;
690637
}
691638

692-
// In case of more than one forward route, we need to copy the message.
693-
// This will eventually use the same mamory if running with the same backend.
694-
if (cachedForwardingChoices.size() > 1) {
695-
copy = true;
639+
// We need to find the forward route only for the first
640+
// part of a split payload. All the others will use the same.
641+
// Therefore, we reset and recompute the forwarding choice:
642+
//
643+
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
644+
// which is actually always created and handled together
645+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
646+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
647+
// we will already use the same choice in the for loop below.
648+
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
649+
forwardingChoices.clear();
650+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
696651
}
697-
auto* dh = o2::header::get<DataHeader*>(header->GetData());
698-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
699652

700-
if (copy) {
701-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
653+
if (forwardingChoices.empty()) {
654+
// Nothing to forward go to the next messageset
655+
continue;
656+
}
657+
658+
// In case of more than one forward route, we need to copy the message.
659+
// This will eventually use the same memory if running with the same backend.
660+
if (copyByDefault || forwardingChoices.size()) {
661+
for (auto& choice : forwardingChoices) {
702662
auto&& newHeader = header->GetTransport()->CreateMessage();
703663
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
704-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
664+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
705665
newHeader->Copy(*header);
706-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
666+
forwardedParts[choice.value].AddPart(std::move(newHeader));
707667

708668
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
709669
auto&& newPayload = header->GetTransport()->CreateMessage();
710670
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
711-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
671+
forwardedParts[choice.value].AddPart(std::move(newPayload));
712672
}
713673
}
714674
} else {
715675
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
716-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
717-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
676+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
677+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
718678
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
719-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
679+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
720680
}
721681
}
722682
}
723683
}
684+
724685
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
725686
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
726687
if (forwardedParts[fi].Size() == 0) {

0 commit comments

Comments
 (0)