Skip to content

Commit 348e9b1

Browse files
committed
DPL: refactor input forwarding routing
Separate routing of the forwarding to a separate helper. Add test for said helper.
1 parent 99a7714 commit 348e9b1

File tree

5 files changed

+838
-124
lines changed

5 files changed

+838
-124
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ add_executable(o2-test-framework-core
224224
test/test_FairMQOptionsRetriever.cxx
225225
test/test_FairMQResizableBuffer.cxx
226226
test/test_FairMQ.cxx
227+
test/test_ForwardInputs.cxx
227228
test/test_FrameworkDataFlowToDDS.cxx
228229
test/test_FrameworkDataFlowToO2Control.cxx
229230
test/test_Graphviz.cxx

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
#define O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_
1313

1414
#include <cstddef>
15+
#include "Framework/TimesliceSlot.h"
16+
#include "Framework/TimesliceIndex.h"
17+
#include <fairmq/FwdDecls.h>
18+
#include <vector>
1519

1620
namespace o2::framework
1721
{
@@ -23,6 +27,9 @@ struct OutputChannelSpec;
2327
struct OutputChannelState;
2428
struct ProcessingPolicies;
2529
struct DeviceSpec;
30+
struct FairMQDeviceProxy;
31+
struct MessageSet;
32+
struct ChannelIndex;
2633
enum struct StreamingState;
2734
enum struct TransitionHandlingState;
2835

@@ -45,7 +52,9 @@ struct DataProcessingHelpers {
4552
static bool hasOnlyGenerated(DeviceSpec const& spec);
4653
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
4754
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
55+
/// Helper to route messages for forwarding
56+
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
57+
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume);
4858
};
49-
5059
} // namespace o2::framework
5160
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 2 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -550,69 +550,6 @@ void on_signal_callback(uv_signal_t* handle, int signum)
550550
O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
551551
}
552552

553-
static auto toBeForwardedHeader = [](void* header) -> bool {
554-
// If is now possible that the record is not complete when
555-
// we forward it, because of a custom completion policy.
556-
// this means that we need to skip the empty entries in the
557-
// record for being forwarded.
558-
if (header == nullptr) {
559-
return false;
560-
}
561-
auto dh = o2::header::get<DataHeader*>(header);
562-
if (!dh) {
563-
return false;
564-
}
565-
bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
566-
!o2::header::get<DomainInfoHeader*>(header) &&
567-
o2::header::get<DataProcessingHeader*>(header);
568-
// DataHeader is there. Complain if we have unexpected headers present / missing
569-
if (!retval) {
570-
LOGP(error, "Dropping data because of malformed header structure");
571-
}
572-
return retval;
573-
};
574-
575-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
576-
FairMQDeviceProxy& proxy,
577-
std::unique_ptr<fair::mq::Message>& header,
578-
std::unique_ptr<fair::mq::Message>& payload,
579-
size_t total,
580-
bool consume) {
581-
if (header.get() == nullptr) {
582-
// Missing an header is not an error anymore.
583-
// it simply means that we did not receive the
584-
// given input, but we were asked to
585-
// consume existing, so we skip it.
586-
return false;
587-
}
588-
if (payload.get() == nullptr && consume == true) {
589-
// If the payload is not there, it means we already
590-
// processed it with ConsumeExisiting. Therefore we
591-
// need to do something only if this is the last consume.
592-
header.reset(nullptr);
593-
return false;
594-
}
595-
596-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
597-
if (fdph == nullptr) {
598-
LOG(error) << "Data is missing DataProcessingHeader";
599-
return false;
600-
}
601-
auto fdh = o2::header::get<DataHeader*>(header->GetData());
602-
if (fdh == nullptr) {
603-
LOG(error) << "Data is missing DataHeader";
604-
return false;
605-
}
606-
607-
// We need to find the forward route only for the first
608-
// part of a split payload. All the others will use the same.
609-
// but always check if we have a sequence of multiple payloads
610-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
611-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
612-
}
613-
return cachedForwardingChoices.empty() == false;
614-
};
615-
616553
struct DecongestionContext {
617554
ServiceRegistryRef ref;
618555
TimesliceIndex::OldestOutputInfo oldestTimeslice;
@@ -653,67 +590,9 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
653590
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
654591
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
655592
auto& proxy = registry.get<FairMQDeviceProxy>();
656-
// we collect all messages per forward in a map and send them together
657-
std::vector<fair::mq::Parts> forwardedParts;
658-
forwardedParts.resize(proxy.getNumForwards());
659-
std::vector<ChannelIndex> cachedForwardingChoices{};
660-
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
661-
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
662-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
663-
664-
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
665-
auto& messageSet = currentSetOfInputs[ii];
666-
// In case the messageSet is empty, there is nothing to be done.
667-
if (messageSet.size() == 0) {
668-
continue;
669-
}
670-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
671-
continue;
672-
}
673-
cachedForwardingChoices.clear();
674-
675-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
676-
auto& messageSet = currentSetOfInputs[ii];
677-
auto& header = messageSet.header(pi);
678-
auto& payload = messageSet.payload(pi);
679-
auto total = messageSet.getNumberOfPayloads(pi);
680-
681-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
682-
continue;
683-
}
593+
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy, consume);
684594

