Skip to content

Commit 015c38a

Browse files
authored
Pass CCDB Headers together with binary blob (#13575)
1 parent 39489be commit 015c38a

File tree

5 files changed

+96
-3
lines changed

5 files changed

+96
-3
lines changed

CCDB/include/CCDB/CcdbApi.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,9 @@ class CcdbApi //: public DatabaseInterface
392392
// Loads files from alien and cvmfs into given destination.
393393
bool loadLocalContentToMemory(o2::pmr::vector<char>& dest, std::string& url) const;
394394

395+
// add annotated flattened headers in the end of the blob
396+
static void appendFlatHeader(o2::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers);
397+
395398
// the failure to load the file to memory is signaled by 0 size and non-0 capacity
396399
static bool isMemoryFileInvalid(const o2::pmr::vector<char>& v) { return v.size() == 0 && v.capacity() > 0; }
397400
template <typename T>
@@ -610,6 +613,16 @@ class CcdbApi //: public DatabaseInterface
610613
return getSnapshotDir(topdir, path) + '/' + sfile;
611614
}
612615

616+
template <typename MAP> // can be either std::map or std::multimap
617+
static size_t getFlatHeaderSize(const MAP& Headers)
618+
{
619+
size_t hsize = sizeof(int) + sizeof(FlatHeaderAnnot); // annotation size
620+
for (auto& h : Headers) {
621+
hsize += h.first.length() + h.second.length() + 2; // 2*(string_buffer + terminating null character)
622+
}
623+
return hsize;
624+
}
625+
613626
// tmp helper and single point of entry for a CURL perform call
614627
// helps to switch between easy handle perform and multi handles in a single place
615628
CURLcode CURL_perform(CURL* handle) const;
@@ -632,6 +645,8 @@ class CcdbApi //: public DatabaseInterface
632645
size_t mCurlTimeoutDownload = 15; // download timeout in seconds, can be configured via ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD, updated according to the deployment mode
633646
size_t mCurlTimeoutUpload = 15; // upload timeout in seconds, can be configured via ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD, updated according to the deployment mode
634647

648+
static constexpr char FlatHeaderAnnot[] = "$HEADER$"; // annotation for flat header
649+
635650
ClassDefNV(CcdbApi, 1);
636651
};
637652

