Skip to content

Commit aa79411

Browse files
committed
DPL: fix routing issues in forwarding
Previously, if one (header, payload, ...) tuple in a MessageSet had to be copied, all the subsequent tuples were copied as well. Likewise, if one (header, payload, ...) tuple was routed to more than one destination, all the subsequent tuples were routed to those destinations too.
1 parent 1cd002b commit aa79411

File tree

2 files changed

+66
-95
lines changed

2 files changed

+66
-95
lines changed

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 65 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -221,129 +221,101 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221
}
222222
}
223223

224-
static auto toBeForwardedHeader = [](void* header) -> bool {
225-
// If is now possible that the record is not complete when
226-
// we forward it, because of a custom completion policy.
227-
// this means that we need to skip the empty entries in the
228-
// record for being forwarded.
229-
if (header == nullptr) {
230-
return false;
231-
}
232-
auto dh = o2::header::get<header::DataHeader*>(header);
233-
if (!dh) {
234-
return false;
235-
}
236-
bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
237-
!o2::header::get<DomainInfoHeader*>(header) &&
238-
o2::header::get<DataProcessingHeader*>(header);
239-
// DataHeader is there. Complain if we have unexpected headers present / missing
240-
if (!retval) {
241-
LOGP(error, "Dropping data because of malformed header structure");
242-
}
243-
return retval;
244-
};
245-
246-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
247-
FairMQDeviceProxy& proxy,
248-
std::unique_ptr<fair::mq::Message>& header,
249-
std::unique_ptr<fair::mq::Message>& payload,
250-
size_t total,
251-
bool consume) {
252-
if (header.get() == nullptr) {
253-
// Missing an header is not an error anymore.
254-
// it simply means that we did not receive the
255-
// given input, but we were asked to
256-
// consume existing, so we skip it.
257-
return false;
258-
}
259-
if (payload.get() == nullptr && consume == true) {
260-
// If the payload is not there, it means we already
261-
// processed it with ConsumeExisiting. Therefore we
262-
// need to do something only if this is the last consume.
263-
header.reset(nullptr);
264-
return false;
265-
}
266-
267-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
268-
if (fdph == nullptr) {
269-
LOG(error) << "Data is missing DataProcessingHeader";
270-
return false;
271-
}
272-
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
273-
if (fdh == nullptr) {
274-
LOG(error) << "Data is missing DataHeader";
275-
return false;
276-
}
277-
278-
// We need to find the forward route only for the first
279-
// part of a split payload. All the others will use the same.
280-
// but always check if we have a sequence of multiple payloads
281-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
282-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
283-
}
284-
return cachedForwardingChoices.empty() == false;
285-
};
286-
287-
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
288-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
224+
auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot,
225+
std::vector<MessageSet>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice,
226+
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
289227
{
290228
// we collect all messages per forward in a map and send them together
291229
std::vector<fair::mq::Parts> forwardedParts;
292230
forwardedParts.resize(proxy.getNumForwards());
293-
std::vector<ChannelIndex> cachedForwardingChoices{};
231+
std::vector<ChannelIndex> forwardingChoices{};
294232
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
295233
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
296-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
234+
slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");
297235

298236
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
299237
auto& messageSet = currentSetOfInputs[ii];
300-
// In case the messageSet is empty, there is nothing to be done.
301-
if (messageSet.size() == 0) {
302-
continue;
303-
}
304-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
305-
continue;
306-
}
307-
cachedForwardingChoices.clear();
308238

309-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
310-
auto& messageSet = currentSetOfInputs[ii];
239+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
311240
auto& header = messageSet.header(pi);
241+
242+
// It is now possible that the record is not complete when
243+
// we forward it, because of a custom completion policy.
244+
// this means that we need to skip the empty entries in the
245+
// record for being forwarded.
246+
if (header->GetData() == nullptr) {
247+
continue;
248+
}
249+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
250+
if (dih) {
251+
continue;
252+
}
253+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
254+
if (sih) {
255+
continue;
256+
}
257+
258+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
259+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
260+
261+
if (dph == nullptr || dh == nullptr) {
262+
// Complain only if this is not an out-of-band message
263+
LOGP(error, "Data is missing {}{}{}",
264+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
265+
continue;
266+
}
267+
312268
auto& payload = messageSet.payload(pi);
313-
auto total = messageSet.getNumberOfPayloads(pi);
314269

315-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
270+
if (payload.get() == nullptr && consume == true) {
271+
// If the payload is not there, it means we already
272+
// processed it with ConsumeExisting. Therefore we
273+
// need to do something only if this is the last consume.
274+
header.reset(nullptr);
316275
continue;
317276
}
318277

319-
// In case of more than one forward route, we need to copy the message.
320-
// This will eventually use the same mamory if running with the same backend.
321-
if (cachedForwardingChoices.size() > 1) {
322-
copy = true;
278+
// We need to find the forward route only for the first
279+
// part of a split payload. All the others will use the same.
280+
// Therefore, we reset and recompute the forwarding choice:
281+
//
282+
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
283+
// which is actually always created and handled together
284+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
285+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
286+
// we will already use the same choice in the for loop below.
287+
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
288+
forwardingChoices.clear();
289+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
323290
}
324-
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
325-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
326291

327-
if (copy) {
328-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
292+
if (forwardingChoices.empty()) {
293+
// Nothing to forward go to the next messageset
294+
continue;
295+
}
296+
297+
// In case of more than one forward route, we need to copy the message.
298+
// This will eventually use the same memory if running with the same backend.
299+
if (copyByDefault || forwardingChoices.size()) {
300+
for (auto& choice : forwardingChoices) {
329301
auto&& newHeader = header->GetTransport()->CreateMessage();
330302
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
331-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
303+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
332304
newHeader->Copy(*header);
333-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
305+
forwardedParts[choice.value].AddPart(std::move(newHeader));
334306

335307
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
336308
auto&& newPayload = header->GetTransport()->CreateMessage();
337309
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
338-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
310+
forwardedParts[choice.value].AddPart(std::move(newPayload));
339311
}
340312
}
341313
} else {
342314
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
343-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
344-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
315+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
316+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
345317
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
346-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
318+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
347319
}
348320
}
349321
}

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,8 +644,7 @@ TEST_CASE("ForwardInputsSplitPayload")
644644
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
645645
REQUIRE(result.size() == 2); // Two routes
646646
CHECK(result[0].Size() == 2); // No messages on this route
647-
CHECK(result[1].Size() == 5); // FIXME: Multipart matching has side effects also for the elements
648-
// CHECK(result[1].Size() == 3); // FIXME: the correct forwarding is that only the multipart goes to the same route
647+
CHECK(result[1].Size() == 3);
649648
}
650649

651650
TEST_CASE("ForwardInputEOSSingleRoute")

0 commit comments

Comments
 (0)