Skip to content

Commit 279c652

Browse files
authored
DPL Data Model: introduce bit to keep track of data created after EoS (#13315)
1 parent 53d086a commit 279c652

File tree

5 files changed

+16
-3
lines changed

5 files changed

+16
-3
lines changed

DataFormats/Headers/include/Headers/DataHeader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,8 @@ struct BaseHeader {
373373
uint32_t flags;
374374
struct {
375375
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
376-
flagsUnused : 31; // currently unused
376+
flagsReserved : 15, // reserved for future use
377+
flagsDerivedHeader : 16; // reserved for usage by the derived header
377378
};
378379
};
379380

Framework/Core/include/Framework/DataProcessingHeader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ namespace o2::framework
4242
/// @ingroup aliceo2_dataformats_dataheader
4343
struct DataProcessingHeader : public header::BaseHeader {
4444
static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET = 0x8000000000000000;
45+
// The following flags are used to indicate the behavior of the data processing
46+
static constexpr int32_t KEEP_AT_EOS_FLAG = 1;
47+
4548
/// We return some number of milliseconds, offsetting int by 0x8000000000000000
4649
/// to make sure we can understand when the dummy constructor of DataProcessingHeader was
4750
/// used without overriding it with an actual real time from epoch.

Framework/Core/include/Framework/TimingInfo.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace o2::framework
2323

2424
struct TimingInfo {
2525
constexpr static ServiceKind service_kind = ServiceKind::Stream;
26-
size_t timeslice; /// the timeslice associated to current processing
26+
size_t timeslice; /// the timeslice associated to current processing
2727
uint32_t firstTForbit = -1; /// the orbit the TF begins
2828
uint32_t tfCounter = -1; // the counter associated to a TF
2929
uint32_t runNumber = -1;
@@ -36,9 +36,11 @@ struct TimingInfo {
3636
/// from a new run, as being processed by the current stream.
3737
/// FIXME: for now this is the same as the above.
3838
bool streamRunNumberChanged = false;
39+
/// Wether this kind of data should be flushed during end of stream.
40+
bool keepAtEndOfStream = false;
3941

4042
static bool timesliceIsTimer(size_t timeslice) { return timeslice > 1652945069870351; }
41-
bool isTimer() const { return timesliceIsTimer(timeslice); };
43+
[[nodiscard]] bool isTimer() const { return timesliceIsTimer(timeslice); };
4244
};
4345

4446
} // namespace o2::framework

Framework/Core/src/DataAllocator.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
121121
dh.runNumber = timingInfo.runNumber;
122122

123123
DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation};
124+
static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
124125
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
125126
auto* transport = proxy.getOutputTransport(routeIndex);
126127

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1710,6 +1710,12 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17101710
while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
17111711
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
17121712
}
1713+
1714+
auto& timingInfo = ref.get<TimingInfo>();
1715+
// We should keep the data generated at end of stream only for those
1716+
// which are not sources.
1717+
timingInfo.keepAtEndOfStream = shouldProcess;
1718+
17131719
EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
17141720

17151721
context.preEOSCallbacks(eosContext);

0 commit comments

Comments
 (0)