Skip to content

Commit 2ca4db7

Browse files
authored
DPL: make sure Lifetime::Sporadic is kept (#14434)
1 parent 2008bc4 commit 2ca4db7

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

Framework/Core/include/Framework/FairMQDeviceProxy.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class FairMQDeviceProxy
4040
void bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
4141
std::vector<ForwardRoute> const& forwards, fair::mq::Device& device);
4242

43+
/// Retrieve the transport associated to a given route.
44+
[[nodiscard]] OutputRoute const& getOutputRoute(RouteIndex routeIndex) const { return mOutputs.at(routeIndex.value); }
4345
/// Retrieve the transport associated to a given route.
4446
[[nodiscard]] fair::mq::TransportFactory* getOutputTransport(RouteIndex routeIndex) const;
4547
/// Retrieve the transport associated to a given route.

Framework/Core/src/DataAllocator.cxx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111
#include "Framework/CompilerBuiltins.h"
12+
#include "Framework/Lifetime.h"
1213
#include "Framework/TableBuilder.h"
1314
#include "Framework/TableTreeHelpers.h"
1415
#include "Framework/DataAllocator.h"
@@ -121,8 +122,12 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
121122
dh.runNumber = timingInfo.runNumber;
122123

123124
DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation};
124-
static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
125125
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
126+
auto lifetime = proxy.getOutputRoute(routeIndex).matcher.lifetime;
127+
static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
128+
// Messages associated to sporatic output we always keep, since they are most likely histograms / condition
129+
// objects which need to be kept at the end of stream.
130+
static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= (lifetime == Lifetime::Sporadic) ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
126131
auto* transport = proxy.getOutputTransport(routeIndex);
127132

128133
auto channelAlloc = o2::pmr::getTransportAllocator(transport);

0 commit comments

Comments
 (0)