Skip to content

Commit 8e4ed02

Browse files
aalkin and alibuild authored
DPL Analysis: add configurable expression columns (#14141)
Co-authored-by: ALICE Action Bot <alibuild@cern.ch>
1 parent 56dc84e commit 8e4ed02

File tree

10 files changed

+310
-99
lines changed

10 files changed

+310
-99
lines changed

Framework/Core/include/Framework/ASoA.h

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,9 @@ concept has_metadata = is_metadata_trait<T> && not_void<typename T::metadata>;
425425
template <typename T>
426426
concept has_extension = is_metadata<T> && not_void<typename T::extension_table_t>;
427427

428+
/// Satisfied by types that satisfy has_extension and additionally declare a nested
/// `configurable_t` alias that is exactly std::true_type.
template <typename T>
429+
concept has_configurable_extension = has_extension<T> && requires(T t) { typename T::configurable_t; requires std::same_as<std::true_type, typename T::configurable_t>; };
430+
428431
template <typename T>
429432
concept is_spawnable_column = std::same_as<typename T::spawnable_t, std::true_type>;
430433

@@ -2355,7 +2358,7 @@ O2HASH("TEST/0");
23552358
DECLARE_SOA_BITMAP_COLUMN_FULL(_Name_, _Getter_, _Size_, "f" #_Name_)
23562359

23572360
/// An 'expression' column. i.e. a column that can be calculated from other
2358-
/// columns with gandiva based on supplied C++ expression.
2361+
/// columns with gandiva based on static C++ expression.
23592362
#define DECLARE_SOA_EXPRESSION_COLUMN_FULL(_Name_, _Getter_, _Type_, _Label_, _Expression_) \
23602363
struct _Name_ : o2::soa::Column<_Type_, _Name_> { \
23612364
static constexpr const char* mLabel = _Label_; \
@@ -2393,6 +2396,38 @@ O2HASH("TEST/0");
23932396
#define DECLARE_SOA_EXPRESSION_COLUMN(_Name_, _Getter_, _Type_, _Expression_) \
23942397
DECLARE_SOA_EXPRESSION_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_, _Expression_);
23952398

2399+
/// A configurable 'expression' column. i.e. a column that can be calculated from other
2400+
/// columns with gandiva based on dynamically supplied C++ expression or a string definition.
2401+
/// Parameters:
///   _Name_   - name of the generated column struct (derives from o2::soa::Column<_Type_, _Name_>)
///   _Getter_ - name of the accessor method and of the BindingNode declared after the struct
///   _Type_   - value type of the column
///   _Label_  - column label, stored in mLabel and hashed into mHash
/// Unlike DECLARE_SOA_EXPRESSION_COLUMN_FULL this macro takes no _Expression_ argument.
/// The generated struct is tagged with spawnable_t = std::true_type.
/// The expansion does not end with a semicolon, so the invocation has to supply one.
#define DECLARE_SOA_CONFIGURABLE_EXPRESSION_COLUMN(_Name_, _Getter_, _Type_, _Label_) \
2402+
struct _Name_ : o2::soa::Column<_Type_, _Name_> { \
2403+
static constexpr const char* mLabel = _Label_; \
2404+
static constexpr const int32_t mHash = _Label_ ""_h; \
2405+
using base = o2::soa::Column<_Type_, _Name_>; \
2406+
using type = _Type_; \
2407+
using column_t = _Name_; \
2408+
using spawnable_t = std::true_type; \
2409+
_Name_(arrow::ChunkedArray const* column) \
2410+
: o2::soa::Column<_Type_, _Name_>(o2::soa::ColumnIterator<type>(column)) \
2411+
{ \
2412+
} \
2413+
\
2414+
_Name_() = default; \
2415+
_Name_(_Name_ const& other) = default; \
2416+
_Name_& operator=(_Name_ const& other) = default; \
2417+
\
2418+
decltype(auto) _Getter_() const \
2419+
{ \
2420+
return *mColumnIterator; \
2421+
} \
2422+
\
2423+
decltype(auto) get() const \
2424+
{ \
2425+
return _Getter_(); \
2426+
} \
2427+
}; \
2428+
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, o2::framework::TypeIdHelpers::uniqueId<_Name_>(), \
2429+
o2::framework::expressions::selectArrowType<_Type_>() }
2430+
23962431
/// An index column is a column of indices to elements / of another table named
23972432
/// _Name_##s. The column name will be _Name_##Id and will always be stored in
23982433
/// "fIndex"#_Table_#[_Suffix_]. If _Suffix_ is not empty it has to begin
@@ -3104,6 +3139,32 @@ consteval auto getIndexTargets()
31043139
O2HASH(#_Name_ "Extension"); \
31053140
DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, #_Name_ "Extension", _Table_, "AOD", "EX" _Description_, 0, __VA_ARGS__)
31063141

3142+
/// Declares a configurable extended table: _OriginalTable_ joined with an extension table
/// made of the columns passed as variadic arguments.
/// The expansion provides:
///   - O2HASH for the `_Desc_ "/" #_Version_` string;
///   - _Name_##CfgExtensionFrom<O> / _Name_##CfgExtension: the extension soa::Table
///     (label hash, description/version hash, origin O or the hash of _Origin_);
///   - _Name_##CfgExtensionMetadataFrom<O> / _Name_##CfgExtensionMetadata: metadata deriving
///     from TableMetadata, exposing base_table_t, extension_table_t, placeholders_pack_t
///     (a framework::pack of the variadic columns), configurable_t = std::true_type and
///     `sources` taken from _OriginalTable_::originals;
///   - a MetadataTrait specialization for the description/version hash pointing at that metadata;
///   - _Name_##From<O> / _Name_: o2::soa::JoinFull of _OriginalTable_ and the extension table.
#define DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE_FULL(_Name_, _Label_, _OriginalTable_, _Origin_, _Desc_, _Version_, ...) \
3143+
O2HASH(_Desc_ "/" #_Version_); \
3144+
template <typename O> \
3145+
using _Name_##CfgExtensionFrom = soa::Table<o2::aod::Hash<_Label_ ""_h>, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \
3146+
using _Name_##CfgExtension = _Name_##CfgExtensionFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3147+
template <typename O = o2::aod::Hash<_Origin_ ""_h>> \
3148+
struct _Name_##CfgExtensionMetadataFrom : TableMetadata<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, __VA_ARGS__> { \
3149+
using base_table_t = _OriginalTable_; \
3150+
using extension_table_t = _Name_##CfgExtensionFrom<O>; \
3151+
using placeholders_pack_t = framework::pack<__VA_ARGS__>; \
3152+
using configurable_t = std::true_type; \
3153+
static constexpr auto sources = _OriginalTable_::originals; \
3154+
}; \
3155+
using _Name_##CfgExtensionMetadata = _Name_##CfgExtensionMetadataFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3156+
template <> \
3157+
struct MetadataTrait<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>> { \
3158+
using metadata = _Name_##CfgExtensionMetadata; \
3159+
}; \
3160+
template <typename O> \
3161+
using _Name_##From = o2::soa::JoinFull<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, _OriginalTable_, _Name_##CfgExtensionFrom<O>>; \
3162+
using _Name_ = _Name_##From<o2::aod::Hash<_Origin_ ""_h>>;
3163+
3164+
/// Short form of DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE_FULL: uses #_Name_ "CfgExtension"
/// as the label (and registers it with O2HASH), "AOD" as the origin, "EX" _Description_
/// as the description and 0 as the version; the variadic columns are forwarded unchanged.
#define DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \
3165+
O2HASH(#_Name_ "CfgExtension"); \
3166+
DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE_FULL(_Name_, #_Name_ "CfgExtension", _Table_, "AOD", "EX" _Description_, 0, __VA_ARGS__)
3167+
31073168
#define DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, _Origin_, _Version_, _Desc_, _Exclusive_, ...) \
31083169
O2HASH(#_Name_); \
31093170
O2HASH(_Desc_ "/" #_Version_); \

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ struct TableTransform {
247247
template <typename T>
248248
concept is_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;
249249

250+
/// Satisfied by table types whose MetadataTrait (looked up through T::ref.desc_hash) has
/// metadata and whose metadata satisfies soa::has_configurable_extension.
template <typename T>
251+
concept is_dynamically_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_configurable_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;
252+
250253
template <is_spawnable T>
251254
constexpr auto transformBase()
252255
{
@@ -282,12 +285,60 @@ struct Spawns : decltype(transformBase<T>()) {
282285
}
283286
std::shared_ptr<typename T::table_t> table = nullptr;
284287
std::shared_ptr<extension_t> extension = nullptr;
288+
std::shared_ptr<gandiva::Projector> projector = nullptr;
285289
};
286290

287291
/// Satisfied by types that expose a nested `metadata` type, a pack() returning
/// T::expression_pack_t and a `projector` member of type std::shared_ptr<gandiva::Projector>.
template <typename T>
288292
concept is_spawns = requires(T t) {
289293
typename T::metadata;
290294
requires std::same_as<decltype(t.pack()), typename T::expression_pack_t>;
295+
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
296+
};
297+
298+
/// This helper struct allows you to declare extended tables with dynamically-supplied
299+
/// expressions to be created by the task
300+
/// The actual expressions have to be set in init() for the configurable expression
301+
/// columns, used to define the table
302+
303+
/// Holder for a dynamically spawnable table T: exposes the metadata-derived type aliases,
/// the joined table, the extension table and the projector objects used to build it.
template <is_dynamically_spawnable T>
304+
struct Defines : decltype(transformBase<T>()) {
305+
using spawnable_t = T;
306+
using metadata = decltype(transformBase<T>())::metadata;
307+
using extension_t = typename metadata::extension_table_t;
308+
using base_table_t = typename metadata::base_table_t;
309+
using placeholders_pack_t = typename metadata::placeholders_pack_t;
310+
// Size of the placeholders pack as reported by framework::pack_size; also the length of `projectors`.
static constexpr size_t N = framework::pack_size(placeholders_pack_t{});
311+
312+
/// Returns a default-constructed placeholders pack (used by the is_defines concept).
constexpr auto pack()
313+
{
314+
return placeholders_pack_t{};
315+
}
316+
317+
/// Pointer-like access to the joined table; returns nullptr while `table` is unset.
typename T::table_t* operator->()
318+
{
319+
return table.get();
320+
}
321+
/// Dereferences `table` without a null check.
typename T::table_t const& operator*() const
322+
{
323+
return *table;
324+
}
325+
326+
/// Arrow table of the extension only; dereferences `extension` without a null check.
auto asArrowTable()
327+
{
328+
return extension->asArrowTable();
329+
}
330+
// Both pointers start out null.
std::shared_ptr<typename T::table_t> table = nullptr;
331+
std::shared_ptr<extension_t> extension = nullptr;
332+
333+
// One Projector per placeholder column.
// NOTE(review): presumably these are the expressions the task sets in init() — confirm.
std::array<o2::framework::expressions::Projector, N> projectors;
334+
std::shared_ptr<gandiva::Projector> projector = nullptr;
335+
};
336+
337+
/// Satisfied by types that expose a nested `metadata` type, a pack() returning
/// T::placeholders_pack_t and a `projector` member of type std::shared_ptr<gandiva::Projector>.
template <typename T>
338+
concept is_defines = requires(T t) {
339+
typename T::metadata;
340+
requires std::same_as<decltype(t.pack()), typename T::placeholders_pack_t>;
341+
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
291342
};
292343

293344
/// Policy to control index building
@@ -744,7 +795,8 @@ template <soa::is_table T, soa::is_spawnable_column... Cs>
744795
auto Extend(T const& table)
745796
{
746797
using output_t = Join<T, soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
747-
return output_t{{o2::framework::spawner(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension"), table.asArrowTable()}, 0};
798+
static std::shared_ptr<gandiva::Projector> projector = nullptr;
799+
return output_t{{o2::framework::spawner(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension", projector), table.asArrowTable()}, 0};
748800
}
749801

750802
/// Template function to attach dynamic columns on-the-fly (e.g. inside

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -141,22 +141,30 @@ bool requestInputs(std::vector<InputSpec>&, T const&)
141141
}
142142

143143
template <is_spawns T>
144-
bool requestInputs(std::vector<InputSpec>& inputs, T const& spawns)
144+
const char* controlOption()
145145
{
146-
auto base_specs = spawns.base_specs();
147-
for (auto base_spec : base_specs) {
148-
base_spec.metadata.push_back(ConfigParamSpec{std::string{"control:spawn"}, VariantType::Bool, true, {"\"\""}});
149-
DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
150-
}
151-
return true;
146+
return "control:spawn";
152147
}
153148

154149
template <is_builds T>
155-
bool requestInputs(std::vector<InputSpec>& inputs, T const& builds)
150+
const char* controlOption()
156151
{
157-
auto base_specs = builds.base_specs();
152+
return "control:build";
153+
}
154+
155+
/// Name of the control option for is_defines entities.
template <is_defines T>
156+
const char* controlOption()
157+
{
158+
return "control:define";
159+
}
160+
161+
template <typename T>
162+
requires(is_spawns<T> || is_builds<T> || is_defines<T>)
163+
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
164+
{
165+
auto base_specs = entity.base_specs();
158166
for (auto base_spec : base_specs) {
159-
base_spec.metadata.push_back(ConfigParamSpec{std::string{"control:build"}, VariantType::Bool, true, {"\"\""}});
167+
base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
160168
DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
161169
}
162170
return true;
@@ -219,17 +227,11 @@ bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
219227
return true;
220228
}
221229

222-
template <is_spawns T>
223-
bool appendOutput(std::vector<OutputSpec>& outputs, T& spawns, uint32_t)
224-
{
225-
outputs.emplace_back(spawns.spec());
226-
return true;
227-
}
228-
229-
template <is_builds T>
230-
bool appendOutput(std::vector<OutputSpec>& outputs, T& builds, uint32_t)
230+
template <typename T>
231+
requires(is_spawns<T> || is_builds<T> || is_defines<T>)
232+
bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
231233
{
232-
outputs.emplace_back(builds.spec());
234+
outputs.emplace_back(entity.spec());
233235
return true;
234236
}
235237

@@ -286,7 +288,7 @@ bool prepareOutput(ProcessingContext& context, T& spawns)
286288
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
287289
}
288290

289-
spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>()));
291+
spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>(), spawns.projector));
290292
spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}));
291293
return true;
292294
}
@@ -298,6 +300,21 @@ bool prepareOuput(ProcessingContext& context, T& builds)
298300
return builds.template build<typename T::buildable_t::indexing_t>(builds.pack(), extractOriginals<metadata::sources.size(), metadata::sources>(context));
299301
}
300302

303+
/// Builds the extension and the joined table for an is_defines entity from the inputs
/// available in the processing context. Always returns true.
template <is_defines T>
304+
bool prepareOutput(ProcessingContext& context, T& defines)
305+
{
306+
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
307+
// Join the source tables listed in metadata::sources into a single arrow table.
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context));
308+
// If the joined table has no fields, substitute a table created by makeEmptyTable for the base table type.
if (originalTable->schema()->fields().empty() == true) {
309+
using base_table_t = typename T::base_table_t::table_t;
310+
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
311+
}
312+
313+
// spawner receives the base table, the extension label, the projectors array and the
// gandiva projector stored in `defines`; its result is wrapped in extension_t.
defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>(), defines.projectors.data(), defines.projector));
314+
// The user-facing table is the join of the extension columns and the original table.
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}));
315+
return true;
316+
}
317+
301318
template <typename T>
302319
bool finalizeOutput(ProcessingContext&, T&)
303320
{
@@ -333,6 +350,13 @@ bool finalizeOutput(ProcessingContext& context, T& builds)
333350
return true;
334351
}
335352