685-
// In case of more than one forward route, we need to copy the message.
686-
// This will eventually use the same mamory if running with the same backend.
687-
if (cachedForwardingChoices.size() > 1) {
688-
copy = true;
689-
}
690-
auto* dh = o2::header::get<DataHeader*>(header->GetData());
691-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
692-
693-
if (copy) {
694-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
695-
auto&& newHeader = header->GetTransport()->CreateMessage();
696-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
697-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
698-
newHeader->Copy(*header);
699-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
700-
701-
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
702-
auto&& newPayload = header->GetTransport()->CreateMessage();
703-
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
704-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
705-
}
706-
}
707-
} else {
708-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
709-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
710-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
711-
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
712-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
713-
}
714-
}
715-
}
716-
}
595+
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
717596
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
718597
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
719598
if (forwardedParts[fi].Size() == 0) {

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "MemoryResources/MemoryResources.h"
1717
#include "Framework/FairMQDeviceProxy.h"
1818
#include "Headers/DataHeader.h"
19+
#include "Headers/DataHeaderHelpers.h"
1920
#include "Headers/Stack.h"
2021
#include "Framework/Logger.h"
2122
#include "Framework/SendingPolicy.h"
@@ -31,6 +32,8 @@
3132
#include "Framework/ControlService.h"
3233
#include "Framework/DataProcessingContext.h"
3334
#include "Framework/DeviceStateEnums.h"
35+
#include "Headers/DataHeader.h"
36+
#include "Framework/DataProcessingHeader.h"
3437

3538
#include <fairmq/Device.h>
3639
#include <fairmq/Channel.h>
@@ -41,6 +44,7 @@
4144
O2_DECLARE_DYNAMIC_LOG(device);
4245
// Stream which keeps track of the calibration lifetime logic
4346
O2_DECLARE_DYNAMIC_LOG(calibration);
47+
O2_DECLARE_DYNAMIC_LOG(forwarding);
4448

4549
namespace o2::framework
4650
{
@@ -217,4 +221,134 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
217221
}
218222
}
219223

224+
static auto toBeForwardedHeader = [](void* header) -> bool {
225+
// If is now possible that the record is not complete when
226+
// we forward it, because of a custom completion policy.
227+
// this means that we need to skip the empty entries in the
228+
// record for being forwarded.
229+
if (header == nullptr) {
230+
return false;
231+
}
232+
auto dh = o2::header::get<header::DataHeader*>(header);
233+
if (!dh) {
234+
return false;
235+
}
236+
bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
237+
!o2::header::get<DomainInfoHeader*>(header) &&
238+
o2::header::get<DataProcessingHeader*>(header);
239+
// DataHeader is there. Complain if we have unexpected headers present / missing
240+
if (!retval) {
241+
LOGP(error, "Dropping data because of malformed header structure");
242+
}
243+
return retval;
244+
};
245+
246+
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
247+
FairMQDeviceProxy& proxy,
248+
std::unique_ptr<fair::mq::Message>& header,
249+
std::unique_ptr<fair::mq::Message>& payload,
250+
size_t total,
251+
bool consume) {
252+
if (header.get() == nullptr) {
253+
// Missing an header is not an error anymore.
254+
// it simply means that we did not receive the
255+
// given input, but we were asked to
256+
// consume existing, so we skip it.
257+
return false;
258+
}
259+
if (payload.get() == nullptr && consume == true) {
260+
// If the payload is not there, it means we already
261+
// processed it with ConsumeExisiting. Therefore we
262+
// need to do something only if this is the last consume.
263+
header.reset(nullptr);
264+
return false;
265+
}
266+
267+
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
268+
if (fdph == nullptr) {
269+
LOG(error) << "Data is missing DataProcessingHeader";
270+
return false;
271+
}
272+
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
273+
if (fdh == nullptr) {
274+
LOG(error) << "Data is missing DataHeader";
275+
return false;
276+
}
277+
278+
// We need to find the forward route only for the first
279+
// part of a split payload. All the others will use the same.
280+
// but always check if we have a sequence of multiple payloads
281+
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
282+
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
283+
}
284+
return cachedForwardingChoices.empty() == false;
285+
};
286+
287+
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
288+
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
289+
{
290+
// we collect all messages per forward in a map and send them together
291+
std::vector<fair::mq::Parts> forwardedParts;
292+
forwardedParts.resize(proxy.getNumForwards());
293+
std::vector<ChannelIndex> cachedForwardingChoices{};
294+
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
295+
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
296+
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
297+
298+
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
299+
auto& messageSet = currentSetOfInputs[ii];
300+
// In case the messageSet is empty, there is nothing to be done.
301+
if (messageSet.size() == 0) {
302+
continue;
303+
}
304+
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
305+
continue;
306+
}
307+
cachedForwardingChoices.clear();
308+
309+
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
310+
auto& messageSet = currentSetOfInputs[ii];
311+
auto& header = messageSet.header(pi);
312+
auto& payload = messageSet.payload(pi);
313+
auto total = messageSet.getNumberOfPayloads(pi);
314+
315+
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
316+
continue;
317+
}
318+
319+
// In case of more than one forward route, we need to copy the message.
320+
// This will eventually use the same mamory if running with the same backend.
321+
if (cachedForwardingChoices.size() > 1) {
322+
copy = true;
323+
}
324+
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
325+
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
326+
327+
if (copy) {
328+
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
329+
auto&& newHeader = header->GetTransport()->CreateMessage();
330+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
331+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
332+
newHeader->Copy(*header);
333+
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
334+
335+
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
336+
auto&& newPayload = header->GetTransport()->CreateMessage();
337+
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
338+
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
339+
}
340+
}
341+
} else {
342+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
343+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
344+
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
345+
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
346+
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
347+
}
348+
}
349+
}
350+
}
351+
return forwardedParts;
352+
};
353+
220354
} // namespace o2::framework

0 commit comments

Comments
 (0)