Skip to content

Commit 71abb21

Browse files
committed
WIP DPL Analysis: centralised CCDB support in analysis
Thanks to the newly added binary view columns we can finally support proper CCDB integration in analysis. In order to do so, the user needs to create a TIMESTAMPED table, i.e. a table which is an extension of another one where the timestamps for each rows are provided. The extra columns of such timestamped table will be CCDB columns where the iterator of each provides access for one specified CCDB object.
1 parent d84a22c commit 71abb21

18 files changed

+1040
-41
lines changed
Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
# Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
# All rights not expressly granted are reserved.
44
#
@@ -9,14 +9,21 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111
o2_add_library(FrameworkCCDBSupport
12-
SOURCES
12+
SOURCES
1313
src/Plugin.cxx
14+
src/CCDBFetcherHelper.cxx
1415
src/CCDBHelpers.cxx
16+
src/AnalysisCCDBHelpers.cxx
1517
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1618
PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB)
1719

18-
o2_add_test(CCDBHelpers NAME test_Framework_test_CCDBHelpers
19-
SOURCES test/test_CCDBHelpers.cxx
20-
COMPONENT_NAME Framework
21-
LABELS framework
22-
PUBLIC_LINK_LIBRARIES O2::Framework O2::FrameworkCCDBSupport)
20+
add_executable(o2-test-framework-ccdbsupport
21+
test/test_CCDBHelpers.cxx)
22+
target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Framework)
23+
target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::FrameworkCCDBSupport)
24+
target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Catch2)
25+
26+
get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE)
27+
set_property(TARGET o2-test-framework-ccdbsupport PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})
28+
29+
add_test(NAME framework:ccdbsupport COMMAND o2-test-framework-ccdbsupport --skip-benchmarks)
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "AnalysisCCDBHelpers.h"
13+
#include "CCDBFetcherHelper.h"
14+
#include "Framework/DeviceSpec.h"
15+
#include "Framework/TimingInfo.h"
16+
#include "Framework/ConfigParamRegistry.h"
17+
#include "Framework/DataTakingContext.h"
18+
#include "Framework/RawDeviceService.h"
19+
#include "Framework/Output.h"
20+
#include "Framework/Signpost.h"
21+
#include "Framework/AnalysisContext.h"
22+
#include "Framework/ConfigContext.h"
23+
#include "Framework/ConfigContext.h"
24+
#include <arrow/array/builder_binary.h>
25+
#include <arrow/type.h>
26+
#include <arrow/type_fwd.h>
27+
#include <arrow/util/key_value_metadata.h>
28+
#include <arrow/table.h>
29+
#include <arrow/array.h>
30+
#include <arrow/builder.h>
31+
#include <fmt/base.h>
32+
#include <ctime>
33+
#include <iostream>
34+
#include <memory>
35+
#include <unordered_map>
36+
37+
O2_DECLARE_DYNAMIC_LOG(ccdb);
38+
39+
namespace o2::framework
40+
{
41+
// Fill valid routes. Notice that for analysis the timestamps are associated to
42+
// a TIM table and there might be multiple CCDB objects of the same kind for
43+
// dataframe.
44+
// For this reason rather than matching the Lifetime::Condition, we match the
45+
// origin.
46+
namespace
47+
{
48+
void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes, std::unordered_map<std::string, int>& bindings)
49+
{
50+
for (auto& route : outputRoutes) {
51+
auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher);
52+
if (originMatcher.origin != header::DataOrigin{"TIM"}) {
53+
continue;
54+
}
55+
auto specStr = DataSpecUtils::describe(route.matcher);
56+
if (bindings.find(specStr) != bindings.end()) {
57+
continue;
58+
}
59+
std::cout << specStr << " is at " << helper.routes.size() << std::endl;
60+
bindings[specStr] = helper.routes.size();
61+
helper.routes.push_back(route);
62+
LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher));
63+
for (auto& metadata : route.matcher.metadata) {
64+
if (metadata.type == VariantType::String) {
65+
LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
66+
}
67+
}
68+
}
69+
}
70+
} // namespace
71+
72+
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
73+
{
74+
auto& ac = ctx.services().get<AnalysisContext>();
75+
std::vector<std::shared_ptr<arrow::Schema>> schemas;
76+
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
77+
78+
for (auto& input : ac.analysisCCDBInputs) {
79+
std::vector<std::shared_ptr<arrow::Field>> fields;
80+
std::cout << input << std::endl;
81+
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
82+
schemaMetadata->Append("outputBinding", input.binding);
83+
84+
for (auto& m : input.metadata) {
85+
// Save the list of input tables
86+
if (m.name.starts_with("input:")) {
87+
auto name = m.name.substr(6);
88+
schemaMetadata->Append("sourceTable", name);
89+
std::cout << "sourceTable:" << name << " " << m.defaultValue.asString() << std::endl;
90+
continue;
91+
}
92+
// Ignore the non ccdb: entries
93+
if (!m.name.starts_with("ccdb:")) {
94+
continue;
95+
}
96+
// Create the schema of the output
97+
std::cout << m.name << " " << m.defaultValue.asString() << std::endl;
98+
99+
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
100+
metadata->Append("url", m.defaultValue.asString());
101+
std::cout << "url" << " " << m.defaultValue.asString() << std::endl;
102+
auto columnName = m.name.substr(strlen("ccdb:"));
103+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
104+
}
105+
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
106+
}
107+
return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
108+
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
109+
CCDBFetcherHelper::initialiseHelper(*helper, options);
110+
std::unordered_map<std::string, int> bindings;
111+
fillValidRoutes(*helper, spec.outputs, bindings);
112+
113+
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
114+
std::cout << " Also executed" << std::endl;
115+
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
116+
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
117+
for (auto& schema : schemas) {
118+
std::vector<CCDBFetcherHelper::FetchOp> ops;
119+
auto inputBinding = *schema->metadata()->Get("sourceTable");
120+
auto outRouteDesc = *schema->metadata()->Get("outputRoute");
121+
std::string outBinding = *schema->metadata()->Get("outputBinding");
122+
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
123+
"Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
124+
outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
125+
auto ref = inputs.get<TableConsumer>(inputBinding);
126+
auto table = ref->asArrowTable();
127+
std::cout << table->ToString() << std::endl;
128+
// FIXME: make the fTimestamp column configurable.
129+
auto timestampColumn = table->GetColumnByName("fTimestamp");
130+
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
131+
"There are %zu bindings available", bindings.size());
132+
for (auto& binding : bindings) {
133+
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
134+
"* %{public}s: %d",
135+
binding.first.c_str(), binding.second);
136+
}
137+
std::cout << " output route is " << outRouteDesc << std::endl;
138+
std::cout << " output binding is " << outBinding << std::endl;
139+
int outputRouteIndex = bindings.at(outRouteDesc);
140+
std::cout << " Index for output is " << outputRouteIndex << std::endl;
141+
auto& spec = helper->routes[outputRouteIndex].matcher;
142+
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
143+
for (auto &_ : schema->fields()) {
144+
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
145+
}
146+
147+
for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
148+
std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
149+
auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
150+
151+
for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
152+
ops.clear();
153+
int64_t timestamp = timestamps[ri];
154+
for (auto& field : schema->fields()) {
155+
auto url = *field->metadata()->Get("url");
156+
std::cout << "Fetching " << field->name() << " from " << url << "/" << timestamp << std::endl;
157+
// Time to actually populate the blob
158+
ops.push_back({
159+
.spec = spec,
160+
.url = url,
161+
.timestamp = timestamp,
162+
.runNumber = 1,
163+
.runDependent = 0,
164+
.queryRate = 0,
165+
});
166+
}
167+
auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
168+
O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
169+
"Got %zu responses from server.",
170+
responses.size());
171+
if (builders.size() != responses.size()) {
172+
LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
173+
}
174+
arrow::Status result;
175+
for (size_t bi = 0; bi < responses.size(); bi++) {
176+
auto &builder = builders[bi];
177+
auto &response = responses[bi];
178+
char const* address = reinterpret_cast<char const*>(response.id.value);
179+
std::cout << "Inserting view (" << (void*)address << "," << response.size << ")" << std::endl;
180+
result &= builder->Append(std::string_view(address, response.size));
181+
}
182+
if (!result.ok()) {
183+
LOGP(fatal, "Error adding results from CCDB");
184+
}
185+
O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses");
186+
}
187+
}
188+
arrow::ArrayVector arrays;
189+
for (auto &builder : builders) {
190+
arrays.push_back(*builder->Finish());
191+
}
192+
auto outTable = arrow::Table::Make(schema, arrays);
193+
auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
194+
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
195+
}
196+
197+
O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
198+
});
199+
});
200+
}
201+
202+
} // namespace o2::framework
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_
12+
#define O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_
13+
14+
#include "Framework/AlgorithmSpec.h"
15+
16+
namespace o2::framework
17+
{
18+
19+
struct AnalysisCCDBHelpers {
20+
static AlgorithmSpec fetchFromCCDB(ConfigContext const&ctx);
21+
};
22+
23+
} // namespace o2::framework
24+
25+
#endif // O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_

0 commit comments

Comments
 (0)