Skip to content

Commit 5f56801

Browse files
committed
DPL: fix a number of inconsistencies in the forwarding logic
If a message came together with a DomainInfoHeader or a SourceInfoHeader, it would have not been forwarded. If one (header, payload, ...) tuple in a MessageSet was to be copied, all the subsequent ones would have been copied. If one (header, payload, ...) tuple got redirected to more than one destination, all the subsequent ones would have been redirected there.
1 parent 769d464 commit 5f56801

File tree

3 files changed

+101
-110
lines changed

3 files changed

+101
-110
lines changed

DataFormats/Headers/include/Headers/DataHeader.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,8 @@ struct BaseHeader {
373373
uint32_t flags;
374374
struct {
375375
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
376-
flagsReserved : 15, // reserved for future use
376+
flagsReserved : 14, // reserved for future use. MUST be filled with 0s.
377+
flagsDisabled : 1, // header should be ignored if this is 1
377378
flagsDerivedHeader : 16; // reserved for usage by the derived header
378379
};
379380
};
@@ -467,15 +468,20 @@ auto get(const std::byte* buffer, size_t /*len*/ = 0)
467468
// otherwise, we keep the code related to the exception outside the header file.
468469
// Note: Can not check on size because the O2 data model requires variable size headers
469470
// to be supported.
470-
if (current->sanityCheck(HeaderValueType::sVersion)) {
471+
if (current->sanityCheck(HeaderValueType::sVersion) && current->flagsDisabled == 0) {
472+
// If the first header matches and it's enabled, we return it
473+
// otherwise we look for more.
471474
return reinterpret_cast<HeaderConstPtrType>(current);
472475
}
473476
}
474477
auto* prev = current;
475478
while ((current = current->next())) {
476479
prev = current;
477480
if (current->description == HeaderValueType::sHeaderType) {
478-
if (current->sanityCheck(HeaderValueType::sVersion)) {
481+
// This is needed to allow disabling some headers from being picked up
482+
// even if they are matching. This is handy to have a quick
483+
// way to disable a sub headers without having to drop them.
484+
if (current->sanityCheck(HeaderValueType::sVersion) && current->flagsDisabled == 0) {
479485
return reinterpret_cast<HeaderConstPtrType>(current);
480486
}
481487
}

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 88 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -221,136 +221,124 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221
}
222222
}
223223

224-
static auto toBeForwardedHeader = [](void* header) -> bool {
225-
// If is now possible that the record is not complete when
226-
// we forward it, because of a custom completion policy.
227-
// this means that we need to skip the empty entries in the
228-
// record for being forwarded.
229-
if (header == nullptr) {
230-
return false;
231-
}
232-
auto sih = o2::header::get<SourceInfoHeader*>(header);
233-
if (sih) {
234-
return false;
235-
}
236-
237-
auto dih = o2::header::get<DomainInfoHeader*>(header);
238-
if (dih) {
239-
return false;
240-
}
241-
242-
auto dh = o2::header::get<header::DataHeader*>(header);
243-
if (!dh) {
244-
return false;
245-
}
246-
auto dph = o2::header::get<DataProcessingHeader*>(header);
247-
if (!dph) {
248-
return false;
249-
}
250-
return true;
251-
};
252-
253-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
254-
FairMQDeviceProxy& proxy,
255-
std::unique_ptr<fair::mq::Message>& header,
256-
std::unique_ptr<fair::mq::Message>& payload,
257-
size_t total,
258-
bool consume) {
259-
if (header.get() == nullptr) {
260-
// Missing an header is not an error anymore.
261-
// it simply means that we did not receive the
262-
// given input, but we were asked to
263-
// consume existing, so we skip it.
264-
return false;
265-
}
266-
if (payload.get() == nullptr && consume == true) {
267-
// If the payload is not there, it means we already
268-
// processed it with ConsumeExisiting. Therefore we
269-
// need to do something only if this is the last consume.
270-
header.reset(nullptr);
271-
return false;
272-
}
273-
274-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
275-
if (fdph == nullptr) {
276-
LOG(error) << "Data is missing DataProcessingHeader";
277-
return false;
278-
}
279-
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
280-
if (fdh == nullptr) {
281-
LOG(error) << "Data is missing DataHeader";
282-
return false;
283-
}
284-
285-
// We need to find the forward route only for the first
286-
// part of a split payload. All the others will use the same.
287-
// but always check if we have a sequence of multiple payloads
288-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
289-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
290-
}
291-
return cachedForwardingChoices.empty() == false;
292-
};
293-
294-
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
295-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
224+
auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot,
225+
std::vector<MessageSet>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice,
226+
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
296227
{
297228
// we collect all messages per forward in a map and send them together
298229
std::vector<fair::mq::Parts> forwardedParts;
299230
forwardedParts.resize(proxy.getNumForwards());
300-
std::vector<ChannelIndex> cachedForwardingChoices{};
231+
std::vector<ChannelIndex> forwardingChoices{};
301232
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
302233
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
303-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
234+
slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");
304235

305236
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
306237
auto& messageSet = currentSetOfInputs[ii];
307-
// In case the messageSet is empty, there is nothing to be done.
308-
if (messageSet.size() == 0) {
309-
continue;
310-
}
311-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
312-
continue;
313-
}
314-
cachedForwardingChoices.clear();
315238

