Skip to content

Commit 35a1907

Browse files
committed
WIP DPL Analysis: centralised CCDB support in analysis
Thanks to the newly added binary view columns we can finally support proper CCDB integration in analysis. In order to do so, the user needs to create a TIMESTAMPED table, i.e. a table which is an extension of another one where the timestamps for each rows are provided. The extra columns of such timestamped table will be CCDB columns where the iterator of each provides access for one specified CCDB object.
1 parent 44f7163 commit 35a1907

File tree

12 files changed

+320
-8
lines changed

12 files changed

+320
-8
lines changed

Framework/CCDBSupport/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
# Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
# All rights not expressly granted are reserved.
44
#
@@ -9,9 +9,10 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111
o2_add_library(FrameworkCCDBSupport
12-
SOURCES
12+
SOURCES
1313
src/Plugin.cxx
1414
src/CCDBHelpers.cxx
15+
src/AnalysisCCDBHelpers.cxx
1516
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1617
PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB)
1718

Framework/CCDBSupport/src/Plugin.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111
#include "Framework/Plugins.h"
1212
#include "Framework/AlgorithmSpec.h"
13+
#include "AnalysisCCDBHelpers.h"
1314
#include "CCDBHelpers.h"
1415

1516
struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
@@ -19,6 +20,14 @@ struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
1920
}
2021
};
2122

23+
struct AnalysisCCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
24+
o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& ctx) final
25+
{
26+
return o2::framework::AnalysisCCDBHelpers::fetchFromCCDB(ctx);
27+
}
28+
};
29+
2230
DEFINE_DPL_PLUGINS_BEGIN
2331
DEFINE_DPL_PLUGIN_INSTANCE(CCDBFetcherPlugin, CustomAlgorithm);
32+
DEFINE_DPL_PLUGIN_INSTANCE(AnalysisCCDBFetcherPlugin, CustomAlgorithm);
2433
DEFINE_DPL_PLUGINS_END

Framework/Core/include/Framework/ASoA.h

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,6 +1274,11 @@ concept with_sources = requires {
12741274
T::sources.size();
12751275
};
12761276

1277+
template <typename T>
1278+
concept with_ccdb_urls = requires {
1279+
T::ccdb_urls.size();
1280+
};
1281+
12771282
template <typename T>
12781283
concept with_base_table = not_void<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata::base_table_t>;
12791284

@@ -2248,11 +2253,14 @@ ColumnGetterFunction<R, typename T::iterator> getColumnGetterByLabel(const std::
22482253

22492254
namespace o2::aod
22502255
{
2256+
// If you get an error about not satisfying is_origin_hash, you need to add
2257+
// an entry here.
22512258
O2ORIGIN("AOD");
22522259
O2ORIGIN("AOD1");
22532260
O2ORIGIN("AOD2");
22542261
O2ORIGIN("DYN");
22552262
O2ORIGIN("IDX");
2263+
O2ORIGIN("TIM");
22562264
O2ORIGIN("JOIN");
22572265
O2HASH("JOIN/0");
22582266
O2ORIGIN("CONC");
@@ -2313,6 +2321,46 @@ consteval static std::string_view namespace_prefix()
23132321
}; \
23142322
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }
23152323

2324+
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
2325+
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
2326+
static constexpr const char* mLabel = _Label_; \
2327+
static constexpr const char* query = _CCDBQuery_; \
2328+
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
2329+
using base = o2::soa::Column<std::span<std::byte>, _Name_>; \
2330+
using type = std::span<std::byte>; \
2331+
using column_t = _Name_; \
2332+
_Name_(arrow::ChunkedArray const* column) \
2333+
: o2::soa::Column<std::span<std::byte>, _Name_>(o2::soa::ColumnIterator<std::span<std::byte>>(column)) \
2334+
{ \
2335+
} \
2336+
\
2337+
_Name_() = default; \
2338+
_Name_(_Name_ const& other) = default; \
2339+
_Name_& operator=(_Name_ const& other) = default; \
2340+
\
2341+
decltype(auto) _Getter_() const \
2342+
{ \
2343+
static std::byte* payload = nullptr; \
2344+
static _ConcreteType_* deserialised = nullptr; \
2345+
auto span = *mColumnIterator; \
2346+
if (payload != (std::byte*)span.data()) { \
2347+
payload = (std::byte*)span.data(); \
2348+
TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \
2349+
deserialised = (_ConcreteType_*)f.ReadObjectAny(TClass::GetClass(#_ConcreteType_)); \
2350+
} \
2351+
return *deserialised; \
2352+
} \
2353+
\
2354+
decltype(auto) \
2355+
get() const \
2356+
{ \
2357+
return _Getter_(); \
2358+
} \
2359+
};
2360+
2361+
#define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \
2362+
DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_)
2363+
23162364
#define DECLARE_SOA_COLUMN(_Name_, _Getter_, _Type_) \
23172365
DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_)
23182366

