Skip to content

Commit f9d9c0f

Browse files
authored
DPL Analysis: rework of aod-spawner and aod-index-builder (#14854)
1 parent a8e80c1 commit f9d9c0f

25 files changed

+1507
-911
lines changed

Framework/AnalysisSupport/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ if(TARGET JAliEn::JAliEn)
1616
set(EXTRA_TARGETS XRootD::Client JAliEn::JAliEn)
1717
endif()
1818

19+
o2_add_library(FrameworkOnDemandTablesSupport
20+
SOURCES src/OnDemandPlugin.cxx
21+
src/AODReaderHelpers.cxx
22+
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
23+
PUBLIC_LINK_LIBRARIES O2::Framework ${EXTRA_TARGETS})
24+
1925
o2_add_library(FrameworkAnalysisSupport
2026
SOURCES src/Plugin.cxx
2127
src/DataInputDirector.cxx
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "AODReaderHelpers.h"
13+
#include "../src/ExpressionJSONHelpers.h"
14+
#include "../src/IndexJSONHelpers.h"
15+
16+
#include "Framework/AnalysisDataModel.h"
17+
#include "Framework/AnalysisHelpers.h"
18+
#include "Framework/DataProcessingHelpers.h"
19+
#include "Framework/AlgorithmSpec.h"
20+
#include "Framework/DataSpecUtils.h"
21+
#include "Framework/ConfigContext.h"
22+
#include "Framework/AnalysisContext.h"
23+
24+
namespace o2::framework::readers
25+
{
26+
namespace
27+
{
28+
struct Buildable {
29+
bool exclusive = false;
30+
std::string binding;
31+
std::vector<std::string> labels;
32+
header::DataOrigin origin;
33+
header::DataDescription description;
34+
header::DataHeader::SubSpecificationType version;
35+
std::vector<o2::soa::IndexRecord> records;
36+
std::shared_ptr<arrow::Schema> outputSchema;
37+
38+
Buildable(InputSpec const& spec)
39+
: binding{spec.binding}
40+
{
41+
auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
42+
origin = origin_;
43+
description = description_;
44+
version = version_;
45+
46+
auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-records") == 0; });
47+
std::stringstream iws(loc->defaultValue.get<std::string>());
48+
records = IndexJSONHelpers::read(iws);
49+
50+
loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-exclusive") == 0; });
51+
exclusive = loc->defaultValue.get<bool>();
52+
53+
for (auto const& r : records) {
54+
labels.emplace_back(r.label);
55+
}
56+
outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
57+
std::vector<std::shared_ptr<arrow::Field>> fields;
58+
for (auto& r : recs) {
59+
fields.push_back(r.field());
60+
}
61+
return fields;
62+
}(records))
63+
->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{binding}}));
64+
}
65+
66+
framework::Builder createBuilder() const
67+
{
68+
return {
69+
exclusive,
70+
labels,
71+
records,
72+
outputSchema,
73+
origin,
74+
description,
75+
version,
76+
nullptr};
77+
}
78+
};
79+
80+
} // namespace
81+
82+
AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx)
83+
{
84+
auto& ac = ctx.services().get<AnalysisContext>();
85+
return AlgorithmSpec::InitCallback{[requested = ac.requestedIDXs](InitContext& /*ic*/) {
86+
std::vector<Buildable> buildables;
87+
for (auto& i : requested) {
88+
buildables.emplace_back(i);
89+
}
90+
std::vector<Builder> builders;
91+
for (auto& b : buildables) {
92+
builders.push_back(b.createBuilder());
93+
}
94+
return [builders](ProcessingContext& pc) mutable {
95+
auto outputs = pc.outputs();
96+
for (auto& builder : builders) {
97+
outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc));
98+
}
99+
};
100+
}};
101+
}
102+
103+
namespace
104+
{
105+
struct Spawnable {
106+
std::string binding;
107+
std::vector<std::string> labels;
108+
std::vector<expressions::Projector> projectors;
109+
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
110+
std::shared_ptr<arrow::Schema> outputSchema;
111+
std::shared_ptr<arrow::Schema> inputSchema;
112+
113+
header::DataOrigin origin;
114+
header::DataDescription description;
115+
header::DataHeader::SubSpecificationType version;
116+
117+
Spawnable(InputSpec const& spec)
118+
: binding{spec.binding}
119+
{
120+
auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
121+
origin = origin_;
122+
description = description_;
123+
version = version_;
124+
auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; });
125+
std::stringstream iws(loc->defaultValue.get<std::string>());
126+
projectors = ExpressionJSONHelpers::read(iws);
127+
128+
loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; });
129+
iws.clear();
130+
iws.str(loc->defaultValue.get<std::string>());
131+
outputSchema = ArrowJSONHelpers::read(iws);
132+
o2::framework::addLabelToSchema(outputSchema, binding.c_str());
133+
134+
std::vector<std::shared_ptr<arrow::Schema>> schemas;
135+
for (auto& i : spec.metadata) {
136+
if (i.name.starts_with("input-schema:")) {
137+
labels.emplace_back(i.name.substr(13));
138+
iws.clear();
139+
auto json = i.defaultValue.get<std::string>();
140+
iws.str(json);
141+
schemas.emplace_back(ArrowJSONHelpers::read(iws));
142+
}
143+
}
144+
145+
std::vector<std::shared_ptr<arrow::Field>> fields;
146+
for (auto& s : schemas) {
147+
std::copy(s->fields().begin(), s->fields().end(), std::back_inserter(fields));
148+
}
149+
150+
inputSchema = std::make_shared<arrow::Schema>(fields);
151+
expressions = expressions::materializeProjectors(projectors, inputSchema, outputSchema->fields());
152+
}
153+
154+
std::shared_ptr<gandiva::Projector> makeProjector() const
155+
{
156+
std::shared_ptr<gandiva::Projector> p = nullptr;
157+
auto s = gandiva::Projector::Make(
158+
inputSchema,
159+
expressions,
160+
&p);
161+
if (!s.ok()) {
162+
throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str());
163+
}
164+
return p;
165+
}
166+
167+
framework::Spawner createMaker() const
168+
{
169+
return {
170+
binding,
171+
labels,
172+
expressions,
173+
makeProjector(),
174+
outputSchema,
175+
inputSchema,
176+
origin,
177+
description,
178+
version};
179+
}
180+
};
181+
182+
} // namespace
183+
184+
AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& ctx)
185+
{
186+
auto& ac = ctx.services().get<AnalysisContext>();
187+
return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) {
188+
std::vector<Spawnable> spawnables;
189+
for (auto& i : requested) {
190+
spawnables.emplace_back(i);
191+
}
192+
std::vector<Spawner> spawners;
193+
for (auto& s : spawnables) {
194+
spawners.push_back(s.createMaker());
195+
}
196+
197+
return [spawners](ProcessingContext& pc) mutable {
198+
auto outputs = pc.outputs();
199+
for (auto& spawner : spawners) {
200+
outputs.adopt(Output{spawner.origin, spawner.description, spawner.version}, spawner.materialize(pc));
201+
}
202+
};
203+
}};
204+
}
205+
206+
} // namespace o2::framework::readers

