Skip to content

Commit 478251f

Browse files
committed
DPL: adapt reconstruction CCDB Fetcher to use the new CCDBFetcherHelper
1 parent e26ee52 commit 478251f

File tree

1 file changed

+39
-284
lines changed

1 file changed

+39
-284
lines changed

Framework/CCDBSupport/src/CCDBHelpers.cxx

Lines changed: 39 additions & 284 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
// All rights not expressly granted are reserved.
44
//
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "CCDBHelpers.h"
13+
#include "CCDBFetcherHelper.h"
1314
#include "Framework/DeviceSpec.h"
1415
#include "Framework/Logger.h"
1516
#include "Framework/TimingInfo.h"
@@ -22,171 +23,21 @@
2223
#include "Framework/Signpost.h"
2324
#include <TError.h>
2425
#include <TMemFile.h>
26+
#include <ctime>
2527

2628
O2_DECLARE_DYNAMIC_LOG(ccdb);
2729

2830
namespace o2::framework
2931
{
3032

31-
struct CCDBFetcherHelper {
32-
struct CCDBCacheInfo {
33-
std::string etag;
34-
size_t cacheValidUntil = 0;
35-
size_t cachePopulatedAt = 0;
36-
size_t cacheMiss = 0;
37-
size_t cacheHit = 0;
38-
size_t minSize = -1ULL;
39-
size_t maxSize = 0;
40-
int lastCheckedTF = 0;
41-
};
42-
43-
struct RemapMatcher {
44-
std::string path;
45-
};
46-
47-
struct RemapTarget {
48-
std::string url;
49-
};
50-
51-
std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
52-
std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
53-
std::string createdNotBefore = "0";
54-
std::string createdNotAfter = "3385078236000";
55-
std::unordered_map<std::string, o2::ccdb::CcdbApi> apis;
56-
std::vector<OutputRoute> routes;
57-
std::unordered_map<std::string, std::string> remappings;
58-
uint32_t lastCheckedTFCounterOrbReset = 0; // last checked TFcounter for bulk check
59-
int queryPeriodGlo = 1;
60-
int queryPeriodFactor = 1;
61-
int64_t timeToleranceMS = 5000;
62-
63-
o2::ccdb::CcdbApi& getAPI(const std::string& path)
64-
{
65-
// find the first = sign in the string. If present drop everything after it
66-
// and between it and the previous /.
67-
auto pos = path.find('=');
68-
if (pos == std::string::npos) {
69-
auto entry = remappings.find(path);
70-
return apis[entry == remappings.end() ? "" : entry->second];
71-
}
72-
auto pos2 = path.rfind('/', pos);
73-
if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) {
74-
throw runtime_error_f("Malformed path %s", path.c_str());
75-
}
76-
auto entry = remappings.find(path.substr(0, pos2));
77-
return apis[entry == remappings.end() ? "" : entry->second];
78-
}
79-
};
80-
81-
bool isPrefix(std::string_view prefix, std::string_view full)
33+
// Fill valid routes. Here we consider only the routes which have
34+
// Lifetime::Condition. Notice the way we do it for analysis will be
35+
// different.
36+
namespace
8237
{
83-
return prefix == full.substr(0, prefix.size());
84-
}
85-
86-
CCDBHelpers::ParserResult CCDBHelpers::parseRemappings(char const* str)
87-
{
88-
std::unordered_map<std::string, std::string> remappings;
89-
std::string currentUrl = "";
90-
91-
enum ParsingStates {
92-
IN_BEGIN,
93-
IN_BEGIN_URL,
94-
IN_BEGIN_TARGET,
95-
IN_END_TARGET,
96-
IN_END_URL
97-
};
98-
ParsingStates state = IN_BEGIN;
99-
100-
while (true) {
101-
switch (state) {
102-
case IN_BEGIN: {
103-
if (*str == 0) {
104-
return {remappings, ""};
105-
}
106-
state = IN_BEGIN_URL;
107-
}
108-
case IN_BEGIN_URL: {
109-
if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) {
110-
return {remappings, "URL should start with either http:// or https:// or file://"};
111-
}
112-
state = IN_END_URL;
113-
} break;
114-
case IN_END_URL: {
115-
char const* c = strchr(str, '=');
116-
if (c == nullptr) {
117-
return {remappings, "Expecting at least one target path, missing `='?"};
118-
}
119-
if ((c - str) == 0) {
120-
return {remappings, "Empty url"};
121-
}
122-
currentUrl = std::string_view(str, c - str);
123-
state = IN_BEGIN_TARGET;
124-
str = c + 1;
125-
} break;
126-
case IN_BEGIN_TARGET: {
127-
if (*str == 0) {
128-
return {remappings, "Empty target"};
129-
}
130-
state = IN_END_TARGET;
131-
} break;
132-
case IN_END_TARGET: {
133-
char const* c = strpbrk(str, ",;");
134-
if (c == nullptr) {
135-
if (remappings.count(str)) {
136-
return {remappings, fmt::format("Path {} requested more than once.", str)};
137-
}
138-
remappings[std::string(str)] = currentUrl;
139-
return {remappings, ""};
140-
}
141-
if ((c - str) == 0) {
142-
return {remappings, "Empty target"};
143-
}
144-
auto key = std::string(str, c - str);
145-
if (remappings.count(str)) {
146-
return {remappings, fmt::format("Path {} requested more than once.", key)};
147-
}
148-
remappings[key] = currentUrl;
149-
if (*c == ';') {
150-
state = IN_BEGIN_URL;
151-
} else {
152-
state = IN_BEGIN_TARGET;
153-
}
154-
str = c + 1;
155-
} break;
156-
}
157-
}
158-
}
159-
160-
void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options, std::vector<o2::framework::OutputRoute> const& outputRoutes)
38+
void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes)
16139
{
16240
std::unordered_map<std::string, bool> accountedSpecs;
163-
auto defHost = options.get<std::string>("condition-backend");
164-
auto checkRate = options.get<int>("condition-tf-per-query");
165-
auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
166-
helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
167-
helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
168-
helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
169-
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
170-
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
171-
auto remapString = options.get<std::string>("condition-remap");
172-
CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str());
173-
if (!result.error.empty()) {
174-
throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str());
175-
}
176-
helper.remappings = result.remappings;
177-
helper.apis[""].init(defHost); // default backend
178-
LOGP(info, "Initialised default CCDB host {}", defHost);
179-
//
180-
for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping
181-
if (helper.apis.find(entry.second) == helper.apis.end()) {
182-
helper.apis[entry.second].init(entry.second);
183-
LOGP(info, "Initialised custom CCDB host {}", entry.second);
184-
}
185-
LOGP(info, "{} is remapped to {}", entry.first, entry.second);
186-
}
187-
helper.createdNotBefore = std::to_string(options.get<int64_t>("condition-not-before"));
188-
helper.createdNotAfter = std::to_string(options.get<int64_t>("condition-not-after"));
189-
19041
for (auto& route : outputRoutes) {
19142
if (route.matcher.lifetime != Lifetime::Condition) {
19243
continue;
@@ -205,6 +56,7 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti
20556
}
20657
}
20758
}
59+
} // namespace
20860

