Skip to content

Commit 9bbf6ec

Browse files
committed
DPL: more refactoring of the forwarding code
Use a single helper function to improve readability.
1 parent 91a991f commit 9bbf6ec

File tree

4 files changed

+84
-147
lines changed

4 files changed

+84
-147
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ struct DataProcessingHelpers {
5353
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
5454
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
5555
/// Helper to route messages for forwarding
56-
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
57-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume);
56+
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy,
57+
std::vector<MessageSet>& currentSetOfInputs,
58+
const bool copyByDefault, bool consume);
5859
};
5960
} // namespace o2::framework
6061
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,10 +588,12 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
588588
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
589589
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
590590
auto& proxy = registry.get<FairMQDeviceProxy>();
591-
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy, consume);
592591

593592
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
594-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
593+
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
594+
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
595+
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copy, consume);
596+
595597
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
596598
if (forwardedParts[fi].Size() == 0) {
597599
continue;

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 64 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -228,129 +228,99 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
228228
}
229229
}
230230

231-
static auto toBeForwardedHeader = [](void* header) -> bool {
232-
// If is now possible that the record is not complete when
233-
// we forward it, because of a custom completion policy.
234-
// this means that we need to skip the empty entries in the
235-
// record for being forwarded.
236-
if (header == nullptr) {
237-
return false;
238-
}
239-
auto dh = o2::header::get<header::DataHeader*>(header);
240-
if (!dh) {
241-
return false;
242-
}
243-
bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
244-
!o2::header::get<DomainInfoHeader*>(header) &&
245-
o2::header::get<DataProcessingHeader*>(header);
246-
// DataHeader is there. Complain if we have unexpected headers present / missing
247-
if (!retval) {
248-
LOGP(error, "Dropping data because of malformed header structure");
249-
}
250-
return retval;
251-
};
252-
253-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
254-
FairMQDeviceProxy& proxy,
255-
std::unique_ptr<fair::mq::Message>& header,
256-
std::unique_ptr<fair::mq::Message>& payload,
257-
size_t total,
258-
bool consume) {
259-
if (header.get() == nullptr) {
260-
// Missing an header is not an error anymore.
261-
// it simply means that we did not receive the
262-
// given input, but we were asked to
263-
// consume existing, so we skip it.
264-
return false;
265-
}
266-
if (payload.get() == nullptr && consume == true) {
267-
// If the payload is not there, it means we already
268-
// processed it with ConsumeExisiting. Therefore we
269-
// need to do something only if this is the last consume.
270-
header.reset(nullptr);
271-
return false;
272-
}
273-
274-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
275-
if (fdph == nullptr) {
276-
LOG(error) << "Data is missing DataProcessingHeader";
277-
return false;
278-
}
279-
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
280-
if (fdh == nullptr) {
281-
LOG(error) << "Data is missing DataHeader";
282-
return false;
283-
}
284-
285-
// We need to find the forward route only for the first
286-
// part of a split payload. All the others will use the same.
287-
// but always check if we have a sequence of multiple payloads
288-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
289-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
290-
}
291-
return cachedForwardingChoices.empty() == false;
292-
};
293-
294-
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
295-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
231+
auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy,
232+
std::vector<MessageSet>& currentSetOfInputs,
233+
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
296234
{
297235
// we collect all messages per forward in a map and send them together
298236
std::vector<fair::mq::Parts> forwardedParts;
299237
forwardedParts.resize(proxy.getNumForwards());
300-
std::vector<ChannelIndex> cachedForwardingChoices{};
238+
std::vector<ChannelIndex> forwardingChoices{};
301239
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
302-
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
303-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
304240

305241
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
306242
auto& messageSet = currentSetOfInputs[ii];
307-
// In case the messageSet is empty, there is nothing to be done.
308-
if (messageSet.size() == 0) {
309-
continue;
310-
}
311-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
312-
continue;
313-
}
314-
cachedForwardingChoices.clear();
243+
forwardingChoices.clear();
315244

316-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
317-
auto& messageSet = currentSetOfInputs[ii];
245+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
318246
auto& header = messageSet.header(pi);
247+
248+
// If is now possible that the record is not complete when
249+
// we forward it, because of a custom completion policy.
250+
// this means that we need to skip the empty entries in the
251+
// record for being forwarded.
252+
if (header->GetData() == nullptr) {
253+
continue;
254+
}
255+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
256+
if (dih) {
257+
continue;
258+
}
259+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
260+
if (sih) {
261+
continue;
262+
}
263+
264+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
265+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
266+
267+
if (dph == nullptr || dh == nullptr) {
268+
// Complain only if this is not an out-of-band message
269+
LOGP(error, "Data is missing {}{}{}",
270+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
271+
continue;
272+
}
273+
319274
auto& payload = messageSet.payload(pi);
320-
auto total = messageSet.getNumberOfPayloads(pi);
321275

322-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
276+
if (payload.get() == nullptr && consume == true) {
277+
// If the payload is not there, it means we already
278+
// processed it with ConsumeExisiting. Therefore we
279+
// need to do something only if this is the last consume.
280+
header.reset(nullptr);
323281
continue;
324282
}
325283

326-
// In case of more than one forward route, we need to copy the message.
327-
// This will eventually use the same mamory if running with the same backend.
328-
if (cachedForwardingChoices.size() > 1) {
329-
copy = true;
284+
// We need to find the forward route only for the first
285+
// part of a split payload. All the others will use the same.
286+
// Therefore, we reset and recompute the forwarding choice:
287+
//
288+
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
289+
// which is actually always created and handled together
290+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
291+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
292+
// we will already use the same choice in the for loop below.
293+
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
294+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
330295
}
331-
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
332-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
333296

334-
if (copy) {
335-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
297+
if (forwardingChoices.empty()) {
298+
// Nothing to forward go to the next messageset
299+
continue;
300+
}
301+
302+
// In case of more than one forward route, we need to copy the message.
303+
// This will eventually use the same memory if running with the same backend.
304+
if (copyByDefault || forwardingChoices.size() > 1) {
305+
for (auto& choice : forwardingChoices) {
336306
auto&& newHeader = header->GetTransport()->CreateMessage();
337307
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
338-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
308+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
339309
newHeader->Copy(*header);
340-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
310+
forwardedParts[choice.value].AddPart(std::move(newHeader));
341311

342312
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
343313
auto&& newPayload = header->GetTransport()->CreateMessage();
344314
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
345-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
315+
forwardedParts[choice.value].AddPart(std::move(newPayload));
346316
}
347317
}
348318
} else {
349319
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
350-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
351-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
320+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
321+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
352322
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
353-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
323+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
354324
}
355325
}
356326
}

0 commit comments

Comments
 (0)