Skip to content

Commit d54ec0d

Browse files
committed
DPL: more refactoring of the forwarding code
Use a single helper function to improve readability.
1 parent 93ff0dc commit d54ec0d

File tree

4 files changed

+83
-147
lines changed

4 files changed

+83
-147
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ struct DataProcessingHelpers {
5353
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
5454
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
5555
/// Helper to route messages for forwarding
56-
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
57-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume);
56+
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, std::vector<MessageSet>& currentSetOfInputs,
57+
bool copy, bool consume);
5858
};
5959
} // namespace o2::framework
6060
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,10 +588,12 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
588588
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
589589
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
590590
auto& proxy = registry.get<FairMQDeviceProxy>();
591-
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy, consume);
592591

593592
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
594-
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
593+
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
594+
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
595+
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copy, consume);
596+
595597
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
596598
if (forwardedParts[fi].Size() == 0) {
597599
continue;

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 64 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -221,129 +221,99 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221
}
222222
}
223223

224-
static auto toBeForwardedHeader = [](void* header) -> bool {
225-
// If is now possible that the record is not complete when
226-
// we forward it, because of a custom completion policy.
227-
// this means that we need to skip the empty entries in the
228-
// record for being forwarded.
229-
if (header == nullptr) {
230-
return false;
231-
}
232-
auto dh = o2::header::get<header::DataHeader*>(header);
233-
if (!dh) {
234-
return false;
235-
}
236-
bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
237-
!o2::header::get<DomainInfoHeader*>(header) &&
238-
o2::header::get<DataProcessingHeader*>(header);
239-
// DataHeader is there. Complain if we have unexpected headers present / missing
240-
if (!retval) {
241-
LOGP(error, "Dropping data because of malformed header structure");
242-
}
243-
return retval;
244-
};
245-
246-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
247-
FairMQDeviceProxy& proxy,
248-
std::unique_ptr<fair::mq::Message>& header,
249-
std::unique_ptr<fair::mq::Message>& payload,
250-
size_t total,
251-
bool consume) {
252-
if (header.get() == nullptr) {
253-
// Missing an header is not an error anymore.
254-
// it simply means that we did not receive the
255-
// given input, but we were asked to
256-
// consume existing, so we skip it.
257-
return false;
258-
}
259-
if (payload.get() == nullptr && consume == true) {
260-
// If the payload is not there, it means we already
261-
// processed it with ConsumeExisiting. Therefore we
262-
// need to do something only if this is the last consume.
263-
header.reset(nullptr);
264-
return false;
265-
}
266-
267-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
268-
if (fdph == nullptr) {
269-
LOG(error) << "Data is missing DataProcessingHeader";
270-
return false;
271-
}
272-
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
273-
if (fdh == nullptr) {
274-
LOG(error) << "Data is missing DataHeader";
275-
return false;
276-
}
277-
278-
// We need to find the forward route only for the first
279-
// part of a split payload. All the others will use the same.
280-
// but always check if we have a sequence of multiple payloads
281-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
282-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
283-
}
284-
return cachedForwardingChoices.empty() == false;
285-
};
286-
287-
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
288-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
224+
auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy,
225+
std::vector<MessageSet>& currentSetOfInputs,
226+
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
289227
{
290228
// we collect all messages per forward in a map and send them together
291229
std::vector<fair::mq::Parts> forwardedParts;
292230
forwardedParts.resize(proxy.getNumForwards());
293-
std::vector<ChannelIndex> cachedForwardingChoices{};
231+
std::vector<ChannelIndex> forwardingChoices{};
294232
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
295-
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
296-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
297233

298234
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
299235
auto& messageSet = currentSetOfInputs[ii];
300-
// In case the messageSet is empty, there is nothing to be done.
301-
if (messageSet.size() == 0) {
302-
continue;
303-
}
304-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
305-
continue;
306-
}
307-
cachedForwardingChoices.clear();
236+
forwardingChoices.clear();
308237

309-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
310-
auto& messageSet = currentSetOfInputs[ii];
238+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
311239
auto& header = messageSet.header(pi);
240+
241+
// If is now possible that the record is not complete when
242+
// we forward it, because of a custom completion policy.
243+
// this means that we need to skip the empty entries in the
244+
// record for being forwarded.
245+
if (header->GetData() == nullptr) {
246+
continue;
247+
}
248+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
249+
if (dih) {
250+
continue;
251+
}
252+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
253+
if (sih) {
254+
continue;
255+
}
256+
257+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
258+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
259+
260+
if (dph == nullptr || dh == nullptr) {
261+
// Complain only if this is not an out-of-band message
262+
LOGP(error, "Data is missing {}{}{}",
263+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
264+
continue;
265+
}
266+
312267
auto& payload = messageSet.payload(pi);
313-
auto total = messageSet.getNumberOfPayloads(pi);
314268

315-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
269+
if (payload.get() == nullptr && consume == true) {
270+
// If the payload is not there, it means we already
271+
// processed it with ConsumeExisiting. Therefore we
272+
// need to do something only if this is the last consume.
273+
header.reset(nullptr);
316274
continue;
317275
}
318276

319-
// In case of more than one forward route, we need to copy the message.
320-
// This will eventually use the same mamory if running with the same backend.
321-
if (cachedForwardingChoices.size() > 1) {
322-
copy = true;
277+
// We need to find the forward route only for the first
278+
// part of a split payload. All the others will use the same.
279+
// Therefore, we reset and recompute the forwarding choice:
280+
//
281+
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
282+
// which is actually always created and handled together
283+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
284+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
285+
// we will already use the same choice in the for loop below.
286+
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
287+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
323288
}
324-
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
325-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
326289

327-
if (copy) {
328-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
290+
if (forwardingChoices.empty()) {
291+
// Nothing to forward go to the next messageset
292+
continue;
293+
}
294+
295+
// In case of more than one forward route, we need to copy the message.
296+
// This will eventually use the same memory if running with the same backend.
297+
if (copyByDefault || forwardingChoices.size() > 1) {
298+
for (auto& choice : forwardingChoices) {
329299
auto&& newHeader = header->GetTransport()->CreateMessage();
330300
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
331-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
301+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
332302
newHeader->Copy(*header);
333-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
303+
forwardedParts[choice.value].AddPart(std::move(newHeader));
334304

335305
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
336306
auto&& newPayload = header->GetTransport()->CreateMessage();
337307
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
338-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
308+
forwardedParts[choice.value].AddPart(std::move(newPayload));
339309
}
340310
}
341311
} else {
342312
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
343-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
344-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
313+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
314+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
345315
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
346-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
316+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
347317
}
348318
}
349319
}

0 commit comments

Comments
 (0)