Skip to content

Commit 35ca22b

Browse files
authored
DPL: fix merging of pipelined devices (#14307)
Sometimes we are just too smart. Multiple messages with the same signature are coalesced in the same input if they are processed at the same time. This explains why the sleep was improving behavior: it merely staggers arrival, so that the optimisation cannot happen anymore.
1 parent bb048ef commit 35ca22b

File tree

1 file changed

+129
-125
lines changed

1 file changed

+129
-125
lines changed

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 129 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -269,145 +269,149 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
269269

270270
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
271271
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
272-
auto const& ref = pc.inputs().get("x");
273-
if (!ref.header) {
274-
LOG(error) << "Header not found";
275-
return;
276-
}
277-
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
278-
if (!datah) {
279-
LOG(error) << "No data header in stack";
280-
return;
281-
}
272+
auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
273+
if (!ref.header) {
274+
LOG(error) << "Header not found";
275+
return;
276+
}
277+
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
278+
if (!datah) {
279+
LOG(error) << "No data header in stack";
280+
return;
281+
}
282282

283-
if (!ref.payload) {
284-
LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
285-
return;
286-
}
283+
if (!ref.payload) {
284+
LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
285+
return;
286+
}
287287

288-
auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
289-
if (!objh) {
290-
LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
291-
return;
292-
}
288+
auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
289+
if (!objh) {
290+
LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
291+
return;
292+
}
293293

294-
InputObject obj;
295-
FairInputTBuffer tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
296-
tm.InitMap();
297-
obj.kind = tm.ReadClass();
298-
tm.SetBufferOffset(0);
299-
tm.ResetMap();
300-
if (obj.kind == nullptr) {
301-
LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
302-
return;
303-
}
294+
InputObject obj;
295+
FairInputTBuffer tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
296+
tm.InitMap();
297+
obj.kind = tm.ReadClass();
298+
tm.SetBufferOffset(0);
299+
tm.ResetMap();
300+
if (obj.kind == nullptr) {
301+
LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
302+
return;
303+
}
304304

305-
auto policy = objh->mPolicy;
306-
auto sourceType = objh->mSourceType;
307-
auto hash = objh->mTaskHash;
305+
auto policy = objh->mPolicy;
306+
auto sourceType = objh->mSourceType;
307+
auto hash = objh->mTaskHash;
308308

309-
obj.obj = tm.ReadObjectAny(obj.kind);
310-
auto* named = static_cast<TNamed*>(obj.obj);
311-
obj.name = named->GetName();
312-
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
313-
if (hpos == tskmap.end()) {
314-
LOG(error) << "No task found for hash " << hash;
315-
return;
316-
}
317-
auto taskname = hpos->name;
318-
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
319-
if (opos == objmap.end()) {
320-
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
321-
return;
322-
}
323-
auto objects = opos->bindings;
324-
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
325-
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
326-
return;
327-
}
328-
auto nameHash = runtime_hash(obj.name.c_str());
329-
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
330-
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
331-
// If it's the first one, we just add it to the list.
332-
if (existing == inputObjects->end()) {
333-
obj.count = objh->mPipelineSize;
334-
inputObjects->push_back(std::make_pair(key, obj));
335-
existing = inputObjects->end() - 1;
336-
} else {
337-
obj.count = existing->second.count;
338-
// Otherwise, we merge it with the existing one.
339-
auto merger = existing->second.kind->GetMerge();
340-
if (!merger) {
341-
LOG(error) << "Already one unmergeable object found for " << obj.name;
309+
obj.obj = tm.ReadObjectAny(obj.kind);
310+
auto* named = static_cast<TNamed*>(obj.obj);
311+
obj.name = named->GetName();
312+
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
313+
if (hpos == tskmap.end()) {
314+
LOG(error) << "No task found for hash " << hash;
342315
return;
343316
}
344-
TList coll;
345-
coll.Add(static_cast<TObject*>(obj.obj));
346-
merger(existing->second.obj, &coll, nullptr);
347-
}
348-
// We expect as many objects as the pipeline size, for
349-
// a given object name and task hash.
350-
existing->second.count -= 1;
351-
352-
if (existing->second.count != 0) {
353-
return;
354-
}
355-
// Write the object here.
356-
auto route = existing->first;
357-
auto entry = existing->second;
358-
auto file = ROOTfileNames.find(route.policy);
359-
if (file == ROOTfileNames.end()) {
360-
return;
361-
}
362-
auto filename = file->second;
363-
if (f[route.policy] == nullptr) {
364-
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
365-
}
366-
auto nextDirectory = route.directory;
367-
if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
368-
if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
369-
f[route.policy]->mkdir(nextDirectory.c_str());
317+
auto taskname = hpos->name;
318+
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
319+
if (opos == objmap.end()) {
320+
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
321+
return;
370322
}
371-
currentDirectory = nextDirectory;
372-
currentFile = filename;
373-
}
323+
auto objects = opos->bindings;
324+
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
325+
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
326+
return;
327+
}
328+
auto nameHash = runtime_hash(obj.name.c_str());
329+
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
330+
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
331+
// If it's the first one, we just add it to the list.
332+
if (existing == inputObjects->end()) {
333+
obj.count = objh->mPipelineSize;
334+
inputObjects->push_back(std::make_pair(key, obj));
335+
existing = inputObjects->end() - 1;
336+
} else {
337+
obj.count = existing->second.count;
338+
// Otherwise, we merge it with the existing one.
339+
auto merger = existing->second.kind->GetMerge();
340+
if (!merger) {
341+
LOG(error) << "Already one unmergeable object found for " << obj.name;
342+
return;
343+
}
344+
TList coll;
345+
coll.Add(static_cast<TObject*>(obj.obj));
346+
merger(existing->second.obj, &coll, nullptr);
347+
}
348+
// We expect as many objects as the pipeline size, for
349+
// a given object name and task hash.
350+
existing->second.count -= 1;
374351

