Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 129 additions & 125 deletions Framework/AnalysisSupport/src/AODWriterHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -269,145 +269,149 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)

callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
auto const& ref = pc.inputs().get("x");
if (!ref.header) {
LOG(error) << "Header not found";
return;
}
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
if (!datah) {
LOG(error) << "No data header in stack";
return;
}
auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
if (!ref.header) {
LOG(error) << "Header not found";
return;
}
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
if (!datah) {
LOG(error) << "No data header in stack";
return;
}

if (!ref.payload) {
LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
return;
}
if (!ref.payload) {
LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
return;
}

auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
if (!objh) {
LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
return;
}
auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
if (!objh) {
LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
return;
}

InputObject obj;
FairInputTBuffer tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
tm.InitMap();
obj.kind = tm.ReadClass();
tm.SetBufferOffset(0);
tm.ResetMap();
if (obj.kind == nullptr) {
LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
return;
}
InputObject obj;
FairInputTBuffer tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
tm.InitMap();
obj.kind = tm.ReadClass();
tm.SetBufferOffset(0);
tm.ResetMap();
if (obj.kind == nullptr) {
LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
return;
}

auto policy = objh->mPolicy;
auto sourceType = objh->mSourceType;
auto hash = objh->mTaskHash;
auto policy = objh->mPolicy;
auto sourceType = objh->mSourceType;
auto hash = objh->mTaskHash;

obj.obj = tm.ReadObjectAny(obj.kind);
auto* named = static_cast<TNamed*>(obj.obj);
obj.name = named->GetName();
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
if (hpos == tskmap.end()) {
LOG(error) << "No task found for hash " << hash;
return;
}
auto taskname = hpos->name;
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
if (opos == objmap.end()) {
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
return;
}
auto objects = opos->bindings;
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
return;
}
auto nameHash = runtime_hash(obj.name.c_str());
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
// If it's the first one, we just add it to the list.
if (existing == inputObjects->end()) {
obj.count = objh->mPipelineSize;
inputObjects->push_back(std::make_pair(key, obj));
existing = inputObjects->end() - 1;
} else {
obj.count = existing->second.count;
// Otherwise, we merge it with the existing one.
auto merger = existing->second.kind->GetMerge();
if (!merger) {
LOG(error) << "Already one unmergeable object found for " << obj.name;
obj.obj = tm.ReadObjectAny(obj.kind);
auto* named = static_cast<TNamed*>(obj.obj);
obj.name = named->GetName();
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
if (hpos == tskmap.end()) {
LOG(error) << "No task found for hash " << hash;
return;
}
TList coll;
coll.Add(static_cast<TObject*>(obj.obj));
merger(existing->second.obj, &coll, nullptr);
}
// We expect as many objects as the pipeline size, for
// a given object name and task hash.
existing->second.count -= 1;

if (existing->second.count != 0) {
return;
}
// Write the object here.
auto route = existing->first;
auto entry = existing->second;
auto file = ROOTfileNames.find(route.policy);
if (file == ROOTfileNames.end()) {
return;
}
auto filename = file->second;
if (f[route.policy] == nullptr) {
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
}
auto nextDirectory = route.directory;
if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
f[route.policy]->mkdir(nextDirectory.c_str());
auto taskname = hpos->name;
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
if (opos == objmap.end()) {
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
return;
}
currentDirectory = nextDirectory;
currentFile = filename;
}
auto objects = opos->bindings;
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
return;
}
auto nameHash = runtime_hash(obj.name.c_str());
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
// If it's the first one, we just add it to the list.
if (existing == inputObjects->end()) {
obj.count = objh->mPipelineSize;
inputObjects->push_back(std::make_pair(key, obj));
existing = inputObjects->end() - 1;
} else {
obj.count = existing->second.count;
// Otherwise, we merge it with the existing one.
auto merger = existing->second.kind->GetMerge();
if (!merger) {
LOG(error) << "Already one unmergeable object found for " << obj.name;
return;
}
TList coll;
coll.Add(static_cast<TObject*>(obj.obj));
merger(existing->second.obj, &coll, nullptr);
}
// We expect as many objects as the pipeline size, for
// a given object name and task hash.
existing->second.count -= 1;

// translate the list-structure created by the registry into a directory structure within the file
std::function<void(TList*, TDirectory*)> writeListToFile;
writeListToFile = [&](TList* list, TDirectory* parentDir) {
TIter next(list);
TObject* object = nullptr;
while ((object = next())) {
if (object->InheritsFrom(TList::Class())) {
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
} else {
parentDir->WriteObjectAny(object, object->Class(), object->GetName());
auto* written = list->Remove(object);
delete written;
if (existing->second.count != 0) {
return;
}
// Write the object here.
auto route = existing->first;
auto entry = existing->second;
auto file = ROOTfileNames.find(route.policy);
if (file == ROOTfileNames.end()) {
return;
}
auto filename = file->second;
if (f[route.policy] == nullptr) {
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
}
auto nextDirectory = route.directory;
if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
f[route.policy]->mkdir(nextDirectory.c_str());
}
currentDirectory = nextDirectory;
currentFile = filename;
}
};

TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
auto* outputList = static_cast<TList*>(entry.obj);
outputList->SetOwner(false);
// translate the list-structure created by the registry into a directory structure within the file
std::function<void(TList*, TDirectory*)> writeListToFile;
writeListToFile = [&](TList* list, TDirectory* parentDir) {
TIter next(list);
TObject* object = nullptr;
while ((object = next())) {
if (object->InheritsFrom(TList::Class())) {
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
} else {
parentDir->WriteObjectAny(object, object->Class(), object->GetName());
auto* written = list->Remove(object);
delete written;
}
}
};

TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
auto* outputList = static_cast<TList*>(entry.obj);
outputList->SetOwner(false);

// if registry should live in dedicated folder a TNamed object is appended to the list
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
delete outputList->Last();
outputList->RemoveLast();
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
}

// if registry should live in dedicated folder a TNamed object is appended to the list
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
delete outputList->Last();
outputList->RemoveLast();
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
writeListToFile(outputList, currentDir);
outputList->SetOwner();
delete outputList;
entry.obj = nullptr;
} else {
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
delete (TObject*)entry.obj;
entry.obj = nullptr;
}

writeListToFile(outputList, currentDir);
outputList->SetOwner();
delete outputList;
entry.obj = nullptr;
} else {
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
delete (TObject*)entry.obj;
entry.obj = nullptr;
};
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
mergePart(pc.inputs().get("x", pi));
}
};
}};
Expand Down