Skip to content

Commit ebc7444

Browse files
committed
DPL CCDB: move helper class to shared header
So that I can reuse it for the analysis code.
1 parent 3bf214c commit ebc7444

File tree

4 files changed

+392
-179
lines changed

4 files changed

+392
-179
lines changed

Framework/CCDBSupport/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
# Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
# All rights not expressly granted are reserved.
44
#
@@ -9,8 +9,9 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111
o2_add_library(FrameworkCCDBSupport
12-
SOURCES
12+
SOURCES
1313
src/Plugin.cxx
14+
src/CCDBFetcherHelper.cxx
1415
src/CCDBHelpers.cxx
1516
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1617
PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB)
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#include "CCDBFetcherHelper.h"
12+
#include "Framework/DataTakingContext.h"
13+
#include "Framework/Signpost.h"
14+
#include "Framework/DataSpecUtils.h"
15+
#include "Framework/ConfigParamRegistry.h"
16+
#include <TError.h>
17+
#include <TMemFile.h>
18+
19+
O2_DECLARE_DYNAMIC_LOG(ccdb);
20+
21+
namespace o2::framework
22+
{
23+
24+
o2::ccdb::CcdbApi& CCDBFetcherHelper::getAPI(const std::string& path)
25+
{
26+
// find the first = sign in the string. If present drop everything after it
27+
// and between it and the previous /.
28+
auto pos = path.find('=');
29+
if (pos == std::string::npos) {
30+
auto entry = remappings.find(path);
31+
return apis[entry == remappings.end() ? "" : entry->second];
32+
}
33+
auto pos2 = path.rfind('/', pos);
34+
if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) {
35+
throw runtime_error_f("Malformed path %s", path.c_str());
36+
}
37+
auto entry = remappings.find(path.substr(0, pos2));
38+
return apis[entry == remappings.end() ? "" : entry->second];
39+
}
40+
41+
bool isOnlineRun(DataTakingContext const& dtc)
42+
{
43+
return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
44+
}
45+
46+
void CCDBFetcherHelper::initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options)
47+
{
48+
auto defHost = options.get<std::string>("condition-backend");
49+
auto checkRate = options.get<int>("condition-tf-per-query");
50+
auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
51+
helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
52+
helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
53+
helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
54+
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
55+
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
56+
auto remapString = options.get<std::string>("condition-remap");
57+
ParserResult result = parseRemappings(remapString.c_str());
58+
if (!result.error.empty()) {
59+
throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str());
60+
}
61+
helper.remappings = result.remappings;
62+
helper.apis[""].init(defHost); // default backend
63+
LOGP(info, "Initialised default CCDB host {}", defHost);
64+
//
65+
for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping
66+
if (helper.apis.find(entry.second) == helper.apis.end()) {
67+
helper.apis[entry.second].init(entry.second);
68+
LOGP(info, "Initialised custom CCDB host {}", entry.second);
69+
}
70+
LOGP(info, "{} is remapped to {}", entry.first, entry.second);
71+
}
72+
helper.createdNotBefore = std::to_string(options.get<int64_t>("condition-not-before"));
73+
helper.createdNotAfter = std::to_string(options.get<int64_t>("condition-not-after"));
74+
}
75+
76+
CCDBFetcherHelper::ParserResult CCDBFetcherHelper::parseRemappings(char const* str)
77+
{
78+
std::unordered_map<std::string, std::string> remappings;
79+
std::string currentUrl = "";
80+
81+
enum ParsingStates {
82+
IN_BEGIN,
83+
IN_BEGIN_URL,
84+
IN_BEGIN_TARGET,
85+
IN_END_TARGET,
86+
IN_END_URL
87+
};
88+
ParsingStates state = IN_BEGIN;
89+
90+
while (true) {
91+
switch (state) {
92+
case IN_BEGIN: {
93+
if (*str == 0) {
94+
return {remappings, ""};
95+
}
96+
state = IN_BEGIN_URL;
97+
}
98+
case IN_BEGIN_URL: {
99+
if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) {
100+
return {remappings, "URL should start with either http:// or https:// or file://"};
101+
}
102+
state = IN_END_URL;
103+
} break;
104+
case IN_END_URL: {
105+
char const* c = strchr(str, '=');
106+
if (c == nullptr) {
107+
return {remappings, "Expecting at least one target path, missing `='?"};
108+
}
109+
if ((c - str) == 0) {
110+
return {remappings, "Empty url"};
111+
}
112+
currentUrl = std::string_view(str, c - str);
113+
state = IN_BEGIN_TARGET;
114+
str = c + 1;
115+
} break;
116+
case IN_BEGIN_TARGET: {
117+
if (*str == 0) {
118+
return {remappings, "Empty target"};
119+
}
120+
state = IN_END_TARGET;
121+
} break;
122+
case IN_END_TARGET: {
123+
char const* c = strpbrk(str, ",;");
124+
if (c == nullptr) {
125+
if (remappings.count(str)) {
126+
return {remappings, fmt::format("Path {} requested more than once.", str)};
127+
}
128+
remappings[std::string(str)] = currentUrl;
129+
return {remappings, ""};
130+
}
131+
if ((c - str) == 0) {
132+
return {remappings, "Empty target"};
133+
}
134+
auto key = std::string(str, c - str);
135+
if (remappings.count(str)) {
136+
return {remappings, fmt::format("Path {} requested more than once.", key)};
137+
}
138+
remappings[key] = currentUrl;
139+
if (*c == ';') {
140+
state = IN_BEGIN_URL;
141+
} else {
142+
state = IN_BEGIN_TARGET;
143+
}
144+
str = c + 1;
145+
} break;
146+
}
147+
}
148+
}
149+
150+
auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
151+
std::vector<CCDBFetcherHelper::FetchOp> const& ops,
152+
TimingInfo& timingInfo,
153+
DataTakingContext& dtc,
154+
DataAllocator& allocator) -> std::vector<CCDBFetcherHelper::Response>
155+
{
156+
O2_LOG_ENABLE(ccdb);
157+
int objCnt = -1;
158+
// We use the timeslice, so that we hook into the same interval as the rest of the
159+
// callback.
160+
static bool isOnline = isOnlineRun(dtc);
161+
162+
auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
163+
O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
164+
std::vector<Response> responses;
165+
for (auto& op : ops) {
166+
int64_t timestampToUse = op.timestamp;
167+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(op.spec).data());
168+
objCnt++;
169+
auto concrete = DataSpecUtils::asConcreteDataMatcher(op.spec);
170+
Output output{concrete.origin, concrete.description, concrete.subSpec};
171+
auto&& v = allocator.makeVector<char>(output);
172+
std::map<std::string, std::string> metadata;
173+
std::map<std::string, std::string> headers;
174+
std::string path = op.url;
175+
std::string etag = "";
176+
int chRate = helper->queryPeriodGlo;
177+
bool checkValidity = false;
178+
if (op.runDependent > 0) {
179+
if (op.runDependent == 1) {
180+
metadata["runNumber"] = std::format("{}", op.runNumber);
181+
} else if (op.runDependent == 2) {
182+
timestampToUse = op.runNumber;
183+
} else {
184+
LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", op.runDependent,
185+
concrete.origin.as<std::string>(), concrete.description.as<std::string>(), int(concrete.subSpec));
186+
}
187+
}
188+
for (auto m : op.metadata) {
189+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", m.key.data(), m.value.data());
190+
metadata[m.key] = m.value;
191+
}
192+
if (op.queryRate != 0) {
193+
chRate = op.queryRate * helper->queryPeriodFactor;
194+
}
195+
196+
const auto url2uuid = helper->mapURL2UUID.find(path);
197+
if (url2uuid != helper->mapURL2UUID.end()) {
198+
etag = url2uuid->second.etag;
199+
// We check validity every chRate timeslices or if the cache is expired
200+
uint64_t validUntil = url2uuid->second.cacheValidUntil;
201+
// When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
202+
uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
203+
// If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
204+
// when online.
205+
bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt);
206+
checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
207+
} else {
208+
checkValidity = true; // never skip check if the cache is empty
209+
}
210+
211+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
212+
213+
const auto& api = helper->getAPI(path);
214+
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
215+
LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse);
216+
api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
217+
if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
218+
LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse);
219+
// FIXME: I should send a dummy message.
220+
continue;
221+
}
222+
// printing in case we find a default entry
223+
if (headers.find("default") != headers.end()) {
224+
LOGP(detail, "******** Default entry used for {} ********", path);
225+
}
226+
helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
227+
if (etag.empty()) {
228+
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
229+
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
230+
helper->mapURL2UUID[path].cacheMiss++;
231+
helper->mapURL2UUID[path].size = v.size();
232+
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
233+
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
234+
auto size = v.size();
235+
api.appendFlatHeader(v, headers);
236+
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
237+
helper->mapURL2DPLCache[path] = cacheId;
238+
responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
239+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size);
240+
continue;
241+
}
242+
if (v.size()) { // but should be overridden by fresh object
243+
// somewhere here pruneFromCache should be called
244+
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
245+
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
246+
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
247+
helper->mapURL2UUID[path].cacheMiss++;
248+
helper->mapURL2UUID[path].size = v.size();
249+
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
250+
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
251+
auto size = v.size();
252+
api.appendFlatHeader(v, headers);
253+
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
254+
helper->mapURL2DPLCache[path] = cacheId;
255+
responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
256+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
257+
// one could modify the adoptContainer to take optional old cacheID to clean:
258+
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
259+
continue;
260+
} else {
261+
// Only once the etag is actually used, we get the information on how long the object is valid
262+
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
263+
}
264+
}
265+
// cached object is fine
266+
auto cacheId = helper->mapURL2DPLCache[path];
267+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
268+
helper->mapURL2UUID[path].cacheHit++;
269+
responses.emplace_back(Response{.id = cacheId, .size = helper->mapURL2UUID[path].size, .request = nullptr});
270+
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
271+
// the outputBuffer was not used, can we destroy it?
272+
}
273+
O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
274+
return responses;
275+
};
276+
277+
} // namespace o2::framework

0 commit comments

Comments
 (0)