20961
auto getOrbitResetTime(o2::pmr::vector<char> const& v) -> Long64_t
21062
{
@@ -225,136 +77,12 @@ auto getOrbitResetTime(o2::pmr::vector<char> const& v) -> Long64_t
22577
return (*ctp)[0];
22678
};
22779

228-
bool isOnlineRun(DataTakingContext const& dtc)
229-
{
230-
return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
231-
}
232-
233-
auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
234-
int64_t timestamp,
235-
TimingInfo& timingInfo,
236-
DataTakingContext& dtc,
237-
DataAllocator& allocator) -> void
238-
{
239-
std::string ccdbMetadataPrefix = "ccdb-metadata-";
240-
int objCnt = -1;
241-
// We use the timeslice, so that we hook into the same interval as the rest of the
242-
// callback.
243-
static bool isOnline = isOnlineRun(dtc);
244-
245-
auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
246-
O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
247-
for (auto& route : helper->routes) {
248-
int64_t timestampToUse = timestamp;
249-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data());
250-
objCnt++;
251-
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
252-
Output output{concrete.origin, concrete.description, concrete.subSpec};
253-
auto&& v = allocator.makeVector<char>(output);
254-
std::map<std::string, std::string> metadata;
255-
std::map<std::string, std::string> headers;
256-
std::string path = "";
257-
std::string etag = "";
258-
int chRate = helper->queryPeriodGlo;
259-
bool checkValidity = false;
260-
for (auto& meta : route.matcher.metadata) {
261-
if (meta.name == "ccdb-path") {
262-
path = meta.defaultValue.get<std::string>();
263-
} else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get<int>() > 0) {
264-
if (meta.defaultValue.get<int>() == 1) {
265-
metadata["runNumber"] = dtc.runNumber;
266-
} else if (meta.defaultValue.get<int>() == 2) {
267-
timestampToUse = std::stoi(dtc.runNumber);
268-
} else {
269-
LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(), int(concrete.subSpec));
270-
}
271-
} else if (isPrefix(ccdbMetadataPrefix, meta.name)) {
272-
std::string key = meta.name.substr(ccdbMetadataPrefix.size());
273-
auto value = meta.defaultValue.get<std::string>();
274-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
275-
metadata[key] = value;
276-
} else if (meta.name == "ccdb-query-rate") {
277-
chRate = meta.defaultValue.get<int>() * helper->queryPeriodFactor;
278-
}
279-
}
280-
const auto url2uuid = helper->mapURL2UUID.find(path);
281-
if (url2uuid != helper->mapURL2UUID.end()) {
282-
etag = url2uuid->second.etag;
283-
// We check validity every chRate timeslices or if the cache is expired
284-
uint64_t validUntil = url2uuid->second.cacheValidUntil;
285-
// When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
286-
uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
287-
// If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
288-
// when online.
289-
bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
290-
checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
291-
} else {
292-
checkValidity = true; // never skip check if the cache is empty
293-
}
294-
295-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
296-
297-
const auto& api = helper->getAPI(path);
298-
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
299-
LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse);
300-
api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
301-
if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
302-
LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse);
303-
// FIXME: I should send a dummy message.
304-
continue;
305-
}
306-
// printing in case we find a default entry
307-
if (headers.find("default") != headers.end()) {
308-
LOGP(detail, "******** Default entry used for {} ********", path);
309-
}
310-
helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
311-
if (etag.empty()) {
312-
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
313-
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
314-
helper->mapURL2UUID[path].cacheMiss++;
315-
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
316-
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
317-
api.appendFlatHeader(v, headers);
318-
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
319-
helper->mapURL2DPLCache[path] = cacheId;
320-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
321-
continue;
322-
}
323-
if (v.size()) { // but should be overridden by fresh object
324-
// somewhere here pruneFromCache should be called
325-
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
326-
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
327-
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
328-
helper->mapURL2UUID[path].cacheMiss++;
329-
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
330-
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
331-
api.appendFlatHeader(v, headers);
332-
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
333-
helper->mapURL2DPLCache[path] = cacheId;
334-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
335-
// one could modify the adoptContainer to take optional old cacheID to clean:
336-
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
337-
continue;
338-
} else {
339-
// Only once the etag is actually used, we get the information on how long the object is valid
340-
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
341-
}
342-
}
343-
// cached object is fine
344-
auto cacheId = helper->mapURL2DPLCache[path];
345-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
346-
helper->mapURL2UUID[path].cacheHit++;
347-
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
348-
// the outputBuffer was not used, can we destroy it?
349-
}
350-
O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
351-
};
352-
35380
AlgorithmSpec CCDBHelpers::fetchFromCCDB()
35481
{
35582
return adaptStateful([](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
35683
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
357-
initialiseHelper(*helper, options, spec.outputs);
84+
CCDBFetcherHelper::initialiseHelper(*helper, options);
85+
fillValidRoutes(*helper, spec.outputs);
35886
/// Add a callback on stop which dumps the statistics for the caching per
35987
/// path
36088
callbacks.set<CallbackService::Id::Stop>([helper]() {
@@ -453,8 +181,35 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
453181
// Fetch the rest of the objects.
454182
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
455183
dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);
184+
std::vector<CCDBFetcherHelper::FetchOp> ops;
185+
int runNumber = 0;
186+
std::string ccdbMetadataPrefix = "ccdb-metadata-";
187+
for (auto &route : helper->routes) {
188+
CCDBFetcherHelper::FetchOp op{.spec = route.matcher, .timestamp = timestamp, .runNumber = std::stoi(dtc.runNumber)};
189+
for (auto& meta : route.matcher.metadata) {
190+
if (meta.name == "ccdb-path") {
191+
op.url = meta.defaultValue.get<std::string>();
192+
} else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get<int>() > 0) {
193+
if (meta.defaultValue.get<int>() == 1) {
194+
op.runNumber = runNumber;
195+
} else if (meta.defaultValue.get<int>() == 2) {
196+
op.timestamp = runNumber;
197+
} else {
198+
LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}", meta.defaultValue.get<int>(), DataSpecUtils::describe(route.matcher));
199+
}
200+
} else if (meta.name.starts_with(ccdbMetadataPrefix)) {
201+
std::string key = meta.name.substr(ccdbMetadataPrefix.size());
202+
auto value = meta.defaultValue.get<std::string>();
203+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
204+
op.metadata.push_back({key, value});
205+
} else if (meta.name == "ccdb-query-rate") {
206+
op.queryRate = meta.defaultValue.get<int>() * helper->queryPeriodFactor;
207+
}
208+
}
209+
ops.push_back(op);
210+
}
456211

457-
populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
212+
CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
458213
O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
459214
}); });
460215
}

0 commit comments

Comments
 (0)