Framework/Core/include/Framework/AODReaderHelpers.h renamed to Framework/AnalysisSupport/src/AODReaderHelpers.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
namespace o2::framework::readers
1919
{
2020

21-
2221
struct AODReaderHelpers {
2322
static AlgorithmSpec rootFileReaderCallback();
2423
static AlgorithmSpec aodSpawnerCallback(ConfigContext const& ctx);
25-
static AlgorithmSpec indexBuilderCallback(std::vector<InputSpec>& requested);
24+
static AlgorithmSpec indexBuilderCallback(ConfigContext const& ctx);
2625
};
2726

2827
} // namespace o2::framework::readers
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#include "Framework/Plugins.h"
12+
#include "Framework/AlgorithmSpec.h"
13+
#include "AODReaderHelpers.h"
14+
15+
struct ExtendedTableSpawner : o2::framework::AlgorithmPlugin {
16+
o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override
17+
{
18+
return o2::framework::readers::AODReaderHelpers::aodSpawnerCallback(config);
19+
}
20+
};
21+
22+
struct IndexTableBuilder : o2::framework::AlgorithmPlugin {
23+
o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override
24+
{
25+
return o2::framework::readers::AODReaderHelpers::indexBuilderCallback(config);
26+
}
27+
};
28+
29+
DEFINE_DPL_PLUGINS_BEGIN
30+
DEFINE_DPL_PLUGIN_INSTANCE(ExtendedTableSpawner, CustomAlgorithm);
31+
DEFINE_DPL_PLUGIN_INSTANCE(IndexTableBuilder, CustomAlgorithm);
32+
DEFINE_DPL_PLUGINS_END

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
# or submit itself to any jurisdiction.
1111

1212
o2_add_library(Framework
13-
SOURCES src/AODReaderHelpers.cxx
14-
src/AnalysisHelpers.cxx
13+
SOURCES src/AnalysisHelpers.cxx
1514
src/AlgorithmSpec.cxx
1615
src/ArrowSupport.cxx
1716
src/ArrowTableSlicingCache.cxx
@@ -143,6 +142,7 @@ o2_add_library(Framework
143142
src/Variant.cxx
144143
src/VariantJSONHelpers.cxx
145144
src/ExpressionJSONHelpers.cxx
145+
src/IndexJSONHelpers.cxx
146146
src/VariantPropertyTreeHelpers.cxx
147147
src/WorkflowCustomizationHelpers.cxx
148148
src/WorkflowHelpers.cxx

0 commit comments

Comments
 (0)