Skip to content

Commit 1ce6469

Browse files
committed
DPL: avoid MessageSet abstractions when forwarding
This is most likely faster, and it will allow us to move the early forwarding at an earlier stage where the data is not yet in a MessageSet.
1 parent 135238e commit 1ce6469

File tree

4 files changed

+127
-98
lines changed

4 files changed

+127
-98
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "Framework/TimesliceIndex.h"
1717
#include <fairmq/FwdDecls.h>
1818
#include <vector>
19+
#include <span>
1920

2021
namespace o2::framework
2122
{
@@ -53,8 +54,11 @@ struct DataProcessingHelpers {
5354
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
5455
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
5556
/// Helper to route messages for forwarding
56-
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, std::vector<MessageSet>& currentSetOfInputs,
57-
bool copy, bool consume);
57+
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<MessageSet>& currentSetOfInputs,
58+
bool copy, bool consume);
59+
/// Helper to route messages for forwarding
60+
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
61+
bool copy, bool consume);
5862
};
5963
} // namespace o2::framework
6064
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
592592
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
593593
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
594594
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
595-
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copy, consume);
595+
auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copy, consume);
596596

597597
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
598598
if (forwardedParts[fi].Size() == 0) {

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 107 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -221,102 +221,128 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221
}
222222
}
223223

