1- // Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+ // Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33// All rights not expressly granted are reserved.
44//
1010// or submit itself to any jurisdiction.
1111
1212#include " CCDBHelpers.h"
13+ #include " CCDBFetcherHelper.h"
1314#include " Framework/DeviceSpec.h"
1415#include " Framework/Logger.h"
1516#include " Framework/TimingInfo.h"
2223#include " Framework/Signpost.h"
2324#include < TError.h>
2425#include < TMemFile.h>
26+ #include < ctime>
2527
2628O2_DECLARE_DYNAMIC_LOG (ccdb);
2729
2830namespace o2 ::framework
2931{
3032
31- struct CCDBFetcherHelper {
32- struct CCDBCacheInfo {
33- std::string etag;
34- size_t cacheValidUntil = 0 ;
35- size_t cachePopulatedAt = 0 ;
36- size_t cacheMiss = 0 ;
37- size_t cacheHit = 0 ;
38- size_t minSize = -1ULL ;
39- size_t maxSize = 0 ;
40- int lastCheckedTF = 0 ;
41- };
42-
43- struct RemapMatcher {
44- std::string path;
45- };
46-
47- struct RemapTarget {
48- std::string url;
49- };
50-
51- std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
52- std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
53- std::string createdNotBefore = " 0" ;
54- std::string createdNotAfter = " 3385078236000" ;
55- std::unordered_map<std::string, o2::ccdb::CcdbApi> apis;
56- std::vector<OutputRoute> routes;
57- std::unordered_map<std::string, std::string> remappings;
58- uint32_t lastCheckedTFCounterOrbReset = 0 ; // last checkecked TFcounter for bulk check
59- int queryPeriodGlo = 1 ;
60- int queryPeriodFactor = 1 ;
61- int64_t timeToleranceMS = 5000 ;
62-
63- o2::ccdb::CcdbApi& getAPI (const std::string& path)
64- {
65- // find the first = sign in the string. If present drop everything after it
66- // and between it and the previous /.
67- auto pos = path.find (' =' );
68- if (pos == std::string::npos) {
69- auto entry = remappings.find (path);
70- return apis[entry == remappings.end () ? " " : entry->second ];
71- }
72- auto pos2 = path.rfind (' /' , pos);
73- if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0 ) {
74- throw runtime_error_f (" Malformed path %s" , path.c_str ());
75- }
76- auto entry = remappings.find (path.substr (0 , pos2));
77- return apis[entry == remappings.end () ? " " : entry->second ];
78- }
79- };
80-
/// Return true when @a full begins with @a prefix.
/// An empty prefix matches everything; a prefix longer than @a full never matches.
bool isPrefix(std::string_view prefix, std::string_view full)
{
  // compare() clamps the length to the size of `full`, exactly like substr().
  return full.compare(0, prefix.size(), prefix) == 0;
}
85-
86- CCDBHelpers::ParserResult CCDBHelpers::parseRemappings (char const * str)
87- {
88- std::unordered_map<std::string, std::string> remappings;
89- std::string currentUrl = " " ;
90-
91- enum ParsingStates {
92- IN_BEGIN,
93- IN_BEGIN_URL,
94- IN_BEGIN_TARGET,
95- IN_END_TARGET,
96- IN_END_URL
97- };
98- ParsingStates state = IN_BEGIN;
99-
100- while (true ) {
101- switch (state) {
102- case IN_BEGIN: {
103- if (*str == 0 ) {
104- return {remappings, " " };
105- }
106- state = IN_BEGIN_URL;
107- }
108- case IN_BEGIN_URL: {
109- if ((strncmp (" http://" , str, 7 ) != 0 ) && (strncmp (" https://" , str, 8 ) != 0 && (strncmp (" file://" , str, 7 ) != 0 ))) {
110- return {remappings, " URL should start with either http:// or https:// or file://" };
111- }
112- state = IN_END_URL;
113- } break ;
114- case IN_END_URL: {
115- char const * c = strchr (str, ' =' );
116- if (c == nullptr ) {
117- return {remappings, " Expecting at least one target path, missing `='?" };
118- }
119- if ((c - str) == 0 ) {
120- return {remappings, " Empty url" };
121- }
122- currentUrl = std::string_view (str, c - str);
123- state = IN_BEGIN_TARGET;
124- str = c + 1 ;
125- } break ;
126- case IN_BEGIN_TARGET: {
127- if (*str == 0 ) {
128- return {remappings, " Empty target" };
129- }
130- state = IN_END_TARGET;
131- } break ;
132- case IN_END_TARGET: {
133- char const * c = strpbrk (str, " ,;" );
134- if (c == nullptr ) {
135- if (remappings.count (str)) {
136- return {remappings, fmt::format (" Path {} requested more than once." , str)};
137- }
138- remappings[std::string (str)] = currentUrl;
139- return {remappings, " " };
140- }
141- if ((c - str) == 0 ) {
142- return {remappings, " Empty target" };
143- }
144- auto key = std::string (str, c - str);
145- if (remappings.count (str)) {
146- return {remappings, fmt::format (" Path {} requested more than once." , key)};
147- }
148- remappings[key] = currentUrl;
149- if (*c == ' ;' ) {
150- state = IN_BEGIN_URL;
151- } else {
152- state = IN_BEGIN_TARGET;
153- }
154- str = c + 1 ;
155- } break ;
156- }
157- }
158- }
159-
160- void initialiseHelper (CCDBFetcherHelper& helper, ConfigParamRegistry const & options, std::vector<o2::framework::OutputRoute> const & outputRoutes)
161- {
33+ // Fill valid routes. Here we consider only the routes which have
34+ // Lifetime::Condition. Notice the way we do it for analysis will be
35+ // different.
36+ namespace {
37+ void fillValidRoutes (CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const & outputRoutes) {
16238 std::unordered_map<std::string, bool > accountedSpecs;
163- auto defHost = options.get <std::string>(" condition-backend" );
164- auto checkRate = options.get <int >(" condition-tf-per-query" );
165- auto checkMult = options.get <int >(" condition-tf-per-query-multiplier" );
166- helper.timeToleranceMS = options.get <int64_t >(" condition-time-tolerance" );
167- helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int >::max ();
168- helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1 ;
169- LOGP (info, " CCDB Backend at: {}, validity check for every {} TF{}" , defHost, helper.queryPeriodGlo , helper.queryPeriodFactor == 1 ? std::string{} : fmt::format (" , (query for high-rate objects downscaled by {})" , helper.queryPeriodFactor ));
170- LOGP (info, " Hook to enable signposts for CCDB messages at {}" , (void *)&private_o2_log_ccdb->stacktrace );
171- auto remapString = options.get <std::string>(" condition-remap" );
172- CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings (remapString.c_str ());
173- if (!result.error .empty ()) {
174- throw runtime_error_f (" Error while parsing remapping string %s" , result.error .c_str ());
175- }
176- helper.remappings = result.remappings ;
177- helper.apis [" " ].init (defHost); // default backend
178- LOGP (info, " Initialised default CCDB host {}" , defHost);
179- //
180- for (auto & entry : helper.remappings ) { // init api instances for every host seen in the remapping
181- if (helper.apis .find (entry.second ) == helper.apis .end ()) {
182- helper.apis [entry.second ].init (entry.second );
183- LOGP (info, " Initialised custom CCDB host {}" , entry.second );
184- }
185- LOGP (info, " {} is remapped to {}" , entry.first , entry.second );
186- }
187- helper.createdNotBefore = std::to_string (options.get <int64_t >(" condition-not-before" ));
188- helper.createdNotAfter = std::to_string (options.get <int64_t >(" condition-not-after" ));
189-
19039 for (auto & route : outputRoutes) {
19140 if (route.matcher .lifetime != Lifetime::Condition) {
19241 continue ;
@@ -205,6 +54,7 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti
20554 }
20655 }
20756}
57+ }
20858
20959auto getOrbitResetTime (o2::pmr::vector<char > const & v) -> Long64_t
21060{
@@ -225,136 +75,12 @@ auto getOrbitResetTime(o2::pmr::vector<char> const& v) -> Long64_t
22575 return (*ctp)[0 ];
22676};
22777
228- bool isOnlineRun (DataTakingContext const & dtc)
229- {
230- return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
231- }
232-
233- auto populateCacheWith (std::shared_ptr<CCDBFetcherHelper> const & helper,
234- int64_t timestamp,
235- TimingInfo& timingInfo,
236- DataTakingContext& dtc,
237- DataAllocator& allocator) -> void
238- {
239- std::string ccdbMetadataPrefix = " ccdb-metadata-" ;
240- int objCnt = -1 ;
241- // We use the timeslice, so that we hook into the same interval as the rest of the
242- // callback.
243- static bool isOnline = isOnlineRun (dtc);
244-
245- auto sid = _o2_signpost_id_t {(int64_t )timingInfo.timeslice };
246- O2_SIGNPOST_START (ccdb, sid, " populateCacheWith" , " Starting to populate cache with CCDB objects" );
247- for (auto & route : helper->routes ) {
248- int64_t timestampToUse = timestamp;
249- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Fetching object for route %{public}s" , DataSpecUtils::describe (route.matcher ).data ());
250- objCnt++;
251- auto concrete = DataSpecUtils::asConcreteDataMatcher (route.matcher );
252- Output output{concrete.origin , concrete.description , concrete.subSpec };
253- auto && v = allocator.makeVector <char >(output);
254- std::map<std::string, std::string> metadata;
255- std::map<std::string, std::string> headers;
256- std::string path = " " ;
257- std::string etag = " " ;
258- int chRate = helper->queryPeriodGlo ;
259- bool checkValidity = false ;
260- for (auto & meta : route.matcher .metadata ) {
261- if (meta.name == " ccdb-path" ) {
262- path = meta.defaultValue .get <std::string>();
263- } else if (meta.name == " ccdb-run-dependent" && meta.defaultValue .get <int >() > 0 ) {
264- if (meta.defaultValue .get <int >() == 1 ) {
265- metadata[" runNumber" ] = dtc.runNumber ;
266- } else if (meta.defaultValue .get <int >() == 2 ) {
267- timestampToUse = std::stoi (dtc.runNumber );
268- } else {
269- LOGP (fatal, " Undefined ccdb-run-dependent option {} for spec {}/{}/{}" , meta.defaultValue .get <int >(), concrete.origin .as <std::string>(), concrete.description .as <std::string>(), int (concrete.subSpec ));
270- }
271- } else if (isPrefix (ccdbMetadataPrefix, meta.name )) {
272- std::string key = meta.name .substr (ccdbMetadataPrefix.size ());
273- auto value = meta.defaultValue .get <std::string>();
274- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Adding metadata %{public}s: %{public}s to the request" , key.data (), value.data ());
275- metadata[key] = value;
276- } else if (meta.name == " ccdb-query-rate" ) {
277- chRate = meta.defaultValue .get <int >() * helper->queryPeriodFactor ;
278- }
279- }
280- const auto url2uuid = helper->mapURL2UUID .find (path);
281- if (url2uuid != helper->mapURL2UUID .end ()) {
282- etag = url2uuid->second .etag ;
283- // We check validity every chRate timeslices or if the cache is expired
284- uint64_t validUntil = url2uuid->second .cacheValidUntil ;
285- // When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
286- uint64_t cachePopulatedAt = url2uuid->second .cachePopulatedAt ;
287- // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
288- // when online.
289- bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
290- checkValidity = (std::abs (int (timingInfo.tfCounter - url2uuid->second .lastCheckedTF )) >= chRate) && (isOnline || cacheExpired);
291- } else {
292- checkValidity = true ; // never skip check if the cache is empty
293- }
294-
295- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " checkValidity is %{public}s for tfID %d of %{public}s" , checkValidity ? " true" : " false" , timingInfo.tfCounter , path.data ());
296-
297- const auto & api = helper->getAPI (path);
298- if (checkValidity && (!api.isSnapshotMode () || etag.empty ())) { // in the snapshot mode the object needs to be fetched only once
299- LOGP (detail, " Loading {} for timestamp {}" , path, timestampToUse);
300- api.loadFileToMemory (v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter , helper->createdNotBefore );
301- if ((headers.count (" Error" ) != 0 ) || (etag.empty () && v.empty ())) {
302- LOGP (fatal, " Unable to find CCDB object {}/{}" , path, timestampToUse);
303- // FIXME: I should send a dummy message.
304- continue ;
305- }
306- // printing in case we find a default entry
307- if (headers.find (" default" ) != headers.end ()) {
308- LOGP (detail, " ******** Default entry used for {} ********" , path);
309- }
310- helper->mapURL2UUID [path].lastCheckedTF = timingInfo.tfCounter ;
311- if (etag.empty ()) {
312- helper->mapURL2UUID [path].etag = headers[" ETag" ]; // update uuid
313- helper->mapURL2UUID [path].cachePopulatedAt = timestampToUse;
314- helper->mapURL2UUID [path].cacheMiss ++;
315- helper->mapURL2UUID [path].minSize = std::min (v.size (), helper->mapURL2UUID [path].minSize );
316- helper->mapURL2UUID [path].maxSize = std::max (v.size (), helper->mapURL2UUID [path].maxSize );
317- api.appendFlatHeader (v, headers);
318- auto cacheId = allocator.adoptContainer (output, std::move (v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB );
319- helper->mapURL2DPLCache [path] = cacheId;
320- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Caching %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
321- continue ;
322- }
323- if (v.size ()) { // but should be overridden by fresh object
324- // somewhere here pruneFromCache should be called
325- helper->mapURL2UUID [path].etag = headers[" ETag" ]; // update uuid
326- helper->mapURL2UUID [path].cachePopulatedAt = timestampToUse;
327- helper->mapURL2UUID [path].cacheValidUntil = headers[" Cache-Valid-Until" ].empty () ? 0 : std::stoul (headers[" Cache-Valid-Until" ]);
328- helper->mapURL2UUID [path].cacheMiss ++;
329- helper->mapURL2UUID [path].minSize = std::min (v.size (), helper->mapURL2UUID [path].minSize );
330- helper->mapURL2UUID [path].maxSize = std::max (v.size (), helper->mapURL2UUID [path].maxSize );
331- api.appendFlatHeader (v, headers);
332- auto cacheId = allocator.adoptContainer (output, std::move (v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB );
333- helper->mapURL2DPLCache [path] = cacheId;
334- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Caching %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
335- // one could modify the adoptContainer to take optional old cacheID to clean:
336- // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
337- continue ;
338- } else {
339- // Only once the etag is actually used, we get the information on how long the object is valid
340- helper->mapURL2UUID [path].cacheValidUntil = headers[" Cache-Valid-Until" ].empty () ? 0 : std::stoul (headers[" Cache-Valid-Until" ]);
341- }
342- }
343- // cached object is fine
344- auto cacheId = helper->mapURL2DPLCache [path];
345- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Reusing %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
346- helper->mapURL2UUID [path].cacheHit ++;
347- allocator.adoptFromCache (output, cacheId, header::gSerializationMethodCCDB );
348- // the outputBuffer was not used, can we destroy it?
349- }
350- O2_SIGNPOST_END (ccdb, sid, " populateCacheWith" , " Finished populating cache with CCDB objects" );
351- };
352-
35378AlgorithmSpec CCDBHelpers::fetchFromCCDB ()
35479{
35580 return adaptStateful ([](CallbackService& callbacks, ConfigParamRegistry const & options, DeviceSpec const & spec) {
35681 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
357- initialiseHelper (*helper, options, spec.outputs );
82+ CCDBFetcherHelper::initialiseHelper (*helper, options);
83+ fillValidRoutes (*helper, spec.outputs );
35884 // / Add a callback on stop which dumps the statistics for the caching per
35985 // / path
36086 callbacks.set <CallbackService::Id::Stop>([helper]() {
@@ -453,8 +179,35 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
453179 // Fetch the rest of the objects.
454180 O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " fetchFromCCDB" , " Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
455181 dtc.runNumber .data (), orbitResetTime, timingInfo.creation , timestamp, timingInfo.firstTForbit );
182+ std::vector<CCDBFetcherHelper::FetchOp> ops;
183+ int runNumber = 0 ;
184+ std::string ccdbMetadataPrefix = " ccdb-metadata-" ;
185+ for (auto &route : helper->routes ) {
186+ CCDBFetcherHelper::FetchOp op{.spec = route.matcher , .timestamp = timestamp, .runNumber = std::stoi (dtc.runNumber )};
187+ for (auto & meta : route.matcher .metadata ) {
188+ if (meta.name == " ccdb-path" ) {
189+ op.url = meta.defaultValue .get <std::string>();
190+ } else if (meta.name == " ccdb-run-dependent" && meta.defaultValue .get <int >() > 0 ) {
191+ if (meta.defaultValue .get <int >() == 1 ) {
192+ op.runNumber = runNumber;
193+ } else if (meta.defaultValue .get <int >() == 2 ) {
194+ op.timestamp = runNumber;
195+ } else {
196+ LOGP (fatal, " Undefined ccdb-run-dependent option {} for spec {}" , meta.defaultValue .get <int >(), DataSpecUtils::describe (route.matcher ));
197+ }
198+ } else if (meta.name .starts_with (ccdbMetadataPrefix)) {
199+ std::string key = meta.name .substr (ccdbMetadataPrefix.size ());
200+ auto value = meta.defaultValue .get <std::string>();
201+ O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Adding metadata %{public}s: %{public}s to the request" , key.data (), value.data ());
202+ op.metadata .push_back ({key, value});
203+ } else if (meta.name == " ccdb-query-rate" ) {
204+ op.queryRate = meta.defaultValue .get <int >() * helper->queryPeriodFactor ;
205+ }
206+ }
207+ ops.push_back (op);
208+ }
456209
457- populateCacheWith (helper, timestamp , timingInfo, dtc, allocator);
210+ CCDBFetcherHelper:: populateCacheWith (helper, ops , timingInfo, dtc, allocator);
458211 O2_SIGNPOST_END (ccdb, _o2_signpost_id_t {(int64_t )timingInfo.timeslice }, " fetchFromCCDB" , " Fetching CCDB objects" );
459212 }); });
460213}
0 commit comments