Skip to content

Commit 38e15e3

Browse files
committed
DPL Analysis: add DefinesDelayed allowing to set the expressions in a process function, e.g. after calculating some required input values
1 parent 2d7a0bd commit 38e15e3

File tree

4 files changed

+65
-1
lines changed

4 files changed

+65
-1
lines changed

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,9 @@ concept is_spawns = requires(T t) {
340340
/// The actual expressions have to be set in init() for the configurable expression
341341
/// columns, used to define the table
342342

343-
template <is_dynamically_spawnable T>
343+
template <is_dynamically_spawnable T, bool DELAYED = false>
344344
struct Defines : decltype(transformBase<T>()) {
345+
static constexpr bool delayed = DELAYED;
345346
using spawnable_t = T;
346347
using metadata = decltype(transformBase<T>())::metadata;
347348
using extension_t = typename metadata::extension_table_t;
@@ -373,13 +374,26 @@ struct Defines : decltype(transformBase<T>()) {
373374
std::array<o2::framework::expressions::Projector, N> projectors;
374375
std::shared_ptr<gandiva::Projector> projector = nullptr;
375376
std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(placeholders_pack_t{}));
377+
std::shared_ptr<arrow::Schema> inputSchema = nullptr;
378+
379+
bool needRecompilation = false;
380+
381+
void recompile()
382+
{
383+
projector = framework::expressions::createProjectorHelper(N, projectors.data(), inputSchema, schema->fields());
384+
}
376385
};
377386

387+
template <is_dynamically_spawnable T>
388+
using DefinesDelayed = Defines<T, true>;
389+
378390
template <typename T>
379391
concept is_defines = requires(T t) {
380392
typename T::metadata;
381393
requires std::same_as<decltype(t.pack()), typename T::placeholders_pack_t>;
382394
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
395+
requires std::same_as<decltype(t.needRecompilation), bool>;
396+
&T::recompile;
383397
};
384398

385399
/// Policy to control index building

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,49 @@ bool prepareOutput(ProcessingContext& context, T& builds)
307307

308308
template <is_defines T>
309309
bool prepareOutput(ProcessingContext& context, T& defines)
310+
requires(T::delayed == false)
310311
{
311312
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
312313
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
313314
if (originalTable->schema()->fields().empty() == true) {
314315
using base_table_t = typename T::base_table_t::table_t;
315316
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
316317
}
318+
if (defines.inputSchema == nullptr) {
319+
defines.inputSchema = originalTable->schema();
320+
}
321+
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
322+
323+
defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
324+
o2::aod::label<metadata::extension_table_t::ref>(),
325+
defines.projectors.data(),
326+
defines.projector,
327+
defines.schema));
328+
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
329+
return true;
330+
}
331+
332+
template <typename T>
333+
bool prepareDelayedOutput(ProcessingContext&, T&)
334+
{
335+
return false;
336+
}
337+
338+
template <is_defines T>
339+
bool prepareDelayedOutput(ProcessingContext& context, T& defines)
340+
{
341+
if (defines.needRecompilation) {
342+
defines.recompile();
343+
}
344+
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
345+
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
346+
if (originalTable->schema()->fields().empty() == true) {
347+
using base_table_t = typename T::base_table_t::table_t;
348+
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
349+
}
350+
if (defines.inputSchema == nullptr) {
351+
defines.inputSchema = originalTable->schema();
352+
}
317353
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
318354

319355
defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
643643
return false;
644644
},
645645
*task.get());
646+
// prepare delayed outputs
647+
homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
646648
// finalize outputs
647649
homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
648650
};

Framework/Core/test/test_Concepts.cxx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ struct P {
3232
PROCESS_SWITCH(P, process1, "", true);
3333
};
3434

35+
namespace o2::aod {
36+
namespace ct {
37+
DECLARE_SOA_CONFIGURABLE_EXPRESSION_COLUMN(Test, test, float, "test");
38+
}
39+
DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE(TracksMore, TracksIU, "TRKMORE", ct::Test);
40+
}
41+
3542
TEST_CASE("IdentificationConcepts")
3643
{
3744
// ASoA
@@ -122,6 +129,11 @@ TEST_CASE("IdentificationConcepts")
122129
Builds<o2::aod::Run3MatchedSparse> bld;
123130
REQUIRE(is_builds<decltype(bld)>);
124131

132+
Defines<o2::aod::TracksMore> def;
133+
DefinesDelayed<o2::aod::TracksMore> ddef;
134+
REQUIRE(is_defines<decltype(def)>);
135+
REQUIRE(is_defines<decltype(ddef)>);
136+
125137
OutputObj<TH1F> oo{"test"};
126138
REQUIRE(is_outputobj<decltype(oo)>);
127139

0 commit comments

Comments
 (0)