Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ concept has_metadata = is_metadata_trait<T> && not_void<typename T::metadata>;
template <typename T>
concept has_extension = is_metadata<T> && not_void<typename T::extension_table_t>;

template <typename T>
concept has_configurable_extension = has_extension<T> && requires(T t) { typename T::configurable_t; requires std::same_as<std::true_type, typename T::configurable_t>; };

template <typename T>
concept is_spawnable_column = std::same_as<typename T::spawnable_t, std::true_type>;

Expand Down Expand Up @@ -2355,7 +2358,7 @@ O2HASH("TEST/0");
DECLARE_SOA_BITMAP_COLUMN_FULL(_Name_, _Getter_, _Size_, "f" #_Name_)

/// An 'expression' column. i.e. a column that can be calculated from other
/// columns with gandiva based on supplied C++ expression.
/// columns with gandiva based on static C++ expression.
#define DECLARE_SOA_EXPRESSION_COLUMN_FULL(_Name_, _Getter_, _Type_, _Label_, _Expression_) \
struct _Name_ : o2::soa::Column<_Type_, _Name_> { \
static constexpr const char* mLabel = _Label_; \
Expand Down Expand Up @@ -2393,6 +2396,38 @@ O2HASH("TEST/0");
#define DECLARE_SOA_EXPRESSION_COLUMN(_Name_, _Getter_, _Type_, _Expression_) \
DECLARE_SOA_EXPRESSION_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_, _Expression_);

/// A configurable 'expression' column. i.e. a column that can be calculated from other
/// columns with gandiva based on dynamically supplied C++ expression or a string definition.
#define DECLARE_SOA_CONFIGURABLE_EXPRESSION_COLUMN(_Name_, _Getter_, _Type_, _Label_) \
struct _Name_ : o2::soa::Column<_Type_, _Name_> { \
static constexpr const char* mLabel = _Label_; \
static constexpr const int32_t mHash = _Label_ ""_h; \
using base = o2::soa::Column<_Type_, _Name_>; \
using type = _Type_; \
using column_t = _Name_; \
using spawnable_t = std::true_type; \
_Name_(arrow::ChunkedArray const* column) \
: o2::soa::Column<_Type_, _Name_>(o2::soa::ColumnIterator<type>(column)) \
{ \
} \
\
_Name_() = default; \
_Name_(_Name_ const& other) = default; \
_Name_& operator=(_Name_ const& other) = default; \
\
decltype(auto) _Getter_() const \
{ \
return *mColumnIterator; \
} \
\
decltype(auto) get() const \
{ \
return _Getter_(); \
} \
}; \
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, o2::framework::TypeIdHelpers::uniqueId<_Name_>(), \
o2::framework::expressions::selectArrowType<_Type_>() }

