Skip to content

Commit 38acc6f

Browse files
authored
DPL Analysis: add DefinesDelayed
Add `DefinesDelayed` allowing to set the expressions in a process function, e.g. after calculating some required input values
1 parent 96dafaa commit 38acc6f

File tree

4 files changed

+68
-1
lines changed

4 files changed

+68
-1
lines changed

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,9 @@ concept is_spawns = requires(T t) {
340340
/// The actual expressions have to be set in init() for the configurable expression
341341
/// columns, used to define the table
342342

343-
template <is_dynamically_spawnable T>
343+
template <is_dynamically_spawnable T, bool DELAYED = false>
344344
struct Defines : decltype(transformBase<T>()) {
345+
static constexpr bool delayed = DELAYED;
345346
using spawnable_t = T;
346347
using metadata = decltype(transformBase<T>())::metadata;
347348
using extension_t = typename metadata::extension_table_t;
@@ -373,13 +374,26 @@ struct Defines : decltype(transformBase<T>()) {
373374
std::array<o2::framework::expressions::Projector, N> projectors;
374375
std::shared_ptr<gandiva::Projector> projector = nullptr;
375376
std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(placeholders_pack_t{}));
377+
std::shared_ptr<arrow::Schema> inputSchema = nullptr;
378+
379+
bool needRecompilation = false;
380+
381+
void recompile()
382+
{
383+
projector = framework::expressions::createProjectorHelper(N, projectors.data(), inputSchema, schema->fields());
384+
}
376385
};
377386

387+
template <is_dynamically_spawnable T>
388+
using DefinesDelayed = Defines<T, true>;
389+
378390
template <typename T>
379391
concept is_defines = requires(T t) {
380392
typename T::metadata;
381393
requires std::same_as<decltype(t.pack()), typename T::placeholders_pack_t>;
382394
requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
395+
requires std::same_as<decltype(t.needRecompilation), bool>;
396+
&T::recompile;
383397
};
384398

385399
/// Policy to control index building

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,50 @@ bool prepareOutput(ProcessingContext& context, T& builds)
307307

308308
template <is_defines T>
309309
bool prepareOutput(ProcessingContext& context, T& defines)
310+
requires(T::delayed == false)
310311
{
311312
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
312313
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
313314
if (originalTable->schema()->fields().empty() == true) {
314315
using base_table_t = typename T::base_table_t::table_t;
315316
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
316317
}
318+
if (defines.inputSchema == nullptr) {
319+
defines.inputSchema = originalTable->schema();
320+
}
321+
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
322+
323+
defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
324+
o2::aod::label<metadata::extension_table_t::ref>(),
325+
defines.projectors.data(),
326+
defines.projector,
327+
defines.schema));
328+
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
329+
return true;
330+
}
331+
332+
template <typename T>
333+
bool prepareDelayedOutput(ProcessingContext&, T&)
334+
{
335+
return false;
336+
}
337+
338+
template <is_defines T>
339+
requires(T::delayed == true)
340+
bool prepareDelayedOutput(ProcessingContext& context, T& defines)
341+
{
342+
if (defines.needRecompilation) {
343+
defines.recompile();
344+
}
345+
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
346+
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
347+
if (originalTable->schema()->fields().empty() == true) {
348+
using base_table_t = typename T::base_table_t::table_t;
349+
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
350+
}
351+
if (defines.inputSchema == nullptr) {
352+
defines.inputSchema = originalTable->schema();
353+
}
317354
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
318355

319356
defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
645645
return false;
646646
},
647647
*task.get());
648+
// prepare delayed outputs
649+
homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
648650
// finalize outputs
649651
homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
650652
};

Framework/Core/test/test_Concepts.cxx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ struct P {
3232
PROCESS_SWITCH(P, process1, "", true);
3333
};
3434

35+
namespace o2::aod
36+
{
37+
namespace ct
38+
{
39+
DECLARE_SOA_CONFIGURABLE_EXPRESSION_COLUMN(Test, test, float, "test");
40+
}
41+
DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE(TracksMore, TracksIU, "TRKMORE", ct::Test);
42+
} // namespace o2::aod
43+
3544
TEST_CASE("IdentificationConcepts")
3645
{
3746
// ASoA
@@ -122,6 +131,11 @@ TEST_CASE("IdentificationConcepts")
122131
Builds<o2::aod::Run3MatchedSparse> bld;
123132
REQUIRE(is_builds<decltype(bld)>);
124133

134+
Defines<o2::aod::TracksMore> def;
135+
DefinesDelayed<o2::aod::TracksMore> ddef;
136+
REQUIRE(is_defines<decltype(def)>);
137+
REQUIRE(is_defines<decltype(ddef)>);
138+
125139
OutputObj<TH1F> oo{"test"};
126140
REQUIRE(is_outputobj<decltype(oo)>);
127141

0 commit comments

Comments
 (0)