Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,9 @@ concept is_spawns = requires(T t) {
/// The actual expressions have to be set in init() for the configurable expression
/// columns, used to define the table

template <is_dynamically_spawnable T>
template <is_dynamically_spawnable T, bool DELAYED = false>
struct Defines : decltype(transformBase<T>()) {
static constexpr bool delayed = DELAYED;
using spawnable_t = T;
using metadata = decltype(transformBase<T>())::metadata;
using extension_t = typename metadata::extension_table_t;
Expand Down Expand Up @@ -373,13 +374,26 @@ struct Defines : decltype(transformBase<T>()) {
std::array<o2::framework::expressions::Projector, N> projectors;
std::shared_ptr<gandiva::Projector> projector = nullptr;
std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(placeholders_pack_t{}));
std::shared_ptr<arrow::Schema> inputSchema = nullptr;

bool needRecompilation = false;

void recompile()
{
projector = framework::expressions::createProjectorHelper(N, projectors.data(), inputSchema, schema->fields());
}
};

template <is_dynamically_spawnable T>
using DefinesDelayed = Defines<T, true>;

template <typename T>
concept is_defines = requires(T t) {
typename T::metadata;
requires std::same_as<decltype(t.pack()), typename T::placeholders_pack_t>;
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
requires std::same_as<decltype(t.needRecompilation), bool>;
&T::recompile;
};

/// Policy to control index building
Expand Down
37 changes: 37 additions & 0 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,50 @@ bool prepareOutput(ProcessingContext& context, T& builds)

template <is_defines T>
bool prepareOutput(ProcessingContext& context, T& defines)
requires(T::delayed == false)
{
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
if (originalTable->schema()->fields().empty() == true) {
using base_table_t = typename T::base_table_t::table_t;
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
}
if (defines.inputSchema == nullptr) {
defines.inputSchema = originalTable->schema();
}
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;

defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
o2::aod::label<metadata::extension_table_t::ref>(),
defines.projectors.data(),
defines.projector,
defines.schema));
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
return true;
}

template <typename T>
bool prepareDelayedOutput(ProcessingContext&, T&)
{
return false;
}

template <is_defines T>
requires(T::delayed == true)
bool prepareDelayedOutput(ProcessingContext& context, T& defines)
{
if (defines.needRecompilation) {
defines.recompile();
}
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
if (originalTable->schema()->fields().empty() == true) {
using base_table_t = typename T::base_table_t::table_t;
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
}
if (defines.inputSchema == nullptr) {
defines.inputSchema = originalTable->schema();
}
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;

defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
return false;
},
*task.get());
// prepare delayed outputs
homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
// finalize outputs
homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
};
Expand Down
14 changes: 14 additions & 0 deletions Framework/Core/test/test_Concepts.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ struct P {
PROCESS_SWITCH(P, process1, "", true);
};

namespace o2::aod
{
namespace ct
{
DECLARE_SOA_CONFIGURABLE_EXPRESSION_COLUMN(Test, test, float, "test");
}
DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE(TracksMore, TracksIU, "TRKMORE", ct::Test);
} // namespace o2::aod

TEST_CASE("IdentificationConcepts")
{
// ASoA
Expand Down Expand Up @@ -122,6 +131,11 @@ TEST_CASE("IdentificationConcepts")
Builds<o2::aod::Run3MatchedSparse> bld;
REQUIRE(is_builds<decltype(bld)>);

Defines<o2::aod::TracksMore> def;
DefinesDelayed<o2::aod::TracksMore> ddef;
REQUIRE(is_defines<decltype(def)>);
REQUIRE(is_defines<decltype(ddef)>);

OutputObj<TH1F> oo{"test"};
REQUIRE(is_outputobj<decltype(oo)>);

Expand Down