/// An index column is a column of indices to elements / of another table named
/// _Name_##s. The column name will be _Name_##Id and will always be stored in
/// "fIndex"#_Table_#[_Suffix_]. If _Suffix_ is not empty it has to begin
Expand Down Expand Up @@ -3104,6 +3139,32 @@ consteval auto getIndexTargets()
O2HASH(#_Name_ "Extension"); \
DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, #_Name_ "Extension", _Table_, "AOD", "EX" _Description_, 0, __VA_ARGS__)

#define DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE_FULL(_Name_, _Label_, _OriginalTable_, _Origin_, _Desc_, _Version_, ...) \
O2HASH(_Desc_ "/" #_Version_); \
template <typename O> \
using _Name_##CfgExtensionFrom = soa::Table<o2::aod::Hash<_Label_ ""_h>, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \
using _Name_##CfgExtension = _Name_##CfgExtensionFrom<o2::aod::Hash<_Origin_ ""_h>>; \
template <typename O = o2::aod::Hash<_Origin_ ""_h>> \
struct _Name_##CfgExtensionMetadataFrom : TableMetadata<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, __VA_ARGS__> { \
using base_table_t = _OriginalTable_; \
using extension_table_t = _Name_##CfgExtensionFrom<O>; \
using placeholders_pack_t = framework::pack<__VA_ARGS__>; \
using configurable_t = std::true_type; \
static constexpr auto sources = _OriginalTable_::originals; \
}; \
using _Name_##CfgExtensionMetadata = _Name_##CfgExtensionMetadataFrom<o2::aod::Hash<_Origin_ ""_h>>; \
template <> \
struct MetadataTrait<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>> { \
using metadata = _Name_##CfgExtensionMetadata; \
}; \
template <typename O> \
using _Name_##From = o2::soa::JoinFull<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, _OriginalTable_, _Name_##CfgExtensionFrom<O>>; \
using _Name_ = _Name_##From<o2::aod::Hash<_Origin_ ""_h>>;

#define DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \
O2HASH(#_Name_ "CfgExtension"); \
DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE_FULL(_Name_, #_Name_ "CfgExtension", _Table_, "AOD", "EX" _Description_, 0, __VA_ARGS__)

#define DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, _Origin_, _Version_, _Desc_, _Exclusive_, ...) \
O2HASH(#_Name_); \
O2HASH(_Desc_ "/" #_Version_); \
Expand Down
54 changes: 53 additions & 1 deletion Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ struct TableTransform {
template <typename T>
concept is_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;

template <typename T>
concept is_dynamically_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_configurable_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;

template <is_spawnable T>
constexpr auto transformBase()
{
Expand Down Expand Up @@ -282,12 +285,60 @@ struct Spawns : decltype(transformBase<T>()) {
}
std::shared_ptr<typename T::table_t> table = nullptr;
std::shared_ptr<extension_t> extension = nullptr;
std::shared_ptr<gandiva::Projector> projector = nullptr;
};

template <typename T>
concept is_spawns = requires(T t) {
typename T::metadata;
requires std::same_as<decltype(t.pack()), typename T::expression_pack_t>;
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
};

/// This helper struct allows you to declare extended tables with dynamically-supplied
/// expressions to be created by the task
/// The actual expressions have to be set in init() for the configurable expression
/// columns, used to define the table

template <is_dynamically_spawnable T>
struct Defines : decltype(transformBase<T>()) {
using spawnable_t = T;
using metadata = decltype(transformBase<T>())::metadata;
using extension_t = typename metadata::extension_table_t;
using base_table_t = typename metadata::base_table_t;
using placeholders_pack_t = typename metadata::placeholders_pack_t;
static constexpr size_t N = framework::pack_size(placeholders_pack_t{});

constexpr auto pack()
{
return placeholders_pack_t{};
}

typename T::table_t* operator->()
{
return table.get();
}
typename T::table_t const& operator*() const
{
return *table;
}

auto asArrowTable()
{
return extension->asArrowTable();
}
std::shared_ptr<typename T::table_t> table = nullptr;
std::shared_ptr<extension_t> extension = nullptr;

std::array<o2::framework::expressions::Projector, N> projectors;
std::shared_ptr<gandiva::Projector> projector = nullptr;
};

template <typename T>
concept is_defines = requires(T t) {
typename T::metadata;
requires std::same_as<decltype(t.pack()), typename T::placeholders_pack_t>;
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
};

/// Policy to control index building
Expand Down Expand Up @@ -744,7 +795,8 @@ template <soa::is_table T, soa::is_spawnable_column... Cs>
auto Extend(T const& table)
{
using output_t = Join<T, soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
return output_t{{o2::framework::spawner(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension"), table.asArrowTable()}, 0};
static std::shared_ptr<gandiva::Projector> projector = nullptr;
return output_t{{o2::framework::spawner(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension", projector), table.asArrowTable()}, 0};
}

/// Template function to attach dynamic columns on-the-fly (e.g. inside
Expand Down
66 changes: 45 additions & 21 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,30 @@ bool requestInputs(std::vector<InputSpec>&, T const&)
}

template <is_spawns T>
bool requestInputs(std::vector<InputSpec>& inputs, T const& spawns)
const char* controlOption()
{
auto base_specs = spawns.base_specs();
for (auto base_spec : base_specs) {
base_spec.metadata.push_back(ConfigParamSpec{std::string{"control:spawn"}, VariantType::Bool, true, {"\"\""}});
DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
}
return true;
return "control:spawn";
}

template <is_builds T>
bool requestInputs(std::vector<InputSpec>& inputs, T const& builds)
const char* controlOption()
{
auto base_specs = builds.base_specs();
return "control:build";
}

template <is_defines T>
const char* controlOption()
{
return "control:define";
}

template <typename T>
requires(is_spawns<T> || is_builds<T> || is_defines<T>)
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
{
auto base_specs = entity.base_specs();
for (auto base_spec : base_specs) {
base_spec.metadata.push_back(ConfigParamSpec{std::string{"control:build"}, VariantType::Bool, true, {"\"\""}});
base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
}
return true;
Expand Down Expand Up @@ -219,17 +227,11 @@ bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
return true;
}

template <is_spawns T>
bool appendOutput(std::vector<OutputSpec>& outputs, T& spawns, uint32_t)
{
outputs.emplace_back(spawns.spec());
return true;
}

template <is_builds T>
bool appendOutput(std::vector<OutputSpec>& outputs, T& builds, uint32_t)
template <typename T>
requires(is_spawns<T> || is_builds<T> || is_defines<T>)
bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
{
outputs.emplace_back(builds.spec());
outputs.emplace_back(entity.spec());
return true;
}

Expand Down Expand Up @@ -286,7 +288,7 @@ bool prepareOutput(ProcessingContext& context, T& spawns)
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
}

spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>()));
spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>(), spawns.projector));
spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}));
return true;
}
Expand All @@ -298,6 +300,21 @@ bool prepareOuput(ProcessingContext& context, T& builds)
return builds.template build<typename T::buildable_t::indexing_t>(builds.pack(), extractOriginals<metadata::sources.size(), metadata::sources>(context));
}

template <is_defines T>
bool prepareOutput(ProcessingContext& context, T& defines)
{
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context));
if (originalTable->schema()->fields().empty() == true) {
using base_table_t = typename T::base_table_t::table_t;
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
}

defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>(), defines.projectors.data(), defines.projector));
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}));
return true;
}

template <typename T>
bool finalizeOutput(ProcessingContext&, T&)
{
Expand Down Expand Up @@ -333,6 +350,13 @@ bool finalizeOutput(ProcessingContext& context, T& builds)
return true;
}

template <is_defines T>
bool finalizeOutput(ProcessingContext& context, T& defines)
{
context.outputs().adopt(defines.output(), defines.asArrowTable());
return true;
}

/// Service handling
template <typename T>
bool addService(std::vector<ServiceSpec>&, T&)
Expand Down
6 changes: 3 additions & 3 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());

auto& callbacks = ic.services().get<CallbackService>();
auto endofdatacb = [task](EndOfStreamContext& eosContext) {
auto eoscb = [task](EndOfStreamContext& eosContext) {
homogeneous_apply_refs([&eosContext](auto& element) {
analysis_task_parsers::postRunService(eosContext, element);
analysis_task_parsers::postRunOutput(eosContext, element);
Expand All @@ -585,13 +585,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
};

callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
callbacks.set<CallbackService::Id::EndOfStream>(eoscb);

/// update configurables in filters and partitions
homogeneous_apply_refs(
[&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
*task.get());
/// create for filters gandiva trees matched to schemas and store the pointers into expressionInfos
/// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos
homogeneous_apply_refs([&expressionInfos](auto& element) {
return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
},
Expand Down
11 changes: 10 additions & 1 deletion Framework/Core/include/Framework/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ inline Node ifnode(Node&& condition_, Configurable<L1> const& then_, Configurabl

/// A struct, containing the root of the expression tree
struct Filter {
Filter() = default;

Filter(Node&& node_) : node{std::make_unique<Node>(std::forward<Node>(node_))}
{
(void)designateSubtrees(node.get());
Expand All @@ -413,7 +415,14 @@ struct Filter {
{
(void)designateSubtrees(node.get());
}
std::unique_ptr<Node> node;

Filter& operator=(Filter&& other) noexcept
{
node = std::move(other.node);
return *this;
}

std::unique_ptr<Node> node = nullptr;

size_t designateSubtrees(Node* node, size_t index = 0);
};
Expand Down
Loading