Skip to content

Commit ad28495

Browse files
committed
Optional ordering of objects in CCDBPopulator
If an option --ordering-latency <T> with positive value T (in ms) is provided, then every incoming object will be buffered and uploaded only if no object with the same CCDB path and earlier start of validity was received in the preceding T ms. All remaining cached objects are uploaded at EOR (or stop() method call).
1 parent 37adc60 commit ad28495

File tree

2 files changed

+215
-108
lines changed

2 files changed

+215
-108
lines changed

CCDB/include/CCDB/CcdbObjectInfo.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ class CcdbObjectInfo
9393
[[nodiscard]] long getEndValidityTimestamp() const { return mEnd; }
9494
void setEndValidityTimestamp(long end) { mEnd = end; }
9595

96+
bool operator<(const CcdbObjectInfo& other) const
97+
{
98+
return mStart < other.mStart;
99+
}
100+
101+
bool operator>(const CcdbObjectInfo& other) const
102+
{
103+
return mStart > other.mStart;
104+
}
105+
96106
private:
97107
std::string mObjType{}; // object type (e.g. class)
98108
std::string mFileName{}; // file name in the CCDB
@@ -107,4 +117,17 @@ class CcdbObjectInfo
107117

108118
} // namespace o2::ccdb
109119

120+
namespace std
121+
{
122+
// defining std::hash for CcdbObjectInfo to be used with std containers
123+
template <>
124+
struct hash<o2::ccdb::CcdbObjectInfo> {
125+
public:
126+
size_t operator()(const o2::ccdb::CcdbObjectInfo& info) const
127+
{
128+
return info.getStartValidityTimestamp();
129+
}
130+
};
131+
} // namespace std
132+
110133
#endif // O2_CCDB_CCDBOBJECTINFO_H_

Detectors/Calibration/workflow/CCDBPopulatorSpec.h

Lines changed: 192 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
#include "CommonUtils/NameConf.h"
3232
#include <unordered_map>
3333
#include <chrono>
34+
#include <vector>
35+
#include <utility>
36+
#include <map>
3437

