Skip to content

Commit c1cd2a6

Browse files
committed
With -condition-use-slice-for-prescaling <N> use TF slice instead of TFcounter for CCDB cache validation is N!=0
If --condition-tf-per-query-multiplier value is negative, the prescaling is simply applied to tfCounter%|query_rate| (or timeslice%|query_rate| if --condition-use-slice-for-prescaling is asked) If N>0, then enforce a check if the abs difference between the last checked and current TFCounters (not slices!) exceeds N, even if the slices difference is less than the requested check rate.
1 parent 1cfbc1d commit c1cd2a6

File tree

4 files changed

+62
-15
lines changed

4 files changed

+62
-15
lines changed

Framework/CCDBSupport/src/CCDBFetcherHelper.cxx

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,20 @@ void CCDBFetcherHelper::initialiseHelper(CCDBFetcherHelper& helper, ConfigParamR
5151
auto defHost = options.get<std::string>("condition-backend");
5252
auto checkRate = options.get<int>("condition-tf-per-query");
5353
auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
54+
helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
5455
helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
5556
helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
56-
helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
57-
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
57+
helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
58+
std::string extraCond{};
59+
if (helper.useTFSlice) {
60+
extraCond = ". Use TFSlice";
61+
if (helper.useTFSlice > 0) {
62+
extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
63+
}
64+
}
65+
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
66+
helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
67+
extraCond);
5868
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
5969
auto remapString = options.get<std::string>("condition-remap");
6070
ParserResult result = parseRemappings(remapString.c_str());
@@ -205,12 +215,21 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
205215
// If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
206216
// when online.
207217
bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt);
208-
checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
218+
if (isOnline || cacheExpired) {
219+
if (!helper->useTFSlice) {
220+
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
221+
} else {
222+
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
223+
if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
224+
checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
225+
}
226+
}
227+
}
209228
} else {
210229
checkValidity = true; // never skip check if the cache is empty
211230
}
212231

213-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
232+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());
214233

215234
const auto& api = helper->getAPI(path);
216235
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
@@ -226,6 +245,7 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
226245
LOGP(detail, "******** Default entry used for {} ********", path);
227246
}
228247
helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
248+
helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
229249
if (etag.empty()) {
230250
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
231251
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;

Framework/CCDBSupport/src/CCDBFetcherHelper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct CCDBFetcherHelper {
3333
size_t minSize = -1ULL;
3434
size_t maxSize = 0;
3535
int lastCheckedTF = 0;
36+
int lastCheckedSlice = 0;
3637
};
3738

3839
struct RemapMatcher {
@@ -94,6 +95,7 @@ struct CCDBFetcherHelper {
9495
int queryPeriodGlo = 1;
9596
int queryPeriodFactor = 1;
9697
int64_t timeToleranceMS = 5000;
98+
int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TCcounter|<=useTFSlice
9799

98100
o2::ccdb::CcdbApi& getAPI(const std::string& path);
99101
static void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options);

Framework/CCDBSupport/src/CCDBHelpers.cxx

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct CCDBFetcherHelper {
3939
size_t minSize = -1ULL;
4040
size_t maxSize = 0;
4141
int lastCheckedTF = 0;
42+
int lastCheckedSlice = 0;
4243
};
4344

4445
struct RemapMatcher {
@@ -60,6 +61,7 @@ struct CCDBFetcherHelper {
6061
int queryPeriodGlo = 1;
6162
int queryPeriodFactor = 1;
6263
int64_t timeToleranceMS = 5000;
64+
int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TCcounter|<=useTFSlice
6365

6466
o2::ccdb::CcdbApi& getAPI(const std::string& path)
6567
{
@@ -165,10 +167,20 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti
165167
auto defHost = options.get<std::string>("condition-backend");
166168
auto checkRate = options.get<int>("condition-tf-per-query");
167169
auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
170+
helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
168171
helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
169172
helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
170-
helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
171-
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
173+
helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
174+
std::string extraCond{};
175+
if (helper.useTFSlice) {
176+
extraCond = ". Use TFSlice";
177+
if (helper.useTFSlice > 0) {
178+
extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
179+
}
180+
}
181+
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
182+
helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
183+
extraCond);
172184
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
173185
auto remapString = options.get<std::string>("condition-remap");
174186
CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str());
@@ -276,7 +288,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
276288
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
277289
metadata[key] = value;
278290
} else if (meta.name == "ccdb-query-rate") {
279-
chRate = meta.defaultValue.get<int>() * helper->queryPeriodFactor;
291+
chRate = std::max(1, meta.defaultValue.get<int>()) * helper->queryPeriodFactor;
280292
}
281293
}
282294
const auto url2uuid = helper->mapURL2UUID.find(path);
@@ -289,12 +301,21 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
289301
// If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
290302
// when online.
291303
bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
292-
checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
304+
if (isOnline || cacheExpired) {
305+
if (!helper->useTFSlice) {
306+
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
307+
} else {
308+
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
309+
if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
310+
checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
311+
}
312+
}
313+
}
293314
} else {
294315
checkValidity = true; // never skip check if the cache is empty
295316
}
296317

297-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
318+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());
298319

299320
const auto& api = helper->getAPI(path);
300321
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
@@ -310,6 +331,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
310331
LOGP(detail, "******** Default entry used for {} ********", path);
311332
}
312333
helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
334+
helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
313335
if (etag.empty()) {
314336
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
315337
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
@@ -382,21 +404,22 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
382404
std::map<std::string, std::string> metadata;
383405
std::map<std::string, std::string> headers;
384406
std::string etag;
385-
bool checkValidity = std::abs(int(timingInfo.tfCounter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
407+
int32_t counter = helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter;
408+
bool checkValidity = std::abs(int(counter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
386409
const auto url2uuid = helper->mapURL2UUID.find(path);
387410
if (url2uuid != helper->mapURL2UUID.end()) {
388411
etag = url2uuid->second.etag;
389412
} else {
390413
checkValidity = true; // never skip check if the cache is empty
391414
}
392-
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tfID %d of %{public}s",
393-
checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
415+
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tf%{public}s %d of %{public}s",
416+
checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", counter, path.data());
394417
Output output{"CTP", "OrbitReset", 0};
395418
Long64_t newOrbitResetTime = orbitResetTime;
396419
auto&& v = allocator.makeVector<char>(output);
397420
const auto& api = helper->getAPI(path);
398421
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
399-
helper->lastCheckedTFCounterOrbReset = timingInfo.tfCounter;
422+
helper->lastCheckedTFCounterOrbReset = counter;
400423
api.loadFileToMemory(v, path, metadata, timingInfo.creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
401424
if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
402425
LOGP(fatal, "Unable to find CCDB object {}/{}", path, timingInfo.creation);

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
177177
{"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
178178
{"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
179179
{"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
180-
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}},
180+
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
181+
{"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
181182
{"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
182183
{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
183184
{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
@@ -195,7 +196,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
195196
{"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
196197
{"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
197198
{"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
198-
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}},
199+
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
200+
{"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
199201
{"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
200202
{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
201203
{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},

0 commit comments

Comments
 (0)