Skip to content

Commit daa8075

Browse files
committed
DPL Analysis: write HistogramRegistry incrementally
This should reduce big spikes at the end of the processing when a large HistogramRegistry is serialised.
1 parent 17ae0d0 commit daa8075

File tree

6 files changed

+105
-127
lines changed

6 files changed

+105
-127
lines changed

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
#include "Framework/Monitoring.h"
2525

2626
#include <Monitoring/Monitoring.h>
27+
#include <TDirectory.h>
2728
#include <TFile.h>
2829
#include <TFile.h>
2930
#include <TTree.h>
3031
#include <TMap.h>
3132
#include <TObjString.h>
3233
#include <arrow/table.h>
34+
#include <chrono>
3335

3436
namespace o2::framework::writers
3537
{
@@ -46,6 +48,7 @@ struct InputObjectRoute {
4648
struct InputObject {
4749
TClass* kind = nullptr;
4850
void* obj = nullptr;
51+
std::string container;
4952
std::string name;
5053
int count = -1;
5154
};
@@ -312,6 +315,13 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
312315
obj.obj = tm.ReadObjectAny(obj.kind);
313316
auto* named = static_cast<TNamed*>(obj.obj);
314317
obj.name = named->GetName();
318+
// If we have a folder, we assume the first element of the path
319+
// to be the name of the registry.
320+
if (sourceType == HistogramRegistrySource) {
321+
obj.container = objh->containerName;
322+
} else {
323+
obj.container = obj.name;
324+
}
315325
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
316326
if (hpos == tskmap.end()) {
317327
LOG(error) << "No task found for hash " << hash;
@@ -324,8 +334,8 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
324334
return;
325335
}
326336
auto objects = opos->bindings;
327-
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
328-
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
337+
if (std::find(objects.begin(), objects.end(), obj.container) == objects.end()) {
338+
LOG(error) << "No container " << obj.container << " in map for task " << taskname;
329339
return;
330340
}
331341
auto nameHash = runtime_hash(obj.name.c_str());
@@ -334,7 +344,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
334344
// If it's the first one, we just add it to the list.
335345
if (existing == inputObjects->end()) {
336346
obj.count = objh->mPipelineSize;
337-
inputObjects->push_back(std::make_pair(key, obj));
347+
inputObjects->emplace_back(key, obj);
338348
existing = inputObjects->end() - 1;
339349
} else {
340350
obj.count = existing->second.count;
@@ -375,43 +385,23 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
375385
currentFile = filename;
376386
}
377387

378-
// translate the list-structure created by the registry into a directory structure within the file
379-
std::function<void(TList*, TDirectory*)> writeListToFile;
380-
writeListToFile = [&](TList* list, TDirectory* parentDir) {
381-
TIter next(list);
382-
TObject* object = nullptr;
383-
while ((object = next())) {
384-
if (object->InheritsFrom(TList::Class())) {
385-
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
386-
} else {
387-
int objSize = parentDir->WriteObjectAny(object, object->Class(), object->GetName());
388-
static int maxSizeWritten = 0;
389-
if (objSize > maxSizeWritten) {
390-
auto& monitoring = pc.services().get<Monitoring>();
391-
maxSizeWritten = objSize;
392-
monitoring.send(Metric{fmt::format("{}/{}:{}", object->ClassName(), object->GetName(), objSize), "aod-largest-object-written"}.addTag(tags::Key::Subsystem, tags::Value::DPL));
393-
}
394-
auto* written = list->Remove(object);
395-
delete written;
396-
}
388+
// FIXME: handle folders
389+
auto* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
390+
// The name contains a path...
391+
if (sourceType == HistogramRegistrySource) {
392+
TDirectory* currentFolder = currentDir;
393+
std::string objName = entry.name;
394+
auto lastSlash = entry.name.rfind('/');
395+
auto containerName = obj.container;
396+
if (lastSlash != std::string::npos) {
397+
auto dirname = entry.name.substr(0, lastSlash);
398+
objName = entry.name.substr(lastSlash + 1);
399+
containerName += "/" + dirname;
400+
currentFolder = currentDir->mkdir(containerName.c_str(), "", kTRUE);
397401
}
398-
};
399-
400-
TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
401-
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
402-
auto* outputList = static_cast<TList*>(entry.obj);
403-
outputList->SetOwner(false);
404-
405-
// if registry should live in dedicated folder a TNamed object is appended to the list
406-
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
407-
delete outputList->Last();
408-
outputList->RemoveLast();
409-
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
410-
}
411-
412-
writeListToFile(outputList, currentDir);
413-
outputList->SetOwner();
414-
delete outputList;
402+
currentFolder = currentDir->GetDirectory(containerName.c_str());
403+
currentFolder->WriteObjectAny(entry.obj, entry.kind, objName.c_str());
404+
delete (TObject*)entry.obj;
415405
entry.obj = nullptr;
416406
} else {
417407
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#ifndef FRAMEWORK_ANALYSISMANAGERS_H
1313
#define FRAMEWORK_ANALYSISMANAGERS_H
14+
#include "DataAllocator.h"
1415
#include "Framework/AnalysisHelpers.h"
1516
#include "Framework/DataSpecUtils.h"
1617
#include "Framework/GroupedCombinations.h"
@@ -247,7 +248,10 @@ template <is_histogram_registry T>
247248
bool postRunOutput(EndOfStreamContext& context, T& hr)
248249
{
249250
auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
250-
context.outputs().snapshot(hr.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *(hr.getListOfHistograms()));
251+
auto sendHistos = [deviceSpec, &context](HistogramRegistry const& self, TNamed* obj) mutable {
252+
context.outputs().snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
253+
};
254+
hr.apply(sendHistos);
251255
hr.clean();
252256
return true;
253257
}

Framework/Core/include/Framework/HistogramRegistry.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,15 @@ class HistogramRegistry
173173
template <typename T>
174174
std::shared_ptr<T> operator()(const HistName& histName);
175175

176+
// Apply @a callback on every single entry in the registry
177+
void apply(std::function<void(HistogramRegistry const&, TNamed* named)> callback) const;
176178
// return the OutputSpec associated to the HistogramRegistry
177179
OutputSpec const spec();
178180

179-
OutputRef ref(uint16_t idx, uint16_t pipelineSize);
181+
OutputRef ref(uint16_t idx, uint16_t pipelineSize) const;
180182

181183
void setHash(uint32_t hash);
182184

183-
/// returns the list of histograms, properly sorted for writing.
184-
TList* getListOfHistograms();
185-
186185
/// deletes all the histograms from the registry
187186
void clean();
188187

@@ -220,16 +219,13 @@ class HistogramRegistry
220219

221220
// helper function to find the histogram position in the registry
222221
template <typename T>
223-
uint32_t getHistIndex(const T& histName);
222+
uint32_t getHistIndex(const T& histName) const;
224223

225224
constexpr uint32_t imask(uint32_t i) const
226225
{
227226
return i & REGISTRY_BITMASK;
228227
}
229228

230-
// helper function to create resp. find the subList defined by path
231-
TList* getSubList(TList* list, std::deque<std::string>& path);
232-
233229
// helper function to split user defined path/to/hist/name string
234230
std::deque<std::string> splitPath(const std::string& pathAndNameUser);
235231

@@ -431,7 +427,7 @@ std::shared_ptr<T> HistogramRegistry::operator()(const HistName& histName)
431427
}
432428

433429
template <typename T>
434-
uint32_t HistogramRegistry::getHistIndex(const T& histName)
430+
uint32_t HistogramRegistry::getHistIndex(const T& histName) const
435431
{
436432
if (O2_BUILTIN_LIKELY(histName.hash == mRegistryKey[histName.idx])) {
437433
return histName.idx;

Framework/Core/include/Framework/OutputObjHeader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ struct OutputObjHeader : public BaseHeader {
4444
uint32_t mTaskHash;
4545
uint16_t mPipelineIndex = 0;
4646
uint16_t mPipelineSize = 1;
47+
// Name of the actual container for the object, e.g. the HistogramRegistry name
48+
char containerName[64] = {0};
4749

4850
constexpr OutputObjHeader()
4951
: BaseHeader(sizeof(OutputObjHeader), sHeaderType, sSerializationMethod, sVersion),

Framework/Core/src/HistogramRegistry.cxx

Lines changed: 18 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,12 @@ OutputSpec const HistogramRegistry::spec()
5151
return OutputSpec{OutputLabel{mName}, "ATSK", desc, 0, Lifetime::QA};
5252
}
5353

54-
OutputRef HistogramRegistry::ref(uint16_t pipelineIndex, uint16_t pipelineSize)
54+
OutputRef HistogramRegistry::ref(uint16_t pipelineIndex, uint16_t pipelineSize) const
5555
{
56-
return OutputRef{std::string{mName}, 0, o2::header::Stack{OutputObjHeader{mPolicy, OutputObjSourceType::HistogramRegistrySource, mTaskHash, pipelineIndex, pipelineSize}}};
56+
OutputObjHeader header{mPolicy, OutputObjSourceType::HistogramRegistrySource, mTaskHash, pipelineIndex, pipelineSize};
57+
// Copy the name of the registry to the haeder.
58+
strncpy(header.containerName, mName.data(), 64);
59+
return OutputRef{std::string{mName}, 0, o2::header::Stack{header}};
5760
}
5861

5962
void HistogramRegistry::setHash(uint32_t hash)
@@ -282,87 +285,24 @@ void HistogramRegistry::print(bool showAxisDetails)
282285
LOGF(info, "");
283286
}
284287

285-
// create output structure will be propagated to file-sink
286-
TList* HistogramRegistry::getListOfHistograms()
288+
void HistogramRegistry::apply(std::function<void(HistogramRegistry const&, TNamed* named)> callback) const
287289
{
288-
TList* list = new TList();
289-
list->SetName(mName.data());
290-
291-
if (mSortHistos) {
292-
auto caseInsensitiveCompare = [](const std::string& s1, const std::string& s2) {
293-
return std::lexicographical_compare(s1.begin(), s1.end(), s2.begin(), s2.end(),
294-
[](char c1, char c2) { return std::tolower(static_cast<unsigned char>(c1)) < std::tolower(static_cast<unsigned char>(c2)); });
295-
};
296-
std::sort(mRegisteredNames.begin(), mRegisteredNames.end(), caseInsensitiveCompare);
297-
}
298-
299-
for (auto& curHistName : mRegisteredNames) {
290+
// Keep the list sorted as originally done to avoid hidden dependency on the order, for now , for now.
291+
auto finalList = mRegisteredNames;
292+
auto caseInsensitiveCompare = [](const std::string& s1, const std::string& s2) {
293+
return std::lexicographical_compare(s1.begin(), s1.end(), s2.begin(), s2.end(),
294+
[](char c1, char c2) { return std::tolower(static_cast<unsigned char>(c1)) < std::tolower(static_cast<unsigned char>(c2)); });
295+
};
296+
std::sort(finalList.begin(), finalList.end(), caseInsensitiveCompare);
297+
for (auto& curHistName : finalList) {
300298
TNamed* rawPtr = nullptr;
301299
std::visit([&](const auto& sharedPtr) { rawPtr = (TNamed*)sharedPtr.get(); }, mRegistryValue[getHistIndex(HistName{curHistName.data()})]);
302-
if (rawPtr) {
303-
std::deque<std::string> path = splitPath(rawPtr->GetName());
304-
std::string name = path.back();
305-
path.pop_back();
306-
TList* targetList{getSubList(list, path)};
307-
if (targetList) {
308-
rawPtr->SetName(name.data());
309-
targetList->Add(rawPtr);
310-
} else {
311-
LOGF(fatal, "Specified subfolder could not be created.");
312-
}
313-
}
314-
}
315-
316-
// place lists always at the top
317-
std::function<void(TList*)> moveListsToTop;
318-
moveListsToTop = [&](TList* list) {
319-
TIter next(list);
320-
TNamed* subList = nullptr;
321-
std::vector<TObject*> subLists;
322-
while ((subList = (TNamed*)next())) {
323-
if (subList->InheritsFrom(TList::Class())) {
324-
subLists.push_back(subList);
325-
moveListsToTop((TList*)subList);
326-
}
327-
}
328-
std::reverse(subLists.begin(), subLists.end());
329-
for (auto curList : subLists) {
330-
list->Remove(curList);
331-
list->AddFirst(curList);
332-
}
333-
};
334-
moveListsToTop(list);
335-
336-
// create dedicated directory containing all of the registrys histograms
337-
if (mCreateRegistryDir) {
338-
// propagate this to the writer by adding a 'flag' to the output list
339-
list->AddLast(new TNamed("createFolder", ""));
340-
}
341-
return list;
342-
}
343-
344-
// helper function to create resp. find the subList defined by path
345-
TList* HistogramRegistry::getSubList(TList* list, std::deque<std::string>& path)
346-
{
347-
if (path.empty()) {
348-
return list;
349-
}
350-
TList* targetList{nullptr};
351-
std::string nextList = path[0];
352-
path.pop_front();
353-
if (auto subList = (TList*)list->FindObject(nextList.data())) {
354-
if (subList->InheritsFrom(TList::Class())) {
355-
targetList = getSubList((TList*)subList, path);
356-
} else {
357-
return nullptr;
300+
if (!rawPtr) {
301+
// Skipping empty histograms
302+
continue;
358303
}
359-
} else {
360-
subList = new TList();
361-
subList->SetName(nextList.data());
362-
list->Add(subList);
363-
targetList = getSubList(subList, path);
304+
callback(*this, rawPtr);
364305
}
365-
return targetList;
366306
}
367307

368308
// helper function to split user defined path/to/hist/name string

Framework/TestWorkflows/src/o2TestHistograms.cxx

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ struct EtaAndClsHistogramsSimple {
4343
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
4444
Filter trackFilter = o2::aod::track::pt < 10.f;
4545

46+
HistogramRegistry registry{
47+
"registry",
48+
{
49+
{"a/b/eta", "#Eta", {HistType::kTH1F, {{100, -2.0, 2.0}}}}, //
50+
{"a/phi", "#Phi", {HistType::kTH1D, {{102, 0, 2 * M_PI}}}}, //
51+
{"c/pt", "p_{T}", {HistType::kTH1D, {{1002, -0.01, 50.1}}}}, //
52+
{"ptToPt", "#ptToPt", {HistType::kTH2F, {{100, -0.01, 10.01}, {100, -0.01, 10.01}}}} //
53+
} //
54+
};
55+
4656
void init(InitContext&)
4757
{
4858
if (!trackFilterString->empty()) {
@@ -56,6 +66,11 @@ struct EtaAndClsHistogramsSimple {
5666
for (auto& track : tracks) {
5767
etaClsH->Fill(track.eta(), track.pt());
5868
skimEx(track.pt(), track.eta());
69+
70+
registry.fill(HIST("a/b/eta"), track.eta());
71+
registry.fill(HIST("a/phi"), track.phi());
72+
registry.fill(HIST("c/pt"), track.pt());
73+
registry.fill(HIST("ptToPt"), track.pt(), track.signed1Pt());
5974
}
6075
}
6176
};
@@ -66,6 +81,16 @@ struct EtaAndClsHistogramsIUSimple {
6681
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
6782
Filter trackFilter = o2::aod::track::pt < 10.f;
6883

84+
HistogramRegistry registry{
85+
"registry",
86+
{
87+
{"a/b/eta", "#Eta", {HistType::kTH1F, {{100, -2.0, 2.0}}}}, //
88+
{"a/phi", "#Phi", {HistType::kTH1D, {{102, 0, 2 * M_PI}}}}, //
89+
{"c/pt", "p_{T}", {HistType::kTH1D, {{1002, -0.01, 50.1}}}}, //
90+
{"ptToPt", "#ptToPt", {HistType::kTH2F, {{100, -0.01, 10.01}, {100, -0.01, 10.01}}}} //
91+
} //
92+
};
93+
6994
void init(InitContext&)
7095
{
7196
if (!trackFilterString->empty()) {
@@ -79,12 +104,28 @@ struct EtaAndClsHistogramsIUSimple {
79104
for (auto& track : tracks) {
80105
etaClsH->Fill(track.eta(), track.pt());
81106
skimEx(track.pt(), track.eta());
107+
108+
registry.fill(HIST("a/b/eta"), track.eta());
109+
registry.fill(HIST("a/phi"), track.phi());
110+
registry.fill(HIST("c/pt"), track.pt());
111+
registry.fill(HIST("ptToPt"), track.pt(), track.signed1Pt());
82112
}
83113
}
84114
};
85115

86116
struct EtaAndClsHistogramsFull {
87117
OutputObj<TH3F> etaClsH{TH3F("eta_vs_cls_vs_sigmapT", "#eta vs N_{cls} vs sigma_{1/pT}", 102, -2.01, 2.01, 160, -0.5, 159.5, 100, 0, 10)};
118+
119+
HistogramRegistry registry{
120+
"registry",
121+
{
122+
{"a/b/eta", "#Eta", {HistType::kTH1F, {{100, -2.0, 2.0}}}}, //
123+
{"a/phi", "#Phi", {HistType::kTH1D, {{102, 0, 2 * M_PI}}}}, //
124+
{"c/pt", "p_{T}", {HistType::kTH1D, {{1002, -0.01, 50.1}}}}, //
125+
{"ptToPt", "#ptToPt", {HistType::kTH2F, {{100, -0.01, 10.01}, {100, -0.01, 10.01}}}} //
126+
} //
127+
};
128+
88129
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
89130
Filter trackFilter = o2::aod::track::pt < 10.f;
90131

@@ -100,6 +141,11 @@ struct EtaAndClsHistogramsFull {
100141
LOGP(info, "Invoking the run 3 one");
101142
for (auto& track : tracks) {
102143
etaClsH->Fill(track.eta(), track.tpcNClsFindable(), track.sigma1Pt());
144+
145+
registry.fill(HIST("a/b/eta"), track.eta());
146+
registry.fill(HIST("a/phi"), track.phi());
147+
registry.fill(HIST("c/pt"), track.pt());
148+
registry.fill(HIST("ptToPt"), track.pt(), track.signed1Pt());
103149
}
104150
}
105151
};

0 commit comments

Comments
 (0)