224-
auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy,
225-
std::vector<MessageSet>& currentSetOfInputs,
226-
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
224+
void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& messages, std::vector<fair::mq::Parts>& forwardedParts,
225+
const bool copyByDefault, bool consume)
227226
{
228-
// we collect all messages per forward in a map and send them together
229-
std::vector<fair::mq::Parts> forwardedParts;
230-
forwardedParts.resize(proxy.getNumForwards());
231-
std::vector<ChannelIndex> forwardingChoices{};
232227
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
228+
std::vector<ChannelIndex> forwardingChoices{};
229+
size_t pi = 0;
230+
while (pi < messages.size()) {
231+
auto& header = messages[pi];
233232

234-
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
235-
auto& messageSet = currentSetOfInputs[ii];
233+
// If is now possible that the record is not complete when
234+
// we forward it, because of a custom completion policy.
235+
// this means that we need to skip the empty entries in the
236+
// record for being forwarded.
237+
if (header->GetData() == nullptr) {
238+
pi += 2;
239+
continue;
240+
}
241+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
242+
if (dih) {
243+
pi += 2;
244+
continue;
245+
}
246+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
247+
if (sih) {
248+
pi += 2;
249+
continue;
250+
}
236251

237-
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
238-
auto& header = messageSet.header(pi);
252+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
253+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
239254

240-
// If is now possible that the record is not complete when
241-
// we forward it, because of a custom completion policy.
242-
// this means that we need to skip the empty entries in the
243-
// record for being forwarded.
244-
if (header->GetData() == nullptr) {
245-
continue;
246-
}
247-
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
248-
if (dih) {
249-
continue;
250-
}
251-
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
252-
if (sih) {
253-
continue;
254-
}
255+
if (dph == nullptr || dh == nullptr) {
256+
// Complain only if this is not an out-of-band message
257+
LOGP(error, "Data is missing {}{}{}",
258+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
259+
pi += 2;
260+
continue;
261+
}
255262

256-
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
257-
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
263+
// At least one payload.
264+
auto& payload = messages[pi + 1];
265+
// Calculate the number of messages which should be handled together
266+
// all in one go.
267+
size_t numberOfMessages = 0;
268+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
269+
// Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
270+
numberOfMessages = dh->splitPayloadParts + 1; // one is for the header
271+
} else {
272+
// Sequence of splitPayloadParts (header, payload) pairs belonging together.
273+
// In case splitPayloadParts = 0, we consider this as a single message pair
274+
numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
275+
}
258276

259-
if (dph == nullptr || dh == nullptr) {
260-
// Complain only if this is not an out-of-band message
261-
LOGP(error, "Data is missing {}{}{}",
262-
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
263-
continue;
264-
}
277+
if (payload.get() == nullptr && consume == true) {
278+
// If the payload is not there, it means we already
279+
// processed it with ConsumeExisiting. Therefore we
280+
// need to do something only if this is the last consume.
281+
header.reset(nullptr);
282+
pi += numberOfMessages;
283+
continue;
284+
}
265285

266-
auto& payload = messageSet.payload(pi);
286+
// We need to find the forward route only for the first
287+
// part of a split payload. All the others will use the same.
288+
// Therefore, we reset and recompute the forwarding choice:
289+
//
290+
// - If this is the first payload of a [header0][payload0][header0][payload1]... sequence,
291+
// which is actually always created and handled together. Notice that in this
292+
// case we have splitPayloadParts == splitPayloadIndex
293+
// - If this is the first payload of a [header0][payload0][header1][payload1]... sequence
294+
// belonging to the same multipart message (and therefore we are guaranteed that they
295+
// need to be routed together).
296+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
297+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
298+
// we will already use the same choice in the for loop below.
299+
//
267300

268-
if (payload.get() == nullptr && consume == true) {
269-
// If the payload is not there, it means we already
270-
// processed it with ConsumeExisiting. Therefore we
271-
// need to do something only if this is the last consume.
272-
header.reset(nullptr);
273-
continue;
274-
}
301+
forwardingChoices.clear();
302+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
275303

276-
// We need to find the forward route only for the first
277-
// part of a split payload. All the others will use the same.
278-
// Therefore, we reset and recompute the forwarding choice:
279-
//
280-
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
281-
// which is actually always created and handled together
282-
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
283-
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
284-
// we will already use the same choice in the for loop below.
285-
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
286-
forwardingChoices.clear();
287-
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
288-
}
304+
if (forwardingChoices.empty()) {
305+
// Nothing to forward go to the next messageset
306+
pi += numberOfMessages;
307+
continue;
308+
}
289309

290-
if (forwardingChoices.empty()) {
291-
// Nothing to forward go to the next messageset
292-
continue;
293-
}
310+
// In case of more than one forward route, we need to copy the message.
311+
// This will eventually use the same memory if running with the same backend.
312+
if (copyByDefault || forwardingChoices.size() > 1) {
313+
for (auto& choice : forwardingChoices) {
314+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
315+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
294316

295-
// In case of more than one forward route, we need to copy the message.
296-
// This will eventually use the same memory if running with the same backend.
297-
if (copyByDefault || forwardingChoices.size() > 1) {
298-
for (auto& choice : forwardingChoices) {
299-
auto&& newHeader = header->GetTransport()->CreateMessage();
300-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
301-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
302-
newHeader->Copy(*header);
303-
forwardedParts[choice.value].AddPart(std::move(newHeader));
304-
305-
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
306-
auto&& newPayload = header->GetTransport()->CreateMessage();
307-
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
308-
forwardedParts[choice.value].AddPart(std::move(newPayload));
309-
}
310-
}
311-
} else {
312-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
313-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
314-
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
315-
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
316-
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
317+
for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
318+
auto&& newMsg = header->GetTransport()->CreateMessage();
319+
newMsg->Copy(*messages[ppi]);
320+
forwardedParts[choice.value].AddPart(std::move(newMsg));
317321
}
318322
}
323+
} else {
324+
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
325+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
326+
for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
327+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messages[ppi]));
328+
}
319329
}
330+
pi += numberOfMessages;
331+
}
332+
}
333+
334+
auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
335+
std::vector<MessageSet>& currentSetOfInputs,
336+
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
337+
{
338+
// we collect all messages per forward in a map and send them together
339+
std::vector<fair::mq::Parts> forwardedParts;
340+
forwardedParts.resize(proxy.getNumForwards());
341+
std::vector<ChannelIndex> forwardingChoices{};
342+
343+
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
344+
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
345+
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
320346
}
321347
return forwardedParts;
322348
};

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ TEST_CASE("ForwardInputsEmpty")
4545

4646
std::vector<MessageSet> currentSetOfInputs;
4747

48-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
48+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
4949
REQUIRE(result.empty());
5050
}
5151

@@ -95,7 +95,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute")
9595
REQUIRE(messageSet.size() == 1);
9696
currentSetOfInputs.emplace_back(std::move(messageSet));
9797

