Skip to content

Commit d375e63

Browse files
authored
DPL Analysis: centralised CCDB support in analysis (#14567)
Thanks to the newly added binary view columns we can finally support proper CCDB integration in analysis. In order to do so, the user needs to create a TIMESTAMPED table, i.e. a table which is an extension of another one where the timestamps for each rows are provided. The extra columns of such timestamped table will be CCDB columns where the iterator of each provides access for one specified CCDB object. Notice the PR duplicates the CCDB code run in reconstruction and the additional device will never be added to the topology if running online, so there are no expected side effects.
1 parent 012946b commit d375e63

18 files changed

+1027
-42
lines changed
Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
# Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
# All rights not expressly granted are reserved.
44
#
@@ -9,14 +9,21 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111
o2_add_library(FrameworkCCDBSupport
12-
SOURCES
12+
SOURCES
1313
src/Plugin.cxx
14+
src/CCDBFetcherHelper.cxx
1415
src/CCDBHelpers.cxx
16+
src/AnalysisCCDBHelpers.cxx
1517
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1618
PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB)
1719

18-
o2_add_test(CCDBHelpers NAME test_Framework_test_CCDBHelpers
19-
SOURCES test/test_CCDBHelpers.cxx
20-
COMPONENT_NAME Framework
21-
LABELS framework
22-
PUBLIC_LINK_LIBRARIES O2::Framework O2::FrameworkCCDBSupport)
20+
add_executable(o2-test-framework-ccdbsupport
21+
test/test_CCDBHelpers.cxx)
22+
target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Framework)
23+
target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::FrameworkCCDBSupport)
24+
target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Catch2)
25+
26+
get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE)
27+
set_property(TARGET o2-test-framework-ccdbsupport PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})
28+
29+
add_test(NAME framework:ccdbsupport COMMAND o2-test-framework-ccdbsupport --skip-benchmarks)
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "AnalysisCCDBHelpers.h"
13+
#include "CCDBFetcherHelper.h"
14+
#include "Framework/DeviceSpec.h"
15+
#include "Framework/TimingInfo.h"
16+
#include "Framework/ConfigParamRegistry.h"
17+
#include "Framework/DataTakingContext.h"
18+
#include "Framework/RawDeviceService.h"
19+
#include "Framework/Output.h"
20+
#include "Framework/Signpost.h"
21+
#include "Framework/AnalysisContext.h"
22+
#include "Framework/ConfigContext.h"
23+
#include "Framework/ConfigContext.h"
24+
#include <arrow/array/builder_binary.h>
25+
#include <arrow/type.h>
26+
#include <arrow/type_fwd.h>
27+
#include <arrow/util/key_value_metadata.h>
28+
#include <arrow/table.h>
29+
#include <arrow/array.h>
30+
#include <arrow/builder.h>
31+
#include <fmt/base.h>
32+
#include <ctime>
33+
#include <memory>
34+
#include <unordered_map>
35+
36+
O2_DECLARE_DYNAMIC_LOG(ccdb);
37+
38+
namespace o2::framework
39+
{
40+
// Fill valid routes. Notice that for analysis the timestamps are associated to
41+
// a ATIM table and there might be multiple CCDB objects of the same kind for
42+
// dataframe.
43+
// For this reason rather than matching the Lifetime::Condition, we match the
44+
// origin.
45+
namespace
46+
{
47+
void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes, std::unordered_map<std::string, int>& bindings)
48+
{
49+
for (auto& route : outputRoutes) {
50+
auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher);
51+
if (originMatcher.origin != header::DataOrigin{"ATIM"}) {
52+
continue;
53+
}
54+
auto specStr = DataSpecUtils::describe(route.matcher);
55+
if (bindings.find(specStr) != bindings.end()) {
56+
continue;
57+
}
58+
bindings[specStr] = helper.routes.size();
59+
helper.routes.push_back(route);
60+
LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher));
61+
for (auto& metadata : route.matcher.metadata) {
62+
if (metadata.type == VariantType::String) {
63+
LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
64+
}
65+
}
66+
}
67+
}
68+
} // namespace
69+
70+
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
71+
{
72+
auto& ac = ctx.services().get<AnalysisContext>();
73+
std::vector<std::shared_ptr<arrow::Schema>> schemas;
74+
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
75+
76+
for (auto& input : ac.analysisCCDBInputs) {
77+
std::vector<std::shared_ptr<arrow::Field>> fields;
78+
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
79+
schemaMetadata->Append("outputBinding", input.binding);
80+
81+
for (auto& m : input.metadata) {
82+
// Save the list of input tables
83+
if (m.name.starts_with("input:")) {
84+
auto name = m.name.substr(6);
85+
schemaMetadata->Append("sourceTable", name);
86+
continue;
87+
}
88+
// Ignore the non ccdb: entries
89+
if (!m.name.starts_with("ccdb:")) {
90+
continue;
91+
}
92+
// Create the schema of the output
93+
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
94+
metadata->Append("url", m.defaultValue.asString());
95+
auto columnName = m.name.substr(strlen("ccdb:"));
96+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
97+
}
98+
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
99+
}
100+
return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
101+
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
102+
CCDBFetcherHelper::initialiseHelper(*helper, options);
103+
std::unordered_map<std::string, int> bindings;
104+
fillValidRoutes(*helper, spec.outputs, bindings);
105+
106+
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
107+
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
108+
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
109+
for (auto& schema : schemas) {
110+
std::vector<CCDBFetcherHelper::FetchOp> ops;
111+
auto inputBinding = *schema->metadata()->Get("sourceTable");
112+
auto outRouteDesc = *schema->metadata()->Get("outputRoute");
113+
std::string outBinding = *schema->metadata()->Get("outputBinding");
114+
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
115+
"Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
116+
outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
117+
auto ref = inputs.get<TableConsumer>(inputBinding);
118+
auto table = ref->asArrowTable();
119+
// FIXME: make the fTimestamp column configurable.
120+
auto timestampColumn = table->GetColumnByName("fTimestamp");
121+
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
122+
"There are %zu bindings available", bindings.size());
123+
for (auto& binding : bindings) {
124+
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
125+
"* %{public}s: %d",
126+
binding.first.c_str(), binding.second);
127+
}
128+
int outputRouteIndex = bindings.at(outRouteDesc);
129+
auto& spec = helper->routes[outputRouteIndex].matcher;
130+
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
131+
for (auto& _ : schema->fields()) {
132+
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
133+
}
134+
135+
for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
136+
std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
137+
auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
138+
139+
for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
140+
ops.clear();
141+
int64_t timestamp = timestamps[ri];
142+
for (auto& field : schema->fields()) {
143+
auto url = *field->metadata()->Get("url");
144+
// Time to actually populate the blob
145+
ops.push_back({
146+
.spec = spec,
147+
.url = url,
148+
.timestamp = timestamp,
149+
.runNumber = 1,
150+
.runDependent = 0,
151+
.queryRate = 0,
152+
});
153+
}
154+
auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
155+
O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
156+
"Got %zu responses from server.",
157+
responses.size());
158+
if (builders.size() != responses.size()) {
159+
LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
160+
}
161+
arrow::Status result;
162+
for (size_t bi = 0; bi < responses.size(); bi++) {
163+
auto& builder = builders[bi];
164+
auto& response = responses[bi];
165+
char const* address = reinterpret_cast<char const*>(response.id.value);
166+
result &= builder->Append(std::string_view(address, response.size));
167+
}
168+
if (!result.ok()) {
169+
LOGP(fatal, "Error adding results from CCDB");
170+
}
171+
O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses");
172+
}
173+
}
174+
arrow::ArrayVector arrays;
175+
for (auto& builder : builders) {
176+
arrays.push_back(*builder->Finish());
177+
}
178+
auto outTable = arrow::Table::Make(schema, arrays);
179+
auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
180+
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
181+
}
182+
183+
O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
184+
});
185+
});
186+
}
187+
188+
} // namespace o2::framework
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_
12+
#define O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_
13+
14+
#include "Framework/AlgorithmSpec.h"
15+
16+
namespace o2::framework
17+
{
18+
19+
struct AnalysisCCDBHelpers {
20+
static AlgorithmSpec fetchFromCCDB(ConfigContext const& ctx);
21+
};
22+
23+
} // namespace o2::framework
24+
25+
#endif // O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_

0 commit comments

Comments
 (0)