Skip to content

Commit 032f733

Browse files
committed
DPL: refactor input forwarding routing
Separate routing of the forwarding to a separate helper. Add test for said helper.
1 parent a807b70 commit 032f733

File tree

5 files changed

+759
-131
lines changed

5 files changed

+759
-131
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ add_executable(o2-test-framework-core
224224
test/test_FairMQOptionsRetriever.cxx
225225
test/test_FairMQResizableBuffer.cxx
226226
test/test_FairMQ.cxx
227+
test/test_ForwardInputs.cxx
227228
test/test_FrameworkDataFlowToDDS.cxx
228229
test/test_FrameworkDataFlowToO2Control.cxx
229230
test/test_Graphviz.cxx

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
#define O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_
1313

1414
#include <cstddef>
15+
#include "Framework/TimesliceSlot.h"
16+
#include "Framework/TimesliceIndex.h"
17+
#include <fairmq/FwdDecls.h>
18+
#include <vector>
1519

1620
namespace o2::framework
1721
{
@@ -23,6 +27,9 @@ struct OutputChannelSpec;
2327
struct OutputChannelState;
2428
struct ProcessingPolicies;
2529
struct DeviceSpec;
30+
struct FairMQDeviceProxy;
31+
struct MessageSet;
32+
struct ChannelIndex;
2633
enum struct StreamingState;
2734
enum struct TransitionHandlingState;
2835

@@ -45,7 +52,9 @@ struct DataProcessingHelpers {
4552
static bool hasOnlyGenerated(DeviceSpec const& spec);
4653
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
4754
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
55+
/// Helper to route messages for forwarding
56+
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
57+
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true);
4858
};
49-
5059
} // namespace o2::framework
5160
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 2 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -550,76 +550,6 @@ void on_signal_callback(uv_signal_t* handle, int signum)
550550
O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
551551
}
552552

553-
static auto toBeForwardedHeader = [](void* header) -> bool {
554-
// If is now possible that the record is not complete when
555-
// we forward it, because of a custom completion policy.
556-
// this means that we need to skip the empty entries in the
557-
// record for being forwarded.
558-
if (header == nullptr) {
559-
return false;
560-
}
561-
auto sih = o2::header::get<SourceInfoHeader*>(header);
562-
if (sih) {
563-
return false;
564-
}
565-
566-
auto dih = o2::header::get<DomainInfoHeader*>(header);
567-
if (dih) {
568-
return false;
569-
}
570-
571-
auto dh = o2::header::get<DataHeader*>(header);
572-
if (!dh) {
573-
return false;
574-
}
575-
auto dph = o2::header::get<DataProcessingHeader*>(header);
576-
if (!dph) {
577-
return false;
578-
}
579-
return true;
580-
};
581-
582-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
583-
FairMQDeviceProxy& proxy,
584-
std::unique_ptr<fair::mq::Message>& header,
585-
std::unique_ptr<fair::mq::Message>& payload,
586-
size_t total,
587-
bool consume) {
588-
if (header.get() == nullptr) {
589-
// Missing an header is not an error anymore.
590-
// it simply means that we did not receive the
591-
// given input, but we were asked to
592-
// consume existing, so we skip it.
593-
return false;
594-
}
595-
if (payload.get() == nullptr && consume == true) {
596-
// If the payload is not there, it means we already
597-
// processed it with ConsumeExisiting. Therefore we
598-
// need to do something only if this is the last consume.
599-
header.reset(nullptr);
600-
return false;
601-
}
602-
603-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
604-
if (fdph == nullptr) {
605-
LOG(error) << "Data is missing DataProcessingHeader";
606-
return false;
607-
}
608-
auto fdh = o2::header::get<DataHeader*>(header->GetData());
609-
if (fdh == nullptr) {
610-
LOG(error) << "Data is missing DataHeader";
611-
return false;
612-
}
613-
614-
// We need to find the forward route only for the first
615-
// part of a split payload. All the others will use the same.
616-
// but always check if we have a sequence of multiple payloads
617-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
618-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
619-
}
620-
return cachedForwardingChoices.empty() == false;
621-
};
622-
623553
struct DecongestionContext {
624554
ServiceRegistryRef ref;
625555
TimesliceIndex::OldestOutputInfo oldestTimeslice;
@@ -660,67 +590,9 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
660590
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
661591
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
662592
auto& proxy = registry.get<FairMQDeviceProxy>();
663-
// we collect all messages per forward in a map and send them together
664-
std::vector<fair::mq::Parts> forwardedParts;
665-
forwardedParts.resize(proxy.getNumForwards());
666-
std::vector<ChannelIndex> cachedForwardingChoices{};
667-
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
668-
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
669-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
670-
671-
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
672-
auto& messageSet = currentSetOfInputs[ii];
673-
// In case the messageSet is empty, there is nothing to be done.
674-
if (messageSet.size() == 0) {
675-
continue;
676-
}
677-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
678-
continue;
679-
}
680-
cachedForwardingChoices.clear();
681-
682-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
683-
auto& messageSet = currentSetOfInputs[ii];
684-
auto& header = messageSet.header(pi);
685-
auto& payload = messageSet.payload(pi);
686-
auto total = messageSet.getNumberOfPayloads(pi);
593+
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy);
687594