98-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
98+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
9999
REQUIRE(result.size() == 1); // One route
100100
REQUIRE(result[0].Size() == 2); // Two messages for that route
101101
}
@@ -146,7 +146,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
146146
REQUIRE(messageSet.size() == 1);
147147
currentSetOfInputs.emplace_back(std::move(messageSet));
148148

149-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, true);
149+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true);
150150
REQUIRE(result.size() == 1);
151151
REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed.
152152
}
@@ -201,8 +201,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
201201
REQUIRE(messageSet.size() == 1);
202202
currentSetOfInputs.emplace_back(std::move(messageSet));
203203

204-
205-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
204+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
206205
REQUIRE(result.size() == 1); // One route
207206
REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2. However it cannot really happen.
208207
// Correct behavior below:
@@ -260,7 +259,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
260259
REQUIRE(messageSet.size() == 1);
261260
currentSetOfInputs.emplace_back(std::move(messageSet));
262261

263-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
262+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
264263
REQUIRE(result.size() == 1); // One route
265264
REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong
266265
// FIXME: actually correct behavior below
@@ -325,7 +324,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
325324
REQUIRE(messageSet.size() == 1);
326325
currentSetOfInputs.emplace_back(std::move(messageSet));
327326

328-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
327+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
329328
REQUIRE(result.size() == 2); // Two routes
330329
REQUIRE(result[0].Size() == 2); // Two messages per route
331330
REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters
@@ -388,7 +387,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals")
388387
REQUIRE(messageSet.size() == 1);
389388
currentSetOfInputs.emplace_back(std::move(messageSet));
390389

391-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
390+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
392391
REQUIRE(result.size() == 2); // Two routes
393392
REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward
394393
REQUIRE(result[1].Size() == 2); //
@@ -466,7 +465,7 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes")
466465
currentSetOfInputs.emplace_back(std::move(messageSet2));
467466
REQUIRE(currentSetOfInputs.size() == 2);
468467

469-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
468+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
470469
REQUIRE(result.size() == 2); // Two routes
471470
REQUIRE(result[0].Size() == 2); //
472471
REQUIRE(result[1].Size() == 2); //
@@ -529,7 +528,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
529528
REQUIRE(messageSet.size() == 1);
530529
currentSetOfInputs.emplace_back(std::move(messageSet));
531530

532-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
531+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
533532
REQUIRE(result.size() == 2); // Two routes
534533
REQUIRE(result[0].Size() == 0); // Two messages per route
535534
REQUIRE(result[1].Size() == 2); // Two messages per route
@@ -541,7 +540,7 @@ TEST_CASE("ForwardInputsSplitPayload")
541540
dh.dataOrigin = "TST";
542541
dh.dataDescription = "A";
543542
dh.subSpecification = 0;
544-
dh.splitPayloadIndex = 0;
543+
dh.splitPayloadIndex = 2;
545544
dh.splitPayloadParts = 2;
546545

547546
o2::header::DataHeader dh2;
@@ -611,7 +610,7 @@ TEST_CASE("ForwardInputsSplitPayload")
611610
REQUIRE(messageSet.size() == 2);
612611
currentSetOfInputs.emplace_back(std::move(messageSet));
613612

614-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
613+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
615614
REQUIRE(result.size() == 2); // Two routes
616615
CHECK(result[0].Size() == 2); // No messages on this route
617616
CHECK(result[1].Size() == 3);
@@ -657,7 +656,7 @@ TEST_CASE("ForwardInputEOSSingleRoute")
657656
REQUIRE(messageSet.size() == 1);
658657
currentSetOfInputs.emplace_back(std::move(messageSet));
659658

660-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
659+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
661660
REQUIRE(result.size() == 1); // One route
662661
REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded
663662
}
@@ -702,7 +701,7 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute")
702701
REQUIRE(messageSet.size() == 1);
703702
currentSetOfInputs.emplace_back(std::move(messageSet));
704703

705-
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume);
704+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
706705
REQUIRE(result.size() == 1); // One route
707706
REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded
708707
}

0 commit comments

Comments
 (0)