Skip to content

Commit 08e4115

Browse files
authored
DPL: get rid of simplified CompletionPolicy (#12450)
The full blown version has been there for a while now, and in any case it's required if one wants to access the oldest possible timeframe.
1 parent 7fe5ab6 commit 08e4115

File tree

6 files changed

+23
-29
lines changed

6 files changed

+23
-29
lines changed

Detectors/TPC/workflow/readers/include/TPCReaderWorkflow/TPCSectorCompletionPolicy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class TPCSectorCompletionPolicy
9191
return std::regex_match(device.name.begin(), device.name.end(), std::regex(expression.c_str()));
9292
};
9393

94-
auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask, orderCheck = mOrderCheck](framework::InputSpan const& inputs) -> framework::CompletionPolicy::CompletionOp {
94+
auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask, orderCheck = mOrderCheck](framework::InputSpan const& inputs, auto const&, auto&) -> framework::CompletionPolicy::CompletionOp {
9595
unsigned long tpcSectorMask = pTpcSectorMask ? *pTpcSectorMask : 0xFFFFFFFFF;
9696
std::bitset<NSectors> validSectors = 0;
9797
bool haveMatchedInput = false;

Framework/Core/include/Framework/CompletionPolicy.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,26 +64,21 @@ struct CompletionPolicy {
6464

6565
using Matcher = std::function<bool(DeviceSpec const& device)>;
6666
using InputSetElement = DataRef;
67-
using Callback = std::function<CompletionOp(InputSpan const&)>;
6867
using CallbackFull = std::function<CompletionOp(InputSpan const&, std::vector<InputSpec> const&, ServiceRegistryRef&)>;
6968
using CallbackConfigureRelayer = std::function<void(DataRelayer&)>;
7069

7170
/// Constructor
7271
CompletionPolicy()
73-
: name{}, matcher{}, callback{} {}
72+
: name{}, matcher{}, callbackFull{} {}
7473
/// Constructor for emplace_back
75-
CompletionPolicy(std::string _name, Matcher _matcher, Callback _callback, bool _balanceChannels = true)
76-
: name(std::move(_name)), matcher(std::move(_matcher)), callback(std::move(_callback)), callbackFull{nullptr}, balanceChannels{_balanceChannels} {}
7774
CompletionPolicy(std::string _name, Matcher _matcher, CallbackFull _callback, bool _balanceChannels = true)
78-
: name(std::move(_name)), matcher(std::move(_matcher)), callback(nullptr), callbackFull{std::move(_callback)}, balanceChannels{_balanceChannels} {}
75+
: name(std::move(_name)), matcher(std::move(_matcher)), callbackFull{std::move(_callback)}, balanceChannels{_balanceChannels} {}
7976

8077
/// Name of the policy itself.
8178
std::string name = "";
8279
/// Callback to be used to understand if the policy should apply
8380
/// to the given device.
8481
Matcher matcher = nullptr;
85-
/// Actual policy which decides what to do with a partial InputRecord.
86-
Callback callback = nullptr;
8782
/// Actual policy which decides what to do with a partial InputRecord, extended version
8883
CallbackFull callbackFull = nullptr;
8984
/// A callback which allows you to configure the behavior of the data relayer associated

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@
2121
#include <cassert>
2222
#include <regex>
2323

24-
#pragma GCC diagnostic push
25-
#pragma GCC diagnostic ignored "-Wpedantic"
26-
2724
namespace o2::framework
2825
{
2926

@@ -35,7 +32,7 @@ CompletionPolicy CompletionPolicyHelpers::defineByNameOrigin(std::string const&
3532

3633
auto originReceived = std::make_shared<std::vector<uint64_t>>();
3734

38-
auto callback = [originReceived, origin, op](InputSpan const& inputRefs) -> CompletionPolicy::CompletionOp {
35+
auto callback = [originReceived, origin, op](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
3936
// update list of the start times of inputs with origin @origin
4037
for (auto& ref : inputRefs) {
4138
if (ref.header != nullptr) {
@@ -77,7 +74,7 @@ CompletionPolicy CompletionPolicyHelpers::defineByName(std::string const& name,
7774
auto matcher = [name](DeviceSpec const& device) -> bool {
7875
return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
7976
};
80-
auto callback = [op](InputSpan const&) -> CompletionPolicy::CompletionOp {
77+
auto callback = [op](InputSpan const&, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
8178
return op;
8279
};
8380
switch (op) {
@@ -108,7 +105,8 @@ CompletionPolicy CompletionPolicyHelpers::defineByName(std::string const& name,
108105

109106
CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, CompletionPolicy::Matcher matcher)
110107
{
111-
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
108+
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
109+
assert(inputs.size() == specs.size());
112110
for (auto& input : inputs) {
113111
if (input.header == nullptr) {
114112
return CompletionPolicy::CompletionOp::Wait;
@@ -123,7 +121,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAllOrdered(const char* name
123121
{
124122
auto callbackFull = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
125123
auto& decongestionService = ref.get<DecongestionService>();
126-
decongestionService.orderedCompletionPolicyActive = 1;
124+
decongestionService.orderedCompletionPolicyActive = true;
127125
for (auto& input : inputs) {
128126
if (input.header == nullptr) {
129127
return CompletionPolicy::CompletionOp::Wait;
@@ -199,7 +197,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeExistingWhenAny(const char* nam
199197

200198
CompletionPolicy CompletionPolicyHelpers::consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher)
201199
{
202-
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
200+
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
203201
for (auto& input : inputs) {
204202
if (input.header != nullptr) {
205203
return CompletionPolicy::CompletionOp::Consume;
@@ -289,7 +287,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyWithAllConditions(std::s
289287

290288
CompletionPolicy CompletionPolicyHelpers::processWhenAny(const char* name, CompletionPolicy::Matcher matcher)
291289
{
292-
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
290+
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
293291
size_t present = 0;
294292
for (auto& input : inputs) {
295293
if (input.header != nullptr) {
@@ -307,4 +305,3 @@ CompletionPolicy CompletionPolicyHelpers::processWhenAny(const char* name, Compl
307305
}
308306

309307
} // namespace o2::framework
310-
#pragma GCC diagnostic pop

Framework/Core/src/DataRelayer.cxx

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,9 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
675675
notDirty++;
676676
continue;
677677
}
678+
if (!mCompletionPolicy.callbackFull) {
679+
throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
680+
}
678681
auto partial = getPartialRecord(li);
679682
// TODO: get the data ref from message model
680683
auto getter = [&partial](size_t idx, size_t part) {
@@ -692,14 +695,8 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
692695
return partial[idx].size();
693696
};
694697
InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
695-
CompletionPolicy::CompletionOp action;
696-
if (mCompletionPolicy.callback) {
697-
action = mCompletionPolicy.callback(span);
698-
} else if (mCompletionPolicy.callbackFull) {
699-
action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
700-
} else {
701-
throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
702-
}
698+
CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
699+
703700
auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
704701
auto timeslice = std::get_if<uint64_t>(&variables.get(0));
705702
switch (action) {

Framework/Core/test/test_CompletionPolicy.cxx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <catch_amalgamated.hpp>
1313
#include "Framework/CompletionPolicy.h"
1414
#include "Framework/CompletionPolicyHelpers.h"
15+
#include "Framework/ServiceRegistry.h"
1516
#include "Headers/DataHeader.h"
1617
#include "Headers/NameHeader.h"
1718
#include "Framework/CompletionPolicy.h"
@@ -39,7 +40,9 @@ TEST_CASE("TestCompletionPolicy_callback")
3940
return true;
4041
};
4142

42-
auto callback = [&stack](InputSpan const& inputRefs) {
43+
ServiceRegistry services;
44+
45+
auto callback = [&stack](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) {
4346
for (auto const& ref : inputRefs) {
4447
auto const* header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
4548
REQUIRE(header == reinterpret_cast<o2::header::DataHeader*>(stack.data()));
@@ -53,7 +56,9 @@ TEST_CASE("TestCompletionPolicy_callback")
5356
{"test", matcher, callback}};
5457
CompletionPolicy::InputSetElement ref{nullptr, reinterpret_cast<const char*>(stack.data()), nullptr};
5558
InputSpan const& inputs{[&ref](size_t) { return ref; }, 1};
59+
std::vector<InputSpec> specs;
60+
ServiceRegistryRef servicesRef{services};
5661
for (auto& policy : policies) {
57-
policy.callback(inputs);
62+
policy.callbackFull(inputs, specs, servicesRef);
5863
}
5964
}

Framework/Core/test/test_StaggeringWorkflow.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void customize(std::vector<o2::framework::CompletionPolicy>& policies)
5353
// search for spec names starting with "processor"
5454
return spec.name.find("processor") == 0;
5555
},
56-
[](auto const&) { return o2::framework::CompletionPolicy::CompletionOp::Consume; }});
56+
[](auto const&, auto const&, auto &) { return o2::framework::CompletionPolicy::CompletionOp::Consume; }});
5757
}
5858

5959
#include "Framework/runDataProcessing.h"

0 commit comments

Comments
 (0)