@@ -3188,6 +3236,43 @@ consteval auto getIndexTargets()
31883236
using metadata = _Name_##Metadata; \
31893237
};
31903238

3239+
// Declare were each row is associated to a timestamp column of an _TimestampSource_
3240+
// table.
3241+
//
3242+
// The columns of this table have to be CCDB_COLUMNS so that for each timestamp, we get a row
3243+
// which points to the specified CCDB objectes described by those columns.
3244+
#define DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, _Label_, _TimestampSource_, _TimestampColumn_, _Origin_, _Version_, _Desc_, ...) \
3245+
O2HASH(_Desc_ "/" #_Version_); \
3246+
template <typename O> \
3247+
using _Name_##TimestampFrom = soa::Table<o2::aod::Hash<_Label_ ""_h>, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \
3248+
using _Name_##Timestamp = _Name_##TimestampFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3249+
template <typename O = o2::aod::Hash<_Origin_ ""_h>> \
3250+
struct _Name_##TimestampMetadataFrom : TableMetadata<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, __VA_ARGS__> { \
3251+
using base_table_t = _TimestampSource_; \
3252+
using extension_table_t = _Name_##TimestampFrom<O>; \
3253+
static constexpr const auto ccdb_urls = []<typename... Cs>(framework::pack<Cs...>) { \
3254+
return std::array<std::string_view, sizeof...(Cs)>{Cs::query...}; \
3255+
}(framework::pack<__VA_ARGS__>{}); \
3256+
static constexpr const auto ccdb_bindings = []<typename... Cs>(framework::pack<Cs...>) { \
3257+
return std::array<std::string_view, sizeof...(Cs)>{Cs::mLabel...}; \
3258+
}(framework::pack<__VA_ARGS__>{}); \
3259+
static constexpr auto sources = _TimestampSource_::originals; \
3260+
static constexpr auto timestamp_column_label = _TimestampColumn_::mLabel; \
3261+
/*static constexpr auto timestampColumn = _TimestampColumn_;*/ \
3262+
}; \
3263+
using _Name_##TimestampMetadata = _Name_##TimestampMetadataFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3264+
template <> \
3265+
struct MetadataTrait<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>> { \
3266+
using metadata = _Name_##TimestampMetadata; \
3267+
}; \
3268+
template <typename O> \
3269+
using _Name_##From = o2::soa::JoinFull<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, _TimestampSource_, _Name_##TimestampFrom<O>>; \
3270+
using _Name_ = _Name_##From<o2::aod::Hash<_Origin_ ""_h>>;
3271+
3272+
#define DECLARE_SOA_TIMESTAMPED_TABLE(_Name_, _TimestampSource_, _TimestampColumn_, _Version_, _Desc_, ...) \
3273+
O2HASH(#_Name_ "Timestamped"); \
3274+
DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, #_Name_ "Timestamped", _TimestampSource_, _TimestampColumn_, "TIM", _Version_, _Desc_, __VA_ARGS__)
3275+
31913276
#define DECLARE_SOA_INDEX_TABLE(_Name_, _Key_, _Description_, ...) \
31923277
DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, "IDX", 0, _Description_, false, __VA_ARGS__)
31933278

Framework/Core/include/Framework/AnalysisContext.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,24 @@ struct OutputObjectInfo {
2929
std::vector<std::string> bindings;
3030
};
3131

32-
//
32+
// This will keep track of the inputs which have
33+
// been requested and for which we will need to inject
34+
// some source device.
3335
struct AnalysisContext {
3436
std::vector<InputSpec> requestedAODs;
3537
std::vector<OutputSpec> providedAODs;
3638
std::vector<InputSpec> requestedDYNs;
3739
std::vector<OutputSpec> providedDYNs;
3840
std::vector<InputSpec> requestedIDXs;
41+
std::vector<OutputSpec> providedTIMs;
42+
std::vector<InputSpec> requestedTIMs;
3943
std::vector<OutputSpec> providedOutputObjHist;
4044
std::vector<InputSpec> spawnerInputs;
4145

46+
// These are the timestamped tables which are required to
47+
// inject the the CCDB objecs.
48+
std::vector<InputSpec> analysisCCDBInputs;
49+
4250
// Needed to created the hist writer
4351
std::vector<OutputTaskInfo> outTskMap;
4452
std::vector<OutputObjectInfo> outObjHistMap;

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#ifndef o2_framework_AnalysisHelpers_H_DEFINED
1212
#define o2_framework_AnalysisHelpers_H_DEFINED
1313

14+
#include "ConfigParamSpec.h"
1415
#include "Framework/ASoA.h"
1516
#include "Framework/DataAllocator.h"
1617
#include "Framework/IndexBuilderHelpers.h"
@@ -49,6 +50,19 @@ inline constexpr auto getSources()
4950
}.template operator()<T::sources.size(), T::sources>();
5051
}
5152

