Skip to content

Commit f10bf6e

Browse files
authored
DPL: oldest possible timeframe triggered CompletionPolicy (#15046)
This will trigger the processing whenever a given slot will not receive data anymore in virtue of its timeslice being past the oldest possible timeframe.
1 parent bf8a402 commit f10bf6e

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

Framework/Core/include/Framework/CompletionPolicyHelpers.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
22
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
// All rights not expressly granted are reserved.
44
//
@@ -54,6 +54,12 @@ struct CompletionPolicyHelpers {
5454
}
5555
static CompletionPolicy consumeWhenAny(std::string matchName);
5656

57+
// Consume all the data captured until the oldest possible timeframe
58+
// in input indicates that nothing else can be added to this timeslice.
59+
// Useful in case of wildcards which multiplex multiple subspecs on the
60+
// same input.
61+
static CompletionPolicy consumeWhenPastOldestPossibleTimeframe(const char* name, CompletionPolicy::Matcher matcher);
62+
5763
/// When any of the parts of the record have been received, consume them.
5864
static CompletionPolicy consumeWhenAnyWithAllConditions(const char* name, CompletionPolicy::Matcher matcher);
5965
/// Default matcher applies for all devices

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
22
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
// All rights not expressly granted are reserved.
44
//
@@ -11,6 +11,7 @@
1111

1212
#include "Framework/CompletionPolicyHelpers.h"
1313
#include "Framework/CompletionPolicy.h"
14+
#include "Framework/DataProcessingHeader.h"
1415
#include "Framework/InputSpan.h"
1516
#include "Framework/DeviceSpec.h"
1617
#include "Framework/CompilerBuiltins.h"
@@ -263,6 +264,32 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* na
263264
return CompletionPolicy{name, matcher, callback, false};
264265
}
265266

267+
CompletionPolicy CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe(const char* name, CompletionPolicy::Matcher matcher)
268+
{
269+
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
270+
size_t currentTimeslice = -1;
271+
for (auto& input : inputs) {
272+
if (input.header == nullptr) {
273+
continue;
274+
}
275+
o2::framework::DataProcessingHeader const* dph = o2::header::get<o2::framework::DataProcessingHeader*>(input.header);
276+
if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
277+
currentTimeslice = dph->startTime;
278+
break;
279+
}
280+
}
281+
282+
auto& timesliceIndex = ref.get<TimesliceIndex>();
283+
auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
284+
285+
if (currentTimeslice >= oldestPossibleTimeslice) {
286+
return CompletionPolicy::CompletionOp::Retry;
287+
}
288+
return CompletionPolicy::CompletionOp::Consume;
289+
};
290+
return CompletionPolicy{name, matcher, callback, false};
291+
}
292+
266293
CompletionPolicy CompletionPolicyHelpers::consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher)
267294
{
268295
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {

0 commit comments

Comments
 (0)