Skip to content

Commit 70b1040

Browse files
authored
DPL: modernize workflow construction code using ranges (#14907)
1 parent 528a5a5 commit 70b1040

File tree

5 files changed

+114
-185
lines changed

5 files changed

+114
-185
lines changed

Framework/Core/include/Framework/AnalysisSupportHelpers.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,6 @@ struct AnalysisSupportHelpers {
3939
std::vector<InputSpec> const& requestedSpecials,
4040
std::vector<InputSpec>& requestedAODs,
4141
DataProcessorSpec& publisher);
42-
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector<OutputSpec> const& providedSpecials,
43-
std::vector<InputSpec> const& requestedSpecials,
44-
std::vector<InputSpec>& requestedAODs,
45-
std::vector<InputSpec>& requestedDYNs,
46-
DataProcessorSpec& publisher);
4742
static void addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
4843
std::vector<InputSpec>& requestedAODs,
4944
std::vector<InputSpec>& requestedDYNs,

Framework/Core/include/Framework/DataSpecViews.h

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,32 @@ static auto filter_not_matching(auto const& provided)
3131
return std::views::filter([&provided](auto const& input) { return std::none_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); });
3232
}
3333

34+
static auto filter_matching(auto const& provided)
35+
{
36+
return std::views::filter([&provided](auto const& input) { return std::any_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); });
37+
}
38+
39+
static auto filter_string_params_with(std::string match)
40+
{
41+
return std::views::filter([match](auto const& param) {
42+
return (param.type == VariantType::String) && (param.name.find(match) != std::string::npos);
43+
});
44+
}
45+
46+
static auto input_to_output_specs()
47+
{
48+
return std::views::transform([](auto const& input) {
49+
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
50+
return OutputSpec{concrete.origin, concrete.description, concrete.subSpec, input.lifetime, input.metadata};
51+
});
52+
}
53+
54+
static auto params_to_input_specs()
55+
{
56+
return std::views::transform([](auto const& param) {
57+
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
58+
});
59+
}
3460
} // namespace o2::framework::views
3561
//
3662
namespace o2::framework::sinks
@@ -54,14 +80,29 @@ struct update_input_list {
5480
template <std::ranges::input_range R>
5581
friend Container& operator|(R&& r, update_input_list self)
5682
{
57-
for (auto& item : r) {
83+
for (auto const& item : r) {
5884
auto copy = item;
5985
DataSpecUtils::updateInputList(self.c, std::move(copy));
6086
}
6187
return self.c;
6288
}
6389
};
6490

91+
template <class Container>
92+
struct update_output_list {
93+
Container& c;
94+
// ends the pipeline, returns the container
95+
template <std::ranges::input_range R>
96+
friend Container& operator|(R&& r, update_output_list self)
97+
{
98+
for (auto const& item : r) {
99+
auto copy = item;
100+
DataSpecUtils::updateOutputList(self.c, std::move(copy));
101+
}
102+
return self.c;
103+
}
104+
};
105+
65106
} // namespace o2::framework::sinks
66107

67108
#endif // O2_FRAMEWORK_DATASPECVIEWS_H_

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 37 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@
1111