3538
namespace o2
3639
{
@@ -39,132 +42,212 @@ namespace calibration
3942

4043
class CCDBPopulator : public o2::framework::Task
4144
{
45+
public:
4246
using CcdbObjectInfo = o2::ccdb::CcdbObjectInfo;
4347
using CcdbApi = o2::ccdb::CcdbApi;
4448

45-
public:
46-
void init(o2::framework::InitContext& ic) final
47-
{
48-
mCCDBpath = ic.options().get<std::string>("ccdb-path");
49-
mSSpecMin = ic.options().get<std::int64_t>("sspec-min");
50-
mSSpecMax = ic.options().get<std::int64_t>("sspec-max");
51-
mFatalOnFailure = ic.options().get<bool>("fatal-on-failure");
52-
mValidateUpload = ic.options().get<bool>("validate-upload");
53-
mThrottlingDelayMS = ic.options().get<std::int64_t>("throttling-delay");
54-
mAPI.init(mCCDBpath);
55-
}
49+
using BLOB = std::vector<char>;
50+
using TBLOB = std::pair<long, BLOB>; // pair of creation time and object to upload
51+
using OBJCACHE = std::map<CcdbObjectInfo, TBLOB>;
52+
53+
void init(o2::framework::InitContext& ic) final;
54+
void run(o2::framework::ProcessingContext& pc) final;
55+
void endOfStream(o2::framework::EndOfStreamContext& ec) final;
56+
void stop() final;
57+
58+
void checkCache(long delay);
59+
void doUpload(const CcdbObjectInfo& wrp, const gsl::span<const char>& pld, bool cached = false);
60+
void logAsNeeded(long nowMS, const std::string& path, std::string& msg);
5661

57-
void run(o2::framework::ProcessingContext& pc) final
58-
{
59-
int nSlots = pc.inputs().getNofParts(0);
60-
if (nSlots != pc.inputs().getNofParts(1)) {
61-
LOGP(alarm, "Number of slots={} in part0 is different from that ({}) in part1", nSlots, pc.inputs().getNofParts(1));
62-
return;
63-
} else if (nSlots == 0) {
64-
LOG(alarm) << "0 slots received";
65-
return;
62+
private:
63+
CcdbApi mAPI;
64+
long mThrottlingDelayMS = 0; // LOG(important) at most once per this period for given path
65+
int mOrderingLatencyMS = -1; // if >0, bufferize and upload if no object with smaller SOV was received in this time interval in ms
66+
bool mFatalOnFailure = true; // produce fatal on failed upload
67+
bool mValidateUpload = false; // validate upload by querying its headers
68+
bool mEnded = false;
69+
std::unordered_map<std::string, std::pair<long, int>> mThrottling;
70+
std::unordered_map<std::string, OBJCACHE> mOrdCache;
71+
std::int64_t mSSpecMin = -1; // min subspec to accept
72+
std::int64_t mSSpecMax = -1; // max subspec to accept
73+
std::string mCCDBpath = "http://ccdb-test.cern.ch:8080"; // CCDB path
74+
int mRunNoFromDH = 0;
75+
std::string mRunNoStr = {};
76+
};
77+
78+
void CCDBPopulator::init(o2::framework::InitContext& ic)
79+
{
80+
mCCDBpath = ic.options().get<std::string>("ccdb-path");
81+
mSSpecMin = ic.options().get<std::int64_t>("sspec-min");
82+
mSSpecMax = ic.options().get<std::int64_t>("sspec-max");
83+
mFatalOnFailure = ic.options().get<bool>("fatal-on-failure");
84+
mValidateUpload = ic.options().get<bool>("validate-upload");
85+
mThrottlingDelayMS = ic.options().get<std::int64_t>("throttling-delay");
86+
mOrderingLatencyMS = ic.options().get<int>("ordering-latency");
87+
mAPI.init(mCCDBpath);
88+
}
89+
90+
void CCDBPopulator::run(o2::framework::ProcessingContext& pc)
91+
{
92+
int nSlots = pc.inputs().getNofParts(0);
93+
if (nSlots != pc.inputs().getNofParts(1)) {
94+
LOGP(alarm, "Number of slots={} in part0 is different from that ({}) in part1", nSlots, pc.inputs().getNofParts(1));
95+
return;
96+
} else if (nSlots == 0) {
97+
LOG(alarm) << "0 slots received";
98+
return;
99+
}
100+
mRunNoFromDH = pc.services().get<o2::framework::TimingInfo>().runNumber;
101+
if (mRunNoFromDH > 0) {
102+
mRunNoStr = std::to_string(mRunNoFromDH);
103+
}
104+
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
105+
for (int isl = 0; isl < nSlots; isl++) {
106+
auto refWrp = pc.inputs().get("clbWrapper", isl);
107+
auto refPld = pc.inputs().get("clbPayload", isl);
108+
if (!o2::framework::DataRefUtils::isValid(refWrp)) {
109+
LOGP(alarm, "Wrapper is not valid for slot {}", isl);
110+
continue;
66111
}
67-
auto runNoFromDH = pc.services().get<o2::framework::TimingInfo>().runNumber;
68-
std::string runNoStr;
69-
if (runNoFromDH > 0) {
70-
runNoStr = std::to_string(runNoFromDH);
112+
if (!o2::framework::DataRefUtils::isValid(refPld)) {
113+
LOGP(alarm, "Payload is not valid for slot {}", isl);
114+
continue;
71115
}
72-
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
73-
std::map<std::string, std::string> metadata;
74-
for (int isl = 0; isl < nSlots; isl++) {
75-
auto refWrp = pc.inputs().get("clbWrapper", isl);
76-
auto refPld = pc.inputs().get("clbPayload", isl);
77-
if (!o2::framework::DataRefUtils::isValid(refWrp)) {
78-
LOGP(alarm, "Wrapper is not valid for slot {}", isl);
79-
continue;
80-
}
81-
if (!o2::framework::DataRefUtils::isValid(refPld)) {
82-
LOGP(alarm, "Payload is not valid for slot {}", isl);
116+
if (mSSpecMin >= 0 && mSSpecMin <= mSSpecMax) { // there is a selection
117+
auto ss = std::int64_t(o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->subSpecification);
118+
if (ss < mSSpecMin || ss > mSSpecMax) {
83119
continue;
84120
}
85-
if (mSSpecMin >= 0 && mSSpecMin <= mSSpecMax) { // there is a selection
86-
auto ss = std::int64_t(o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->subSpecification);
87-
if (ss < mSSpecMin || ss > mSSpecMax) {
88-
continue;
89-
}
90-
}
91-
const auto wrp = pc.inputs().get<CcdbObjectInfo*>(refWrp);
92-
const auto pld = pc.inputs().get<gsl::span<char>>(refPld); // this is actually an image of TMemFile
93-
if (!wrp) {
94-
LOGP(alarm, "No CcdbObjectInfo info for {} at slot {}",
95-
o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->dataDescription.as<std::string>(), isl);
96-
continue;
97-
}
98-
const auto* md = &wrp->getMetaData();
99-
if (runNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
100-
metadata = *md; // clone since the md from the message is const
101-
metadata[o2::base::NameConf::CCDBRunTag.data()] = runNoStr;
102-
md = &metadata;
103-
}
104-
std::string msg = fmt::format("Storing in ccdb {}/{} of size {} valid for {} : {}", wrp->getPath(), wrp->getFileName(), pld.size(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
105-
auto& lastLog = mThrottling[wrp->getPath()];
106-
if (lastLog.first + mThrottlingDelayMS < nowMS) {
107-
if (lastLog.second) {
108-
msg += fmt::format(" ({} uploads were logged as INFO)", lastLog.second);
109-
lastLog.second = 0;
110-
}
111-
lastLog.first = nowMS;
112-
LOG(important) << msg;
121+
}
122+
const auto wrp = pc.inputs().get<CcdbObjectInfo*>(refWrp);
123+
const auto pld = pc.inputs().get<gsl::span<char>>(refPld); // this is actually an image of TMemFile
124+
if (!wrp) {
125+
LOGP(alarm, "No CcdbObjectInfo info for {} at slot {}",
126+
o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->dataDescription.as<std::string>(), isl);
127+
continue;
128+
}
129+
if (mOrderingLatencyMS <= 0) { // ordering is not requested
130+
doUpload(*wrp, pld);
131+
} else {
132+
auto& pathCache = mOrdCache[wrp->getPath()];
133+
auto stt = pathCache.emplace(*wrp, std::make_pair(nowMS, std::vector<char>(pld.size())));
134+
if (stt.second) { // insertion success
135+
stt.first->second.second.assign(pld.begin(), pld.end());
136+
std::string msg = fmt::format("Bufferizing for ordering ccdb object {}/{} of size {} valid for {} : {}",
137+
wrp->getPath(), wrp->getFileName(), pld.size(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
138+
logAsNeeded(nowMS, wrp->getPath(), msg);
113139
} else {
114-
lastLog.second++;
115-
LOG(info) << msg;
140+
bool v = stt.first != pathCache.end();
141+
LOGP(error, "failed to bufferize a {} object with SOV={}/EOV={} received at {}, conflicting with previously bufferized one SOV={}/EOV={} received at {}",
142+
wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp(), nowMS,
143+
v ? std::to_string(stt.first->first.getStartValidityTimestamp()) : std::string{"N/A"},
144+
v ? std::to_string(stt.first->first.getEndValidityTimestamp()) : std::string{"N/A"},
145+
v ? std::to_string(stt.first->second.first) : std::string{"N/A"});
116146
}
147+
}
148+
}
149+
if (mOrderingLatencyMS > 0) {
150+
checkCache(mOrderingLatencyMS);
151+
}
152+
}
117153

118-
auto uploadTS = o2::ccdb::getCurrentTimestamp();
119-
120-
int res = mAPI.storeAsBinaryFile(&pld[0], pld.size(), wrp->getFileName(), wrp->getObjectType(), wrp->getPath(),
121-
*md, wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
122-
if (res) {
123-
if (mFatalOnFailure) {
124-
LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
125-
} else {
126-
LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
127-
}
128-
}
129-
// do we need to override previous object?
130-
if (wrp->isAdjustableEOV() && !mAPI.isSnapshotMode()) {
131-
o2::ccdb::adjustOverriddenEOV(mAPI, *wrp.get());
154+
void CCDBPopulator::checkCache(long delay)
155+
{
156+
// check if some entries in cache are ripe enough to upload
157+
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
158+
for (auto& pathCache : mOrdCache) { // loop over paths
159+
if (delay < 0 && pathCache.second.size()) {
160+
LOGP(important, "Uploading {} cached objects for path {}", pathCache.second.size(), pathCache.first);
161+
}
162+
for (auto it = pathCache.second.begin(); it != pathCache.second.end();) { // loop over objects of the path
163+
if (nowMS - it->second.first > delay) {
164+
doUpload(it->first, {it->second.second.data(), it->second.second.size()}, true);
165+
it = pathCache.second.erase(it);
166+
} else {
167+
break;
132168
}
133-
// if requested, make sure that the new object can be queried
134-
if (mValidateUpload || wrp->getValidateUpload()) {
135-
constexpr long MAXDESYNC = 3;
136-
auto headers = mAPI.retrieveHeaders(wrp->getPath(), {}, wrp->getStartValidityTimestamp() + (wrp->getEndValidityTimestamp() - wrp->getStartValidityTimestamp()) / 2);
137-
if (headers.empty() ||
138-
std::atol(headers["Created"].c_str()) < uploadTS - MAXDESYNC ||
139-
std::atol(headers["Valid-From"].c_str()) != wrp->getStartValidityTimestamp() ||
140-
std::atol(headers["Valid-Until"].c_str()) != wrp->getEndValidityTimestamp()) {
141-
if (mFatalOnFailure) {
142-
LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
143-
} else {
144-
LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
145-
}
146-
} else {
147-
LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
148-
}
169+
}
170+
}
171+
}
172+
173+
void CCDBPopulator::doUpload(const CcdbObjectInfo& wrp, const gsl::span<const char>& pld, bool cached)
174+
{
175+
std::string msg = fmt::format("Storing in ccdb {}{}/{} of size {} valid for {} : {}", cached ? "cached " : "", wrp.getPath(), wrp.getFileName(), pld.size(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
176+
auto uploadTS = o2::ccdb::getCurrentTimestamp();
177+
logAsNeeded(uploadTS, wrp.getPath(), msg);
178+
std::map<std::string, std::string> metadata;
179+
const auto* md = &wrp.getMetaData();
180+
if (mRunNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
181+
metadata = *md; // clone since the md from the message is const
182+
metadata[o2::base::NameConf::CCDBRunTag.data()] = mRunNoStr;
183+
md = &metadata;
184+
}
185+
int res = mAPI.storeAsBinaryFile(&pld[0], pld.size(), wrp.getFileName(), wrp.getObjectType(), wrp.getPath(), *md, wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
186+
if (res) {
187+
if (mFatalOnFailure) {
188+
LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
189+
} else {
190+
LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
191+
}
192+
}
193+
// if requested, make sure that the new object can be queried
194+
if (mValidateUpload || wrp.getValidateUpload()) {
195+
constexpr long MAXDESYNC = 3;
196+
auto headers = mAPI.retrieveHeaders(wrp.getPath(), {}, wrp.getStartValidityTimestamp() + (wrp.getEndValidityTimestamp() - wrp.getStartValidityTimestamp()) / 2);
197+
if (headers.empty() ||
198+
std::atol(headers["Created"].c_str()) < uploadTS - MAXDESYNC ||
199+
std::atol(headers["Valid-From"].c_str()) != wrp.getStartValidityTimestamp() ||
200+
std::atol(headers["Valid-Until"].c_str()) != wrp.getEndValidityTimestamp()) {
201+
if (mFatalOnFailure) {
202+
LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
203+
} else {
204+
LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
149205
}
206+
} else {
207+
LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
208+
}
209+
}
210+
}
211+
212+
void CCDBPopulator::logAsNeeded(long nowMS, const std::string& path, std::string& msg)
213+
{
214+
auto& lastLog = mThrottling[path];
215+
if (lastLog.first + mThrottlingDelayMS < nowMS) {
216+
if (lastLog.second) {
217+
msg += fmt::format(" ({} uploads were logged as INFO)", lastLog.second);
218+
lastLog.second = 0;
150219
}
220+
lastLog.first = nowMS;
221+
LOG(important) << msg;
222+
} else {
223+
lastLog.second++;
224+
LOG(info) << msg;
151225
}
226+
}
152227

153-
void endOfStream(o2::framework::EndOfStreamContext& ec) final
154-
{
155-
LOG(info) << "EndOfStream received";
228+
void CCDBPopulator::endOfStream(o2::framework::EndOfStreamContext& ec)
229+
{
230+
if (mEnded) {
231+
return;
232+
}
233+
mEnded = true;
234+
LOG(info) << "EndOfStream received";
235+
if (mOrderingLatencyMS > 0) {
236+
checkCache(-mOrderingLatencyMS); // force
156237
}
238+
}
157239

158-
private:
159-
CcdbApi mAPI;
160-
long mThrottlingDelayMS = 0; // LOG(important) at most once per this period for given path
161-
bool mFatalOnFailure = true; // produce fatal on failed upload
162-
bool mValidateUpload = false; // validate upload by querying its headers
163-
std::unordered_map<std::string, std::pair<long, int>> mThrottling;
164-
std::int64_t mSSpecMin = -1; // min subspec to accept
165-
std::int64_t mSSpecMax = -1; // max subspec to accept
166-
std::string mCCDBpath = "http://ccdb-test.cern.ch:8080"; // CCDB path
167-
};
240+
void CCDBPopulator::stop()
241+
{
242+
if (mEnded) {
243+
return;
244+
}
245+
mEnded = true;
246+
LOG(info) << "Forced stop";
247+
if (mOrderingLatencyMS > 0) {
248+
checkCache(-mOrderingLatencyMS); // force
249+
}
250+
}
168251

169252
} // namespace calibration
170253

@@ -186,6 +269,7 @@ DataProcessorSpec getCCDBPopulatorDeviceSpec(const std::string& defCCDB, const s
186269
{"ccdb-path", VariantType::String, defCCDB, {"Path to CCDB"}},
187270
{"sspec-min", VariantType::Int64, -1L, {"min subspec to accept"}},
188271
{"sspec-max", VariantType::Int64, -1L, {"max subspec to accept"}},
272+
{"ordering-latency", VariantType::Int, -1, {"if enabled (positive) bufferize object and upload it if no object with smaller SOV received in given waiting time (ms)"}},
189273
{"throttling-delay", VariantType::Int64, 300000L, {"produce important type log at most once per this period in ms for each CCDB path"}},
190274
{"validate-upload", VariantType::Bool, false, {"valider upload by querying its headers"}},
191275
{"fatal-on-failure", VariantType::Bool, false, {"do not produce fatal on failed upload"}}}};

0 commit comments

Comments
 (0)