53+
template <soa::with_ccdb_urls T>
54+
inline constexpr auto getCCDBUrls()
55+
{
56+
std::vector<framework::ConfigParamSpec> result;
57+
for (size_t i = 0; i < T::ccdb_urls.size(); ++i) {
58+
result.push_back({std::string{"ccdb:"} + std::string{T::ccdb_bindings[i]},
59+
framework::VariantType::String,
60+
T::ccdb_urls[i],
61+
{"\"\""}});
62+
}
63+
return result;
64+
}
65+
5266
template <soa::with_sources T>
5367
constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
5468
{
@@ -67,18 +81,40 @@ constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
6781
{
6882
return {};
6983
}
84+
85+
template <soa::with_ccdb_urls T>
86+
constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
87+
{
88+
std::vector<framework::ConfigParamSpec> results = getCCDBUrls<T>();
89+
std::sort(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
90+
auto last = std::unique(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
91+
results.erase(last, results.end());
92+
return results;
93+
}
94+
95+
template <typename T>
96+
constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
97+
{
98+
return {};
99+
}
70100
} // namespace
71101

72102
template <TableRef R>
73103
constexpr auto tableRef2InputSpec()
74104
{
105+
std::vector<framework::ConfigParamSpec> metadata;
106+
auto m = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
107+
metadata.insert(metadata.end(), m.begin(), m.end());
108+
auto ccdbMetadata = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
109+
metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
110+
75111
return framework::InputSpec{
76112
o2::aod::label<R>(),
77113
o2::aod::origin<R>(),
78114
o2::aod::description(o2::aod::signature<R>()),
79115
R.version,
80116
framework::Lifetime::Timeframe,
81-
getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>()};
117+
metadata};
82118
}
83119

84120
template <TableRef R>

Framework/Core/include/Framework/AnalysisSupportHelpers.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ struct AnalysisSupportHelpers {
3939
std::vector<InputSpec> const& requestedSpecials,
4040
std::vector<InputSpec>& requestedAODs,
4141
DataProcessorSpec& publisher);
42+
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector<OutputSpec> const& providedSpecials,
43+
std::vector<InputSpec> const& requestedSpecials,
44+
std::vector<InputSpec>& requestedAODs,
45+
std::vector<InputSpec>& requestedDYNs,
46+
DataProcessorSpec& publisher);
4247
static void addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
4348
std::vector<InputSpec>& requestedAODs,
4449
std::vector<InputSpec>& requestedDYNs,

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ concept is_enumeration = is_enumeration_v<std::decay_t<T>>;
6565

6666
// Helper struct which builds a DataProcessorSpec from
6767
// the contents of an AnalysisTask...
68-
namespace {
68+
namespace
69+
{
6970
struct AnalysisDataProcessorBuilder {
7071
template <soa::is_iterator G, typename... Args>
7172
static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled)
@@ -417,7 +418,7 @@ struct AnalysisDataProcessorBuilder {
417418
std::invoke(processingFunction, task, g, std::get<A>(at)...);
418419
}
419420
};
420-
}
421+
} // namespace
421422

422423
struct SetDefaultProcesses {
423424
std::vector<std::pair<std::string, bool>> map;
@@ -429,7 +430,8 @@ struct TaskName {
429430
std::string value;
430431
};
431432

432-
namespace {
433+
namespace
434+
{
433435
template <typename T, typename... A>
434436
auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args)
435437
{
@@ -493,7 +495,7 @@ auto getTaskNameSetProcesses(std::string& outputName, A... args)
493495
return task;
494496
}
495497

496-
}
498+
} // namespace
497499

498500
/// Adaptor to make an AlgorithmSpec from a o2::framework::Task
499501
///

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,35 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
207207
}
208208
}
209209

210+
void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
211+
std::vector<OutputSpec> const& providedSpecials,
212+
std::vector<InputSpec> const& requestedSpecials,
213+
std::vector<InputSpec>& requestedAODs,
214+
std::vector<InputSpec>& requestedDYNs,
215+
DataProcessorSpec& publisher)
216+
{
217+
for (auto& input : requestedSpecials) {
218+
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
219+
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
220+
// FIXME: good enough for now...
221+
for (auto& i : input.metadata) {
222+
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
223+
auto value = i.defaultValue.get<std::string>();
224+
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
225+
auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
226+
if (j == publisher.inputs.end()) {
227+
publisher.inputs.push_back(spec);
228+
}
229+
if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
230+
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
231+
} else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
232+
DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
233+
}
234+
}
235+
}
236+
}
237+
}
238+
210239
// =============================================================================
211240
DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx)
212241
{

0 commit comments

Comments
 (0)