353+
/// Hands the extension arrow table of an is_defines entity to the context outputs via adopt(),
/// using defines.output() as the first argument. Always returns true.
template <is_defines T>
354+
bool finalizeOutput(ProcessingContext& context, T& defines)
355+
{
356+
context.outputs().adopt(defines.output(), defines.asArrowTable());
357+
return true;
358+
}
359+
336360
/// Service handling
337361
template <typename T>
338362
bool addService(std::vector<ServiceSpec>&, T&)

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
576576
homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());
577577

578578
auto& callbacks = ic.services().get<CallbackService>();
579-
auto endofdatacb = [task](EndOfStreamContext& eosContext) {
579+
auto eoscb = [task](EndOfStreamContext& eosContext) {
580580
homogeneous_apply_refs([&eosContext](auto& element) {
581581
analysis_task_parsers::postRunService(eosContext, element);
582582
analysis_task_parsers::postRunOutput(eosContext, element);
@@ -585,13 +585,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
585585
eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
586586
};
587587

588-
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
588+
callbacks.set<CallbackService::Id::EndOfStream>(eoscb);
589589

590590
/// update configurables in filters and partitions
591591
homogeneous_apply_refs(
592592
[&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
593593
*task.get());
594-
/// create for filters gandiva trees matched to schemas and store the pointers into expressionInfos
594+
/// create gandiva expression trees for filters, matched to schemas, and store the pointers into expressionInfos
595595
homogeneous_apply_refs([&expressionInfos](auto& element) {
596596
return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
597597
},

Framework/Core/include/Framework/Expressions.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,8 @@ inline Node ifnode(Node&& condition_, Configurable<L1> const& then_, Configurabl
404404

405405
/// A struct, containing the root of the expression tree
406406
struct Filter {
407+
Filter() = default;
408+
407409
Filter(Node&& node_) : node{std::make_unique<Node>(std::forward<Node>(node_))}
408410
{
409411
(void)designateSubtrees(node.get());
@@ -413,7 +415,14 @@ struct Filter {
413415
{
414416
(void)designateSubtrees(node.get());
415417
}
416-
std::unique_ptr<Node> node;
418+
419+
/// Move assignment: takes over other's node. designateSubtrees is not called here.
Filter& operator=(Filter&& other) noexcept
420+
{
421+
node = std::move(other.node);
422+
return *this;
423+
}
424+
425+
std::unique_ptr<Node> node = nullptr;
417426

418427
size_t designateSubtrees(Node* node, size_t index = 0);
419428
};

0 commit comments

Comments
 (0)