375-
// translate the list-structure created by the registry into a directory structure within the file
376-
std::function<void(TList*, TDirectory*)> writeListToFile;
377-
writeListToFile = [&](TList* list, TDirectory* parentDir) {
378-
TIter next(list);
379-
TObject* object = nullptr;
380-
while ((object = next())) {
381-
if (object->InheritsFrom(TList::Class())) {
382-
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
383-
} else {
384-
parentDir->WriteObjectAny(object, object->Class(), object->GetName());
385-
auto* written = list->Remove(object);
386-
delete written;
352+
if (existing->second.count != 0) {
353+
return;
354+
}
355+
// Write the object here.
356+
auto route = existing->first;
357+
auto entry = existing->second;
358+
auto file = ROOTfileNames.find(route.policy);
359+
if (file == ROOTfileNames.end()) {
360+
return;
361+
}
362+
auto filename = file->second;
363+
if (f[route.policy] == nullptr) {
364+
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
365+
}
366+
auto nextDirectory = route.directory;
367+
if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
368+
if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
369+
f[route.policy]->mkdir(nextDirectory.c_str());
387370
}
371+
currentDirectory = nextDirectory;
372+
currentFile = filename;
388373
}
389-
};
390374

391-
TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
392-
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
393-
auto* outputList = static_cast<TList*>(entry.obj);
394-
outputList->SetOwner(false);
375+
// translate the list-structure created by the registry into a directory structure within the file
376+
std::function<void(TList*, TDirectory*)> writeListToFile;
377+
writeListToFile = [&](TList* list, TDirectory* parentDir) {
378+
TIter next(list);
379+
TObject* object = nullptr;
380+
while ((object = next())) {
381+
if (object->InheritsFrom(TList::Class())) {
382+
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
383+
} else {
384+
parentDir->WriteObjectAny(object, object->Class(), object->GetName());
385+
auto* written = list->Remove(object);
386+
delete written;
387+
}
388+
}
389+
};
390+
391+
TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
392+
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
393+
auto* outputList = static_cast<TList*>(entry.obj);
394+
outputList->SetOwner(false);
395+
396+
// if registry should live in dedicated folder a TNamed object is appended to the list
397+
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
398+
delete outputList->Last();
399+
outputList->RemoveLast();
400+
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
401+
}
395402

396-
// if registry should live in dedicated folder a TNamed object is appended to the list
397-
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
398-
delete outputList->Last();
399-
outputList->RemoveLast();
400-
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
403+
writeListToFile(outputList, currentDir);
404+
outputList->SetOwner();
405+
delete outputList;
406+
entry.obj = nullptr;
407+
} else {
408+
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
409+
delete (TObject*)entry.obj;
410+
entry.obj = nullptr;
401411
}
402-
403-
writeListToFile(outputList, currentDir);
404-
outputList->SetOwner();
405-
delete outputList;
406-
entry.obj = nullptr;
407-
} else {
408-
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
409-
delete (TObject*)entry.obj;
410-
entry.obj = nullptr;
412+
};
413+
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
414+
mergePart(pc.inputs().get("x", pi));
411415
}
412416
};
413417
}};

0 commit comments

Comments
 (0)