1212
#include "Framework/AnalysisSupportHelpers.h"
1313
#include "Framework/DataOutputDirector.h"
14-
#include "Framework/OutputObjHeader.h"
15-
#include "Framework/ControlService.h"
16-
#include "Framework/EndOfStreamContext.h"
17-
#include "Framework/DeviceSpec.h"
14+
#include "Framework/DataSpecViews.h"
1815
#include "Framework/PluginManager.h"
1916
#include "Framework/ConfigContext.h"
2017
#include "WorkflowHelpers.h"
@@ -129,109 +126,59 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector<OutputSpec> c
129126
std::vector<InputSpec> const& requestedInputs,
130127
DataProcessorSpec& publisher)
131128
{
132-
auto matchingOutputFor = [](InputSpec const& requested) {
133-
return [&requested](OutputSpec const& provided) {
134-
return DataSpecUtils::match(requested, provided);
135-
};
136-
};
137-
for (InputSpec const& requested : requestedInputs) {
138-
auto provided = std::find_if(providedOutputs.begin(),
139-
providedOutputs.end(),
140-
matchingOutputFor(requested));
141-
142-
if (provided != providedOutputs.end()) {
143-
continue;
144-
}
145-
146-
auto inList = std::find_if(publisher.outputs.begin(),
147-
publisher.outputs.end(),
148-
matchingOutputFor(requested));
149-
if (inList != publisher.outputs.end()) {
150-
continue;
151-
}
152-
153-
auto concrete = DataSpecUtils::asConcreteDataMatcher(requested);
154-
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, requested.lifetime, requested.metadata);
155-
}
129+
requestedInputs |
130+
views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided
131+
views::filter_not_matching(publisher.outputs) | // filter the inputs that are already covered
132+
views::input_to_output_specs() |
133+
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
156134
}
157135

158136
void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec> const& providedSpecials,
159137
std::vector<InputSpec> const& requestedSpecials,
160138
std::vector<InputSpec>& requestedAODs,
161139
DataProcessorSpec& publisher)
162140
{
163-
for (auto& input : requestedSpecials) {
164-
if (std::any_of(providedSpecials.begin(), providedSpecials.end(), [&input](auto const& x) {
165-
return DataSpecUtils::match(input, x);
166-
})) {
167-
continue;
168-
}
169-
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
170-
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
171-
for (auto& i : input.metadata) {
172-
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
173-
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
174-
auto j = std::find(publisher.inputs.begin(), publisher.inputs.end(), spec);
175-
if (j == publisher.inputs.end()) {
176-
publisher.inputs.push_back(spec);
177-
}
178-
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
179-
}
180-
}
141+
requestedSpecials |
142+
views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided
143+
views::input_to_output_specs() |
144+
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
145+
146+
std::vector<InputSpec> additionalInputs;
147+
for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
148+
input.metadata |
149+
views::filter_string_params_with("input:") |
150+
views::params_to_input_specs() |
151+
sinks::update_input_list{additionalInputs}; // store into a temporary
181152
}
153+
additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs
154+
additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
182155
}
183156

184157
void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
185158
std::vector<InputSpec>& requestedAODs,
186159
std::vector<InputSpec>& requestedDYNs,
187160
DataProcessorSpec& publisher)
188161
{
189-
for (auto& input : requestedSpecials) {
190-
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
191-
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
192-
for (auto& i : input.metadata) {
193-
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
194-
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
195-
auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
196-
if (j == publisher.inputs.end()) {
197-
publisher.inputs.push_back(spec);
198-
}
199-
if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
200-
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
201-
} else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
202-
DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
203-
}
204-
}
205-
}
162+
requestedSpecials |
163+
views::input_to_output_specs() |
164+
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
165+
166+
std::vector<InputSpec> additionalInputs;
167+
for (auto const& input : requestedSpecials) {
168+
input.metadata |
169+
views::filter_string_params_with("input:") |
170+
views::params_to_input_specs() |
171+
sinks::update_input_list{additionalInputs}; // store into a temporary
206172
}
207-
}
208173

209-
void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
210-
std::vector<OutputSpec> const& providedSpecials,
211-
std::vector<InputSpec> const& requestedSpecials,
212-
std::vector<InputSpec>& requestedAODs,
213-
std::vector<InputSpec>& requestedDYNs,
214-
DataProcessorSpec& publisher)
215-
{
216-
for (auto& input : requestedSpecials) {
217-
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
218-
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
219-
// FIXME: good enough for now...
220-
for (auto& i : input.metadata) {
221-
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
222-
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
223-
auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
224-
if (j == publisher.inputs.end()) {
225-
publisher.inputs.push_back(spec);
226-
}
227-
if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
228-
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
229-
} else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
230-
DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
231-
}
232-
}
233-
}
234-
}
174+
additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
175+
// FIXME: until we have a single list of pairs
176+
additionalInputs |
177+
views::partial_match_filter(AODOrigins) |
178+
sinks::update_input_list{requestedAODs}; // update requestedAODs
179+
additionalInputs |
180+
views::partial_match_filter(header::DataOrigin{"DYN"}) |
181+
sinks::update_input_list{requestedDYNs}; // update requestedDYNs
235182
}
236183

