2222#include " Framework/DataOutputDirector.h"
2323#include " Framework/TableTreeHelpers.h"
2424#include " Framework/Monitoring.h"
25+ #include " Framework/Signpost.h"
2526
2627#include < Monitoring/Monitoring.h>
28+ #include < TDirectory.h>
2729#include < TFile.h>
2830#include < TFile.h>
2931#include < TTree.h>
3032#include < TMap.h>
3133#include < TObjString.h>
3234#include < arrow/table.h>
35+ #include < chrono>
36+ #include < ios>
37+
38+ O2_DECLARE_DYNAMIC_LOG (histogram_registry);
3339
3440namespace o2 ::framework::writers
3541{
@@ -46,6 +52,7 @@ struct InputObjectRoute {
4652struct InputObject {
4753 TClass* kind = nullptr ;
4854 void * obj = nullptr ;
55+ std::string container;
4956 std::string name;
5057 int count = -1 ;
5158};
@@ -273,24 +280,30 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
273280 callbacks.set <CallbackService::Id::EndOfStream>(endofdatacb);
274281 return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
275282 auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const & ref) {
283+ O2_SIGNPOST_ID_GENERATE (hid, histogram_registry);
284+ O2_SIGNPOST_START (histogram_registry, hid, " mergePart" , " Merging histogram" );
276285 if (!ref.header ) {
277- LOG (error) << " Header not found" ;
286+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart " , " Header not found. " ) ;
278287 return ;
279288 }
280289 auto datah = o2::header::get<o2::header::DataHeader*>(ref.header );
281290 if (!datah) {
282- LOG (error) << " No data header in stack" ;
291+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart " , " No data header in stack" ) ;
283292 return ;
284293 }
285294
286295 if (!ref.payload ) {
287- LOGP (error, " Payload not found for {}/{}/{}" , datah->dataOrigin .as <std::string>(), datah->dataDescription .as <std::string>(), datah->subSpecification );
296+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart" , " Payload not found for %{public}s/%{public}s/%d" ,
297+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
298+ datah->subSpecification );
288299 return ;
289300 }
290301
291302 auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header );
292303 if (!objh) {
293- LOGP (error, " No output object header in stack of {}/{}/{}" , datah->dataOrigin .as <std::string>(), datah->dataDescription .as <std::string>(), datah->subSpecification );
304+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart" , " No output object header in stack of %{public}s/%{public}s/%d." ,
305+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
306+ datah->subSpecification );
294307 return ;
295308 }
296309
@@ -300,48 +313,73 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
300313 obj.kind = tm.ReadClass ();
301314 tm.SetBufferOffset (0 );
302315 tm.ResetMap ();
316+ O2_SIGNPOST_ID_GENERATE (did, histogram_registry);
317+ O2_SIGNPOST_START (histogram_registry, did, " initialising root" , " Starting deserialization of %{public}s/%{public}s/%d" ,
318+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
319+ datah->subSpecification );
303320 if (obj.kind == nullptr ) {
304- LOGP (error, " Cannot read class info from buffer of {}/{}/{}" , datah->dataOrigin .as <std::string>(), datah->dataDescription .as <std::string>(), datah->subSpecification );
321+ O2_SIGNPOST_END (histogram_registry, did, " initialising root" , " Failed to deserialise" );
322+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart" , " Cannot read class info from buffer of %{public}s/%{public}s/%d." ,
323+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
324+ datah->subSpecification );
305325 return ;
306326 }
327+ O2_SIGNPOST_END (histogram_registry, did, " initialising root" , " Done init." );
307328
308329 auto policy = objh->mPolicy ;
309330 auto sourceType = objh->mSourceType ;
310331 auto hash = objh->mTaskHash ;
332+ O2_SIGNPOST_START (histogram_registry, did, " deserialization" , " Starting deserialization of %{public}s/%{public}s/%d" ,
333+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
334+ datah->subSpecification );
311335
312336 obj.obj = tm.ReadObjectAny (obj.kind );
313337 auto * named = static_cast <TNamed*>(obj.obj );
314338 obj.name = named->GetName ();
339+ O2_SIGNPOST_END (histogram_registry, did, " deserialization" , " Done deserialization." );
340+ // If we have a folder, we assume the first element of the path
341+ // to be the name of the registry.
342+ if (sourceType == HistogramRegistrySource) {
343+ obj.container = objh->containerName ;
344+ } else {
345+ obj.container = obj.name ;
346+ }
315347 auto hpos = std::find_if (tskmap.begin (), tskmap.end (), [&](auto && x) { return x.id == hash; });
316348 if (hpos == tskmap.end ()) {
317- LOG (error) << " No task found for hash " << hash;
349+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart " , " No task found for hash %d. " , hash) ;
318350 return ;
319351 }
320352 auto taskname = hpos->name ;
321353 auto opos = std::find_if (objmap.begin (), objmap.end (), [&](auto && x) { return x.id == hash; });
322354 if (opos == objmap.end ()) {
323- LOG (error) << " No object list found for task " << taskname << " (hash=" << hash << " )" ;
355+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart" , " No object list found for task %{public}s (hash=%d)." ,
356+ taskname.c_str (), hash);
324357 return ;
325358 }
326359 auto objects = opos->bindings ;
327- if (std::find (objects.begin (), objects.end (), obj.name ) == objects.end ()) {
328- LOG (error) << " No object " << obj.name << " in map for task " << taskname;
360+ if (std::find (objects.begin (), objects.end (), obj.container ) == objects.end ()) {
361+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " mergePart" , " No container %{public}s in map for task %{public}s." ,
362+ obj.container .c_str (), taskname.c_str ());
329363 return ;
330364 }
331365 auto nameHash = runtime_hash (obj.name .c_str ());
332366 InputObjectRoute key{obj.name , nameHash, taskname, hash, policy, sourceType};
333367 auto existing = std::find_if (inputObjects->begin (), inputObjects->end (), [&](auto && x) { return (x.first .uniqueId == nameHash) && (x.first .taskHash == hash); });
334368 // If it's the first one, we just add it to the list.
369+ O2_SIGNPOST_START (histogram_registry, did, " merging" , " Starting merging of %{public}s/%{public}s/%d" ,
370+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
371+ datah->subSpecification );
335372 if (existing == inputObjects->end ()) {
336373 obj.count = objh->mPipelineSize ;
337- inputObjects->push_back ( std::make_pair ( key, obj) );
374+ inputObjects->emplace_back ( key, obj);
338375 existing = inputObjects->end () - 1 ;
339376 } else {
340377 obj.count = existing->second .count ;
341378 // Otherwise, we merge it with the existing one.
342379 auto merger = existing->second .kind ->GetMerge ();
343380 if (!merger) {
344- LOG (error) << " Already one unmergeable object found for " << obj.name ;
381+ O2_SIGNPOST_END (histogram_registry, did, " merging" , " Unabled to merge" );
382+ O2_SIGNPOST_END_WITH_ERROR (histogram_registry, hid, " merging" , " Already one unmergeable object found for %{public}s" , obj.name .c_str ());
345383 return ;
346384 }
347385 TList coll;
@@ -353,15 +391,22 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
353391 existing->second .count -= 1 ;
354392
355393 if (existing->second .count != 0 ) {
394+ O2_SIGNPOST_END (histogram_registry, did, " merging" , " Done partial merging." );
395+ O2_SIGNPOST_END (histogram_registry, hid, " mergePart" , " Pipeline lanes still missing." );
356396 return ;
357397 }
398+ O2_SIGNPOST_END (histogram_registry, did, " merging" , " Done merging." );
358399 // Write the object here.
359400 auto route = existing->first ;
360401 auto entry = existing->second ;
361402 auto file = ROOTfileNames.find (route.policy );
362403 if (file == ROOTfileNames.end ()) {
404+ O2_SIGNPOST_END (histogram_registry, hid, " mergePart" , " Not matching any file." );
363405 return ;
364406 }
407+ O2_SIGNPOST_START (histogram_registry, did, " writing" , " Starting writing of %{public}s/%{public}s/%d" ,
408+ datah->dataOrigin .as <std::string>().c_str (), datah->dataDescription .as <std::string>().c_str (),
409+ datah->subSpecification );
365410 auto filename = file->second ;
366411 if (f[route.policy ] == nullptr ) {
367412 f[route.policy ] = TFile::Open (filename.c_str (), " RECREATE" );
@@ -375,53 +420,53 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
375420 currentFile = filename;
376421 }
377422
378- // translate the list-structure created by the registry into a directory structure within the file
379- std::function<void (TList*, TDirectory*)> writeListToFile;
380- writeListToFile = [&](TList* list, TDirectory* parentDir) {
381- TIter next (list);
382- TObject* object = nullptr ;
383- while ((object = next ())) {
384- if (object->InheritsFrom (TList::Class ())) {
385- writeListToFile (static_cast <TList*>(object), parentDir->mkdir (object->GetName (), object->GetName (), true ));
423+ // FIXME: handle folders
424+ f[route.policy ]->cd (" /" );
425+ auto * currentDir = f[route.policy ]->GetDirectory (currentDirectory.c_str ());
426+ // The name contains a path...
427+ int objSize = 0 ;
428+ if (sourceType == HistogramRegistrySource) {
429+ TDirectory* currentFolder = currentDir;
430+ O2_SIGNPOST_EVENT_EMIT (histogram_registry, hid, " mergePart" , " Toplevel folder is %{public}s." ,
431+ currentDir->GetName ());
432+ std::string objName = entry.name ;
433+ auto lastSlash = entry.name .rfind (' /' );
434+
435+ if (lastSlash != std::string::npos) {
436+ auto dirname = entry.name .substr (0 , lastSlash);
437+ objName = entry.name .substr (lastSlash + 1 );
438+ currentFolder = currentDir->GetDirectory (dirname.c_str ());
439+ if (!currentFolder) {
440+ O2_SIGNPOST_EVENT_EMIT (histogram_registry, hid, " mergePart" , " Creating folder %{public}s" ,
441+ dirname.c_str ());
442+ currentFolder = currentDir->mkdir (dirname.c_str (), " " , kTRUE );
386443 } else {
387- int objSize = parentDir->WriteObjectAny (object, object->Class (), object->GetName ());
388- static int maxSizeWritten = 0 ;
389- if (objSize > maxSizeWritten) {
390- auto & monitoring = pc.services ().get <Monitoring>();
391- maxSizeWritten = objSize;
392- monitoring.send (Metric{fmt::format (" {}/{}:{}" , object->ClassName (), object->GetName (), objSize), " aod-largest-object-written" }.addTag (tags::Key::Subsystem, tags::Value::DPL));
393- }
394- auto * written = list->Remove (object);
395- delete written;
444+ O2_SIGNPOST_EVENT_EMIT (histogram_registry, hid, " mergePart" , " Folder %{public}s already there." ,
445+ currentFolder->GetName ());
396446 }
397447 }
398- };
399-
400- TDirectory* currentDir = f[route.policy ]->GetDirectory (currentDirectory.c_str ());
401- if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
402- auto * outputList = static_cast <TList*>(entry.obj );
403- outputList->SetOwner (false );
404-
405- // if registry should live in dedicated folder a TNamed object is appended to the list
406- if (outputList->Last () && outputList->Last ()->IsA () == TNamed::Class ()) {
407- delete outputList->Last ();
408- outputList->RemoveLast ();
409- currentDir = currentDir->mkdir (outputList->GetName (), outputList->GetName (), true );
410- }
411-
412- writeListToFile (outputList, currentDir);
413- outputList->SetOwner ();
414- delete outputList;
448+ O2_SIGNPOST_EVENT_EMIT (histogram_registry, hid, " mergePart" , " Writing %{public}s of kind %{public}s in %{public}s" ,
449+ entry.name .c_str (), entry.kind ->GetName (), currentDir->GetName ());
450+ objSize = currentFolder->WriteObjectAny (entry.obj , entry.kind , objName.c_str ());
451+ O2_SIGNPOST_END (histogram_registry, did, " writing" , " End writing %{public}s" , entry.name .c_str ());
452+ delete (TObject*)entry.obj ;
415453 entry.obj = nullptr ;
416454 } else {
417- currentDir->WriteObjectAny (entry.obj , entry.kind , entry.name .c_str ());
455+ O2_SIGNPOST_EVENT_EMIT (histogram_registry, hid, " mergePart" , " Writing %{public}s of kind %{public}s in %{public}s" ,
456+ entry.name .c_str (), entry.kind ->GetName (), currentDir->GetName ());
457+ objSize = currentDir->WriteObjectAny (entry.obj , entry.kind , entry.name .c_str ());
458+ O2_SIGNPOST_END (histogram_registry, did, " writing" , " End writing %{public}s" , entry.name .c_str ());
418459 delete (TObject*)entry.obj ;
419460 entry.obj = nullptr ;
420461 }
462+ O2_SIGNPOST_END (histogram_registry, hid, " mergePart" , " Done merging object of %d bytes." , objSize);
421463 };
464+ O2_SIGNPOST_ID_GENERATE (rid, histogram_registry);
465+ O2_SIGNPOST_START (histogram_registry, rid, " processParts" , " Start merging %zu parts received together." , pc.inputs ().getNofParts (0 ));
422466 for (int pi = 0 ; pi < pc.inputs ().getNofParts (0 ); ++pi) {
423467 mergePart (pc.inputs ().get (" x" , pi));
424468 }
469+ O2_SIGNPOST_END (histogram_registry, rid, " processParts" , " Done histograms in multipart message." );
425470 };
426471 }};
427472}
0 commit comments