2121#include " Framework/TableConsumer.h"
2222#include " Framework/DataOutputDirector.h"
2323#include " Framework/TableTreeHelpers.h"
24+ #include " Framework/Signpost.h"
2425
25- #include < TFile.h>
2626#include < TFile.h>
2727#include < TTree.h>
28+ #include < TFile.h>
2829#include < TMap.h>
2930#include < TObjString.h>
3031#include < arrow/table.h>
3132
33+ O2_DECLARE_DYNAMIC_LOG (histograms);
34+ O2_DECLARE_DYNAMIC_LOG (derived_data);
35+
3236namespace o2 ::framework::writers
3337{
3438
@@ -102,12 +106,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
102106
103107 // this functor is called once per time frame
104108 return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void {
105- LOGP (debug, " ======== getGlobalAODSink::processing ========== " );
106- LOGP (debug, " processing data set with {} entries" , pc.inputs ().size ());
109+ O2_SIGNPOST_ID_GENERATE (hid, histograms );
110+ O2_SIGNPOST_START (derived_data, hid, " getGlobalAODSink " , " Processing dataset with %zu entries. " , pc.inputs ().size ());
107111
108112 // return immediately if pc.inputs() is empty. This should never happen!
109113 if (pc.inputs ().size () == 0 ) {
110- LOGP (info, " No inputs available!" );
114+ O2_SIGNPOST_EVENT_EMIT (derived_data, hid, " getGlobalAODSink" , " Processing dataset with %zu entries." , pc.inputs ().size ());
115+ O2_SIGNPOST_END (derived_data, hid, " getGlobalAODSink" , " Done processing." );
111116 return ;
112117 }
113118
@@ -135,7 +140,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
135140 // loop over the DataRefs which are contained in pc.inputs()
136141 for (const auto & ref : pc.inputs ()) {
137142 if (!ref.spec ) {
138- LOGP (debug , " Invalid input will be skipped!" );
143+ O2_SIGNPOST_EVENT_EMIT_ERROR (derived_data, hid, " getGlobalAODSink " , " Invalid input will be skipped!" );
139144 continue ;
140145 }
141146
@@ -178,17 +183,18 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
178183 // get the TableConsumer and corresponding arrow table
179184 auto msg = pc.inputs ().get (ref.spec ->binding );
180185 if (msg.header == nullptr ) {
181- LOGP (error, " No header for message {}:{}" , ref.spec ->binding , DataSpecUtils::describe (*ref.spec ));
186+ O2_SIGNPOST_EVENT_EMIT_ERROR (derived_data, hid, " getGlobalAODSink" , " No header for message %{public}s:%{public}s" ,
187+ ref.spec ->binding .c_str (), DataSpecUtils::describe (*ref.spec ).c_str ());
182188 continue ;
183189 }
184190 auto s = pc.inputs ().get <TableConsumer>(ref.spec ->binding );
185191 auto table = s->asArrowTable ();
186192 if (!table->Validate ().ok ()) {
187- LOGP (warning, " The table \" {} \" is not valid and will not be saved!" , tableName);
193+ O2_SIGNPOST_EVENT_EMIT_WARN (derived_data, hid, " getGlobalAODSink " , " The table \" %{public}s \" is not valid and will not be saved!" , tableName. c_str () );
188194 continue ;
189195 }
190196 if (table->schema ()->fields ().empty ()) {
191- LOGP (debug, " The table \" {} \" is empty but will be saved anyway!" , tableName);
197+ O2_SIGNPOST_EVENT_EMIT (derived_data, hid, " getGlobalAODSink " , " The table \" %{public}s \" is empty but will be saved anyway!" , tableName. c_str () );
192198 }
193199
194200 // loop over all DataOutputDescriptors
@@ -203,7 +209,8 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
203209
204210 // update metadata
205211 if (fileAndFolder.file ->FindObjectAny (" metaData" )) {
206- LOGF (debug, " Metadata: target file %s already has metadata, preserving it" , fileAndFolder.file ->GetName ());
212+ O2_SIGNPOST_EVENT_EMIT (derived_data, hid, " getGlobalAODSink" , " Metadata: target file %{public}s already has metadata, preserving it" ,
213+ fileAndFolder.file ->GetName ());
207214 } else if (!aodMetaDataKeys.empty () && !aodMetaDataVals.empty ()) {
208215 TMap aodMetaDataMap;
209216 for (uint32_t imd = 0 ; imd < aodMetaDataKeys.size (); imd++) {
@@ -227,6 +234,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
227234 ta2tr.process ();
228235 }
229236 }
237+ O2_SIGNPOST_END (derived_data, hid, " getGlobalAODSink" , " Done processing." );
230238 };
231239 }
232240
@@ -252,9 +260,11 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
252260 static std::string currentFile = " " ;
253261
254262 auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
255- LOG (debug) << " Writing merged objects and histograms to file" ;
263+ O2_SIGNPOST_ID_GENERATE (hid, histograms);
264+ O2_SIGNPOST_START (histograms, hid, " getOutputObjHistWriter" , " Writing merged objects and histograms to file" );
256265 if (inputObjects->empty ()) {
257- LOG (error) << " Output object map is empty!" ;
266+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " Output object map is empty!" );
267+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Writing completed with error." );
258268 context.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
259269 return ;
260270 }
@@ -263,30 +273,39 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
263273 f[i]->Close ();
264274 }
265275 }
266- LOG (debug) << " All outputs merged in their respective target files " ;
276+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter " , " Writing completed correctly. " ) ;
267277 context.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
268278 };
269279
270280 callbacks.set <CallbackService::Id::EndOfStream>(endofdatacb);
271281 return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
282+ O2_SIGNPOST_ID_GENERATE (hid, histograms);
283+ O2_SIGNPOST_START (histograms, hid, " getOutputObjHistWriter" , " Processing dataset with %zu entries." , pc.inputs ().size ());
272284 auto const & ref = pc.inputs ().get (" x" );
273285 if (!ref.header ) {
274- LOG (error) << " Header not found" ;
275- return ;
276- }
277- if (!ref.payload ) {
278- LOG (error) << " Payload not found" ;
286+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " Header not found." );
287+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
279288 return ;
280289 }
281290 auto datah = o2::header::get<o2::header::DataHeader*>(ref.header );
282291 if (!datah) {
283- LOG (error) << " No data header in stack" ;
292+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " No data header in stack." );
293+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
294+ return ;
295+ }
296+
297+ if (!ref.payload ) {
298+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " Payload not found for %{public}s." ,
299+ datah->dataDescription .as <std::string>().c_str ());
300+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
284301 return ;
285302 }
286303
287304 auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header );
288305 if (!objh) {
289- LOG (error) << " No output object header in stack" ;
306+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " No output object in stack %{public}s." ,
307+ datah->dataDescription .as <std::string>().c_str ());
308+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
290309 return ;
291310 }
292311
@@ -297,7 +316,8 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
297316 tm.SetBufferOffset (0 );
298317 tm.ResetMap ();
299318 if (obj.kind == nullptr ) {
300- LOG (error) << " Cannot read class info from buffer." ;
319+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " Cannot read class info from buffer." );
320+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
301321 return ;
302322 }
303323
@@ -310,18 +330,23 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
310330 obj.name = named->GetName ();
311331 auto hpos = std::find_if (tskmap.begin (), tskmap.end (), [&](auto && x) { return x.id == hash; });
312332 if (hpos == tskmap.end ()) {
313- LOG (error) << " No task found for hash " << hash;
333+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " No task found for hash %d" , hash);
334+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
314335 return ;
315336 }
316337 auto taskname = hpos->name ;
317338 auto opos = std::find_if (objmap.begin (), objmap.end (), [&](auto && x) { return x.id == hash; });
318339 if (opos == objmap.end ()) {
319- LOG (error) << " No object list found for task " << taskname << " (hash=" << hash << " )" ;
340+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " No object list found for task %{public}s (hash=%d)" ,
341+ taskname.c_str (), hash);
342+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
320343 return ;
321344 }
322345 auto objects = opos->bindings ;
323346 if (std::find (objects.begin (), objects.end (), obj.name ) == objects.end ()) {
324- LOG (error) << " No object " << obj.name << " in map for task " << taskname;
347+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " No object %{public}s in map for task %{public}s" ,
348+ obj.name .c_str (), taskname.c_str ());
349+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
325350 return ;
326351 }
327352 auto nameHash = runtime_hash (obj.name .c_str ());
@@ -330,14 +355,16 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
330355 // If it's the first one, we just add it to the list.
331356 if (existing == inputObjects->end ()) {
332357 obj.count = objh->mPipelineSize ;
333- inputObjects->push_back ( std::make_pair ( key, obj) );
358+ inputObjects->emplace_back ( key, obj);
334359 existing = inputObjects->end () - 1 ;
335360 } else {
336361 obj.count = existing->second .count ;
337362 // Otherwise, we merge it with the existing one.
338363 auto merger = existing->second .kind ->GetMerge ();
339364 if (!merger) {
340- LOG (error) << " Already one unmergeable object found for " << obj.name ;
365+ O2_SIGNPOST_EVENT_EMIT_ERROR (histograms, hid, " getOutputObjHistWriter" , " Already one unmergeable object found for %{public}s" ,
366+ obj.name .c_str ());
367+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Processing completed with error." );
341368 return ;
342369 }
343370 TList coll;
@@ -349,13 +376,16 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
349376 existing->second .count -= 1 ;
350377
351378 if (existing->second .count != 0 ) {
379+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Still waiting for %d histograms to arrive." , existing->second .count );
352380 return ;
353381 }
382+ O2_SIGNPOST_EVENT_EMIT (histograms, hid, " getOutputObjHistWriter" , " All histogramsa are there. Writing to disk." );
354383 // Write the object here.
355384 auto route = existing->first ;
356385 auto entry = existing->second ;
357386 auto file = ROOTfileNames.find (route.policy );
358387 if (file == ROOTfileNames.end ()) {
388+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Could not find where to write object." );
359389 return ;
360390 }
361391 auto filename = file->second ;
@@ -408,6 +438,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
408438 delete (TObject*)entry.obj ;
409439 entry.obj = nullptr ;
410440 }
441+ O2_SIGNPOST_END (histograms, hid, " getOutputObjHistWriter" , " Done processing histogram." );
411442 };
412443 }};
413444}
0 commit comments