237184
// =============================================================================

Framework/Core/src/ArrowSupport.cxx

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -595,23 +595,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
595595
ac.providedTIMs.clear();
596596
ac.requestedTIMs.clear();
597597

598-
599598
auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
600599
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
601600

602601
if (builder != workflow.end()) {
603602
// collect currently requested IDXs
604603
ac.requestedIDXs.clear();
605-
for (auto& d : workflow) {
606-
if (d.name == builder->name) {
607-
continue;
608-
}
609-
for (auto& i : d.inputs) {
610-
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) {
611-
auto copy = i;
612-
DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
613-
}
614-
}
604+
for (auto& d : workflow | views::exclude_by_name(builder->name)) {
605+
d.inputs |
606+
views::partial_match_filter(header::DataOrigin{"IDX"}) |
607+
sinks::update_input_list{ac.requestedIDXs};
615608
}
616609
// recreate inputs and outputs
617610
builder->inputs.clear();
@@ -624,37 +617,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
624617

625618
if (spawner != workflow.end()) {
626619
// collect currently requested DYNs
627-
for (auto& d : workflow) {
628-
if (d.name == spawner->name) {
629-
continue;
630-
}
631-
for (auto const& i : d.inputs) {
632-
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) {
633-
auto copy = i;
634-
DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
635-
}
636-
}
637-
for (auto const& o : d.outputs) {
638-
if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) {
639-
ac.providedDYNs.emplace_back(o);
640-
}
641-
}
620+
for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
621+
d.inputs |
622+
views::partial_match_filter(header::DataOrigin{"DYN"}) |
623+
sinks::update_input_list{ac.requestedDYNs};
624+
d.outputs |
625+
views::partial_match_filter(header::DataOrigin{"DYN"}) |
626+
sinks::append_to{ac.providedDYNs};
642627
}
643628
std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
644629
std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
645630
ac.spawnerInputs.clear();
646-
for (auto& input : ac.requestedDYNs) {
647-
if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
648-
ac.spawnerInputs.emplace_back(input);
649-
}
650-
}
631+
ac.requestedDYNs |
632+
views::filter_not_matching(ac.providedDYNs) |
633+
sinks::append_to{ac.spawnerInputs};
651634
// recreate inputs and outputs
652635
spawner->outputs.clear();
653636
spawner->inputs.clear();
637+
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
654638
// replace AlgorithmSpec
655639
// FIXME: it should be made more generic, so it does not need replacement...
656640
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
657-
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
658641
}
659642

660643
if (analysisCCDB != workflow.end()) {
@@ -675,7 +658,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
675658
// FIXME: it should be made more generic, so it does not need replacement...
676659
// FIXME how can I make the lookup depend on DYN tables as well??
677660
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
678-
AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
661+
AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
679662
}
680663

681664
if (writer != workflow.end()) {
@@ -686,12 +669,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
686669
// If reader and/or builder were adjusted, remove unneeded outputs
687670
// update currently requested AODs
688671
for (auto& d : workflow) {
689-
for (auto const& i : d.inputs) {
690-
if (DataSpecUtils::partialMatch(i, AODOrigins)) {
691-
auto copy = i;
692-
DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
693-
}
694-
}
672+
d.inputs |
673+
views::partial_match_filter(AODOrigins) |
674+
sinks::update_input_list{ac.requestedAODs};
695675
}
696676

697677
// remove unmatched outputs
@@ -705,8 +685,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
705685
}
706686
}
707687

708-
709-
710688
// replace writer as some outputs may have become dangling and some are now consumed
711689
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
712690

0 commit comments

Comments
 (0)