|
16 | 16 | #include "MemoryResources/MemoryResources.h" |
17 | 17 | #include "Framework/FairMQDeviceProxy.h" |
18 | 18 | #include "Headers/DataHeader.h" |
| 19 | +#include "Headers/DataHeaderHelpers.h" |
19 | 20 | #include "Headers/Stack.h" |
20 | 21 | #include "Framework/Logger.h" |
21 | 22 | #include "Framework/SendingPolicy.h" |
|
31 | 32 | #include "Framework/ControlService.h" |
32 | 33 | #include "Framework/DataProcessingContext.h" |
33 | 34 | #include "Framework/DeviceStateEnums.h" |
| 35 | +#include "Headers/DataHeader.h" |
| 36 | +#include "Framework/DataProcessingHeader.h" |
34 | 37 |
|
35 | 38 | #include <fairmq/Device.h> |
36 | 39 | #include <fairmq/Channel.h> |
|
41 | 44 | O2_DECLARE_DYNAMIC_LOG(device); |
42 | 45 | // Stream which keeps track of the calibration lifetime logic |
43 | 46 | O2_DECLARE_DYNAMIC_LOG(calibration); |
| 47 | +O2_DECLARE_DYNAMIC_LOG(forwarding); |
44 | 48 |
|
45 | 49 | namespace o2::framework |
46 | 50 | { |
@@ -217,4 +221,134 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi |
217 | 221 | } |
218 | 222 | } |
219 | 223 |
|
| 224 | +static auto toBeForwardedHeader = [](void* header) -> bool { |
| 225 | + // If is now possible that the record is not complete when |
| 226 | + // we forward it, because of a custom completion policy. |
| 227 | + // this means that we need to skip the empty entries in the |
| 228 | + // record for being forwarded. |
| 229 | + if (header == nullptr) { |
| 230 | + return false; |
| 231 | + } |
| 232 | + auto dh = o2::header::get<header::DataHeader*>(header); |
| 233 | + if (!dh) { |
| 234 | + return false; |
| 235 | + } |
| 236 | + bool retval = !o2::header::get<SourceInfoHeader*>(header) && |
| 237 | + !o2::header::get<DomainInfoHeader*>(header) && |
| 238 | + o2::header::get<DataProcessingHeader*>(header); |
| 239 | + // DataHeader is there. Complain if we have unexpected headers present / missing |
| 240 | + if (!retval) { |
| 241 | + LOGP(error, "Dropping data because of malformed header structure"); |
| 242 | + } |
| 243 | + return retval; |
| 244 | +}; |
| 245 | + |
| 246 | +static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices, |
| 247 | + FairMQDeviceProxy& proxy, |
| 248 | + std::unique_ptr<fair::mq::Message>& header, |
| 249 | + std::unique_ptr<fair::mq::Message>& payload, |
| 250 | + size_t total, |
| 251 | + bool consume) { |
| 252 | + if (header.get() == nullptr) { |
| 253 | + // Missing an header is not an error anymore. |
| 254 | + // it simply means that we did not receive the |
| 255 | + // given input, but we were asked to |
| 256 | + // consume existing, so we skip it. |
| 257 | + return false; |
| 258 | + } |
| 259 | + if (payload.get() == nullptr && consume == true) { |
| 260 | + // If the payload is not there, it means we already |
| 261 | + // processed it with ConsumeExisiting. Therefore we |
| 262 | + // need to do something only if this is the last consume. |
| 263 | + header.reset(nullptr); |
| 264 | + return false; |
| 265 | + } |
| 266 | + |
| 267 | + auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData()); |
| 268 | + if (fdph == nullptr) { |
| 269 | + LOG(error) << "Data is missing DataProcessingHeader"; |
| 270 | + return false; |
| 271 | + } |
| 272 | + auto fdh = o2::header::get<header::DataHeader*>(header->GetData()); |
| 273 | + if (fdh == nullptr) { |
| 274 | + LOG(error) << "Data is missing DataHeader"; |
| 275 | + return false; |
| 276 | + } |
| 277 | + |
| 278 | + // We need to find the forward route only for the first |
| 279 | + // part of a split payload. All the others will use the same. |
| 280 | + // but always check if we have a sequence of multiple payloads |
| 281 | + if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) { |
| 282 | + proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime); |
| 283 | + } |
| 284 | + return cachedForwardingChoices.empty() == false; |
| 285 | +}; |
| 286 | + |
| 287 | +std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs, |
| 288 | + TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume) |
| 289 | +{ |
| 290 | + // we collect all messages per forward in a map and send them together |
| 291 | + std::vector<fair::mq::Parts> forwardedParts; |
| 292 | + forwardedParts.resize(proxy.getNumForwards()); |
| 293 | + std::vector<ChannelIndex> cachedForwardingChoices{}; |
| 294 | + O2_SIGNPOST_ID_GENERATE(sid, forwarding); |
| 295 | + O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s", |
| 296 | + slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); |
| 297 | + |
| 298 | + for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { |
| 299 | + auto& messageSet = currentSetOfInputs[ii]; |
| 300 | + // In case the messageSet is empty, there is nothing to be done. |
| 301 | + if (messageSet.size() == 0) { |
| 302 | + continue; |
| 303 | + } |
| 304 | + if (!toBeForwardedHeader(messageSet.header(0)->GetData())) { |
| 305 | + continue; |
| 306 | + } |
| 307 | + cachedForwardingChoices.clear(); |
| 308 | + |
| 309 | + for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) { |
| 310 | + auto& messageSet = currentSetOfInputs[ii]; |
| 311 | + auto& header = messageSet.header(pi); |
| 312 | + auto& payload = messageSet.payload(pi); |
| 313 | + auto total = messageSet.getNumberOfPayloads(pi); |
| 314 | + |
| 315 | + if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) { |
| 316 | + continue; |
| 317 | + } |
| 318 | + |
| 319 | + // In case of more than one forward route, we need to copy the message. |
| 320 | + // This will eventually use the same mamory if running with the same backend. |
| 321 | + if (cachedForwardingChoices.size() > 1) { |
| 322 | + copy = true; |
| 323 | + } |
| 324 | + auto* dh = o2::header::get<header::DataHeader*>(header->GetData()); |
| 325 | + auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData()); |
| 326 | + |
| 327 | + if (copy) { |
| 328 | + for (auto& cachedForwardingChoice : cachedForwardingChoices) { |
| 329 | + auto&& newHeader = header->GetTransport()->CreateMessage(); |
| 330 | + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.", |
| 331 | + fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value); |
| 332 | + newHeader->Copy(*header); |
| 333 | + forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader)); |
| 334 | + |
| 335 | + for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { |
| 336 | + auto&& newPayload = header->GetTransport()->CreateMessage(); |
| 337 | + newPayload->Copy(*messageSet.payload(pi, payloadIndex)); |
| 338 | + forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload)); |
| 339 | + } |
| 340 | + } |
| 341 | + } else { |
| 342 | + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.", |
| 343 | + fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value); |
| 344 | + forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi))); |
| 345 | + for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { |
| 346 | + forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); |
| 347 | + } |
| 348 | + } |
| 349 | + } |
| 350 | + } |
| 351 | + return forwardedParts; |
| 352 | +}; |
| 353 | + |
220 | 354 | } // namespace o2::framework |
0 commit comments