688-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
689-
continue;
690-
}
691-
692-
// In case of more than one forward route, we need to copy the message.
693-
// This will eventually use the same mamory if running with the same backend.
694-
if (cachedForwardingChoices.size() > 1) {
695-
copy = true;
696-
}
697-
auto* dh = o2::header::get<DataHeader*>(header->GetData());
698-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
699-
700-
if (copy) {
701-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
702-
auto&& newHeader = header->GetTransport()->CreateMessage();
703-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
704-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
705-
newHeader->Copy(*header);
706-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
707-
708-
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
709-
auto&& newPayload = header->GetTransport()->CreateMessage();
710-
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
711-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
712-
}
713-
}
714-
} else {
715-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
716-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
717-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
718-
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
719-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
720-
}
721-
}
722-
}
723-
}
595+
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
724596
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
725597
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
726598
if (forwardedParts[fi].Size() == 0) {

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "MemoryResources/MemoryResources.h"
1717
#include "Framework/FairMQDeviceProxy.h"
1818
#include "Headers/DataHeader.h"
19+
#include "Headers/DataHeaderHelpers.h"
1920
#include "Headers/Stack.h"
2021
#include "Framework/Logger.h"
2122
#include "Framework/SendingPolicy.h"
@@ -31,6 +32,8 @@
3132
#include "Framework/ControlService.h"
3233
#include "Framework/DataProcessingContext.h"
3334
#include "Framework/DeviceStateEnums.h"
35+
#include "Headers/DataHeader.h"
36+
#include "Framework/DataProcessingHeader.h"
3437

3538
#include <fairmq/Device.h>
3639
#include <fairmq/Channel.h>
@@ -41,6 +44,7 @@
4144
O2_DECLARE_DYNAMIC_LOG(device);
4245
// Stream which keeps track of the calibration lifetime logic
4346
O2_DECLARE_DYNAMIC_LOG(calibration);
47+
O2_DECLARE_DYNAMIC_LOG(forwarding);
4448

4549
namespace o2::framework
4650
{
@@ -217,4 +221,141 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
217221
}
218222
}
219223

224+
static auto toBeForwardedHeader = [](void* header) -> bool {
225+
// If is now possible that the record is not complete when
226+
// we forward it, because of a custom completion policy.
227+
// this means that we need to skip the empty entries in the
228+
// record for being forwarded.
229+
if (header == nullptr) {
230+
return false;
231+
}
232+
auto sih = o2::header::get<SourceInfoHeader*>(header);
233+
if (sih) {
234+
return false;
235+
}
236+
237+
auto dih = o2::header::get<DomainInfoHeader*>(header);
238+
if (dih) {
239+
return false;
240+
}
241+
242+
auto dh = o2::header::get<header::DataHeader*>(header);
243+
if (!dh) {
244+
return false;
245+
}
246+
auto dph = o2::header::get<DataProcessingHeader*>(header);
247+
if (!dph) {
248+
return false;
249+
}
250+
return true;
251+
};
252+
253+
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
254+
FairMQDeviceProxy& proxy,
255+
std::unique_ptr<fair::mq::Message>& header,
256+
std::unique_ptr<fair::mq::Message>& payload,
257+
size_t total,
258+
bool consume) {
259+
if (header.get() == nullptr) {
260+
// Missing an header is not an error anymore.
261+
// it simply means that we did not receive the
262+
// given input, but we were asked to
263+
// consume existing, so we skip it.
264+
return false;
265+
}
266+
if (payload.get() == nullptr && consume == true) {
267+
// If the payload is not there, it means we already
268+
// processed it with ConsumeExisiting. Therefore we
269+
// need to do something only if this is the last consume.
270+
header.reset(nullptr);
271+
return false;
272+
}
273+
274+
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
275+
if (fdph == nullptr) {
276+
LOG(error) << "Data is missing DataProcessingHeader";
277+
return false;
278+
}
279+
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
280+
if (fdh == nullptr) {
281+
LOG(error) << "Data is missing DataHeader";
282+
return false;
283+
}
284+
285+
// We need to find the forward route only for the first
286+
// part of a split payload. All the others will use the same.
287+
// but always check if we have a sequence of multiple payloads
288+
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
289+
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
290+
}
291+
return cachedForwardingChoices.empty() == false;
292+
};
293+
294+
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
295+
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
296+
{
297+
// we collect all messages per forward in a map and send them together
298+
std::vector<fair::mq::Parts> forwardedParts;
299+
forwardedParts.resize(proxy.getNumForwards());
300+
std::vector<ChannelIndex> cachedForwardingChoices{};
301+
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
302+
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
303+
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
304+
305+
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
306+
auto& messageSet = currentSetOfInputs[ii];
307+
// In case the messageSet is empty, there is nothing to be done.
308+
if (messageSet.size() == 0) {
309+
continue;
310+
}
311+
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
312+
continue;
313+
}
314+
cachedForwardingChoices.clear();
315+
316+
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
317+
auto& messageSet = currentSetOfInputs[ii];
318+
auto& header = messageSet.header(pi);
319+
auto& payload = messageSet.payload(pi);
320+
auto total = messageSet.getNumberOfPayloads(pi);
321+
322+
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
323+
continue;
324+
}
325+
326+
// In case of more than one forward route, we need to copy the message.
327+
// This will eventually use the same mamory if running with the same backend.
328+
if (cachedForwardingChoices.size() > 1) {
329+
copy = true;
330+
}
331+
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
332+
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
333+
334+
if (copy) {
335+
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
336+
auto&& newHeader = header->GetTransport()->CreateMessage();
337+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
338+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
339+
newHeader->Copy(*header);
340+
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
341+
342+
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
343+
auto&& newPayload = header->GetTransport()->CreateMessage();
344+
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
345+
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
346+
}
347+
}
348+
} else {
349+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
350+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
351+
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
352+
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
353+
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
354+
}
355+
}
356+
}
357+
}
358+
return forwardedParts;
359+
};
360+
220361
} // namespace o2::framework

0 commit comments

Comments
 (0)