316-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
317-
auto& messageSet = currentSetOfInputs[ii];
239+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
318240
auto& header = messageSet.header(pi);
241+
242+
// If is now possible that the record is not complete when
243+
// we forward it, because of a custom completion policy.
244+
// this means that we need to skip the empty entries in the
245+
// record for being forwarded.
246+
if (header->GetData() == nullptr) {
247+
continue;
248+
}
249+
250+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
251+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
252+
253+
if (dph == nullptr || dh == nullptr) {
254+
// Complain only if this is not an out-of-band message
255+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
256+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
257+
if (dih == nullptr || sih == nullptr) {
258+
LOGP(error, "Data is missing {}{}{}",
259+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
260+
}
261+
continue;
262+
}
263+
319264
auto& payload = messageSet.payload(pi);
320-
auto total = messageSet.getNumberOfPayloads(pi);
321265

322-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
266+
if (payload.get() == nullptr && consume == true) {
267+
// If the payload is not there, it means we already
268+
// processed it with ConsumeExisiting. Therefore we
269+
// need to do something only if this is the last consume.
270+
header.reset(nullptr);
323271
continue;
324272
}
325273

326-
// In case of more than one forward route, we need to copy the message.
327-
// This will eventually use the same mamory if running with the same backend.
328-
if (cachedForwardingChoices.size() > 1) {
329-
copy = true;
274+
// We need to find the forward route only for the first
275+
// part of a split payload. All the others will use the same.
276+
// Therefore, we reset and recompute the forwarding choice:
277+
//
278+
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
279+
// which is actually always created and handled together
280+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
281+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
282+
// we will already use the same choice in the for loop below.
283+
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
284+
forwardingChoices.clear();
285+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
286+
}
287+
288+
if (forwardingChoices.empty()) {
289+
// Nothing to forward go to the next messageset
290+
continue;
330291
}
331-
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
332-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
333292

334-
if (copy) {
335-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
293+
// In case of more than one forward route, we need to copy the message.
294+
// This will eventually use the same memory if running with the same backend.
295+
if (copyByDefault || forwardingChoices.size()) {
296+
for (auto& choice : forwardingChoices) {
336297
auto&& newHeader = header->GetTransport()->CreateMessage();
337298
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
338-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
299+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
339300
newHeader->Copy(*header);
340-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
301+
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
302+
if (dih) {
303+
const_cast<DomainInfoHeader*>(dih)->flagsDisabled = 1;
304+
}
305+
auto sih = o2::header::get<SourceInfoHeader*>(newHeader->GetData());
306+
if (sih) {
307+
const_cast<SourceInfoHeader*>(sih)->flagsDisabled = 1;
308+
}
309+
forwardedParts[choice.value].AddPart(std::move(newHeader));
341310

342311
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
343312
auto&& newPayload = header->GetTransport()->CreateMessage();
344313
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
345-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
314+
forwardedParts[choice.value].AddPart(std::move(newPayload));
346315
}
347316
}
348317
} else {
349318
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
350-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
351-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
319+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
320+
auto dih = o2::header::get<DomainInfoHeader*>(messageSet.header(pi)->GetData());
321+
auto sih = o2::header::get<SourceInfoHeader*>(messageSet.header(pi)->GetData());
322+
// We need to copy the header if it has extra timeframe accounting
323+
// information attached to it, so that we can disable it without having
324+
// a race condition in shared memory.
325+
if (dih || sih) {
326+
auto&& newHeader = header->GetTransport()->CreateMessage();
327+
newHeader->Copy(*header);
328+
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
329+
if (dih) {
330+
const_cast<DomainInfoHeader*>(dih)->flagsDisabled = 1;
331+
}
332+
auto sih = o2::header::get<SourceInfoHeader*>(newHeader->GetData());
333+
if (sih) {
334+
const_cast<SourceInfoHeader*>(sih)->flagsDisabled = 1;
335+
}
336+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(newHeader));
337+
} else {
338+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
339+
}
352340
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
353-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
341+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
354342
}
355343
}
356344
}

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
162162

163163
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
164164
REQUIRE(result.size() == 1); // One route
165-
REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2
165+
REQUIRE(result[0].Size() == 2);
166166
// Correct behavior below:
167167
// REQUIRE(result[0].Size() == 2);
168168
// REQUIRE(o2::header::get<SourceInfoHeader*>(result[0].At(0)->GetData()) == nullptr);
@@ -223,10 +223,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
223223

224224
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
225225
REQUIRE(result.size() == 1); // One route
226-
REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong
227-
// FIXME: actually correct behavior below
228-
// REQUIRE(result[0].Size() == 2); // Two messages
229-
// REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // it should not have the end of stream
226+
REQUIRE(result[0].Size() == 2);
227+
REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // it should not have the end of stream
230228
}
231229

232230
TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
@@ -590,8 +588,7 @@ TEST_CASE("ForwardInputsSplitPayload")
590588
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
591589
REQUIRE(result.size() == 2); // Two routes
592590
CHECK(result[0].Size() == 2); // No messages on this route
593-
CHECK(result[1].Size() == 5); // FIXME: Multipart matching has side effects also for the elements
594-
// CHECK(result[1].Size() == 3); // FIXME: the correct forwarding is that only the multipart goes to the same route
591+
CHECK(result[1].Size() == 3);
595592
}
596593

597594
TEST_CASE("ForwardInputEOSSingleRoute")

0 commit comments

Comments
 (0)