CCDB/src/CcdbApi.cxx

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,12 +1687,15 @@ void CcdbApi::scheduleDownload(RequestContext& requestContext, size_t* requestCo
16871687
ho.counter++;
16881688
try {
16891689
if (chunk.capacity() < chunk.size() + realsize) {
1690+
// estimate headers size when converted to annotated text string
1691+
const char hannot[] = "header";
1692+
size_t hsize = getFlatHeaderSize(ho.header);
16901693
auto cl = ho.header.find("Content-Length");
16911694
if (cl != ho.header.end()) {
16921695
size_t sizeFromHeader = std::stol(cl->second);
1693-
sz = std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
1696+
sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
16941697
} else {
1695-
sz = std::max(chunk.size() * 2, chunk.size() + realsize);
1698+
sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
16961699
// LOGP(debug, "SIZE IS NOT IN HEADER, allocate {}", sz);
16971700
}
16981701
chunk.reserve(sz);
@@ -1885,6 +1888,25 @@ void CcdbApi::loadFileToMemory(o2::pmr::vector<char>& dest, std::string const& p
18851888
vectoredLoadFileToMemory(contexts);
18861889
}
18871890

1891+
void CcdbApi::appendFlatHeader(o2::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers)
1892+
{
1893+
size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
1894+
dest.resize(cnt + hsize);
1895+
auto addString = [&dest, &cnt](const std::string& s) {
1896+
for (char c : s) {
1897+
dest[cnt++] = c;
1898+
}
1899+
dest[cnt++] = 0;
1900+
};
1901+
1902+
for (auto& h : headers) {
1903+
addString(h.first);
1904+
addString(h.second);
1905+
}
1906+
*reinterpret_cast<int*>(&dest[cnt]) = hsize; // store size
1907+
std::memcpy(&dest[cnt + sizeof(int)], FlatHeaderAnnot, sizeof(FlatHeaderAnnot)); // annotate the flattened headers map
1908+
}
1909+
18881910
void CcdbApi::navigateSourcesAndLoadFile(RequestContext& requestContext, int& fromSnapshot, size_t* requestCounter) const
18891911
{
18901912
LOGP(debug, "loadFileToMemory {} ETag=[{}]", requestContext.path, requestContext.etag);

Framework/CCDBSupport/src/CCDBHelpers.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
260260
helper->mapURL2UUID[path].cacheMiss++;
261261
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
262262
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
263+
api.appendFlatHeader(v, headers);
263264
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
264265
helper->mapURL2DPLCache[path] = cacheId;
265266
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
@@ -273,6 +274,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
273274
helper->mapURL2UUID[path].cacheMiss++;
274275
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
275276
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
277+
api.appendFlatHeader(v, headers);
276278
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
277279
helper->mapURL2DPLCache[path] = cacheId;
278280
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
@@ -395,6 +397,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
395397
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
396398
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
397399
newOrbitResetTime = getOrbitResetTime(v);
400+
api.appendFlatHeader(v, headers);
398401
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
399402
helper->mapURL2DPLCache[path] = cacheId;
400403
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
@@ -405,6 +408,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
405408
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
406409
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
407410
newOrbitResetTime = getOrbitResetTime(v);
411+
api.appendFlatHeader(v, headers);
408412
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
409413
helper->mapURL2DPLCache[path] = cacheId;
410414
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);

Framework/Core/include/Framework/DataRefUtils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ struct DataRefUtils {
179179
}
180180
// Decode a CCDB object using the CcdbApi.
181181
static void* decodeCCDB(DataRef const& ref, std::type_info const& info);
182+
static std::map<std::string, std::string> extractCCDBHeaders(DataRef const& ref);
182183

183184
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef& ref)
184185
{

Framework/Core/src/DataRefUtils.cxx

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include <typeinfo>
13+
#include <cstring>
1314
#include "Framework/DataRefUtils.h"
1415
#include "Framework/RuntimeError.h"
1516
#include "Framework/Logger.h"
@@ -80,11 +81,29 @@ void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo)
8081
Int_t previousErrorLevel = gErrorIgnoreLevel;
8182
gErrorIgnoreLevel = kFatal;
8283
auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
83-
TMemFile memFile("name", const_cast<char*>(ref.payload), dh->payloadSize, "READ");
84+
const char* buff = const_cast<char*>(ref.payload);
85+
size_t flSize = dh->payloadSize;
86+
// does it have a flattened headers map attached in the end?
87+
constexpr char FlatHeaderAnnot[] = "$HEADER$";
88+
constexpr int Offset = sizeof(int) + sizeof(FlatHeaderAnnot);
89+
int headerSize = 0;
90+
LOGP(debug, "DHPayloadSize={}>{} Ref:{}/{} Cmp {}:{}", dh->payloadSize, Offset, dh->dataOrigin.as<std::string>(), dh->dataDescription.as<std::string>(), std::string{buff + dh->payloadSize - sizeof(FlatHeaderAnnot)}, std::string{FlatHeaderAnnot});
91+
92+
if (dh->payloadSize >= Offset &&
93+
!std::strncmp(buff + dh->payloadSize - sizeof(FlatHeaderAnnot), FlatHeaderAnnot, sizeof(FlatHeaderAnnot))) {
94+
headerSize = *reinterpret_cast<const int*>(buff + dh->payloadSize - Offset);
95+
}
96+
if (headerSize <= 0) {
97+
LOGP(fatal, "Anomalous flattened header size {} extracted", headerSize);
98+
}
99+
TMemFile memFile("name", const_cast<char*>(ref.payload), dh->payloadSize - headerSize, "READ");
84100
gErrorIgnoreLevel = previousErrorLevel;
85101
if (memFile.IsZombie()) {
86102
return nullptr;
87103
}
104+
105+
extractCCDBHeaders(ref);
106+
88107
TClass* tcl = TClass::GetClass(tinfo);
89108
result = extractFromTFile(memFile, tcl, "ccdb_object");
90109
if (!result) {
@@ -94,4 +113,36 @@ void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo)
94113
return result;
95114
}
96115

116+
std::map<std::string, std::string> DataRefUtils::extractCCDBHeaders(DataRef const& ref)
117+
{
118+
auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
119+
const char* buff = const_cast<char*>(ref.payload);
120+
// does it have a flattened headers map attached in the end?
121+
constexpr char FlatHeaderAnnot[] = "$HEADER$";
122+
constexpr int Offset = sizeof(int) + sizeof(FlatHeaderAnnot);
123+
int headerSize = 0, ss0 = 0;
124+
if (dh->payloadSize >= Offset && !std::strncmp(buff + dh->payloadSize - sizeof(FlatHeaderAnnot), FlatHeaderAnnot, sizeof(FlatHeaderAnnot))) {
125+
headerSize = *reinterpret_cast<const int*>(buff + dh->payloadSize - Offset);
126+
}
127+
if (headerSize <= 0) {
128+
LOGP(fatal, "Anomalous flattened header size {} extracted", headerSize);
129+
}
130+
buff += dh->payloadSize - headerSize; // jump to the start of flattened header
131+
headerSize -= Offset;
132+
const char* str0 = &buff[ss0++];
133+
std::map<std::string, std::string> res;
134+
while (ss0 < headerSize) {
135+
if (buff[ss0++] == 0) {
136+
if (!str0) {
137+
str0 = &buff[ss0]; // new key string is found
138+
} else {
139+
res.emplace(std::string(str0), std::string(&buff[ss0])); // new value string found, add key value to the map
140+
LOGP(debug, "Header{} {}:{}", res.size(), std::string(str0), std::string(&buff[ss0]));
141+
str0 = nullptr;
142+
}
143+
}
144+
}
145+
return res;
146+
}
147+
97148
} // namespace o2::framework

0 commit comments

Comments
 (0)