3333#include " Framework/AnalysisSupportHelpers.h"
3434#include " Framework/ServiceRegistryRef.h"
3535#include " Framework/ServiceRegistryHelpers.h"
36+ #include " Framework/Signpost.h"
3637
3738#include " CommonMessageBackendsHelpers.h"
3839#include < Monitoring/Monitoring.h>
4647#include < boost/program_options/variables_map.hpp>
4748#include < csignal>
4849
50+ O2_DECLARE_DYNAMIC_LOG (rate_limiting);
51+
4952namespace o2 ::framework
5053{
5154
@@ -132,6 +135,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
132135 auto &allDeviceMetrics = sm.deviceMetricsInfos ;
133136 auto &specs = sm.deviceSpecs ;
134137 auto &infos = sm.deviceInfos ;
138+ O2_SIGNPOST_ID_FROM_POINTER (sid, rate_limiting, &sm);
135139
136140 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t >(driverMetrics, " rate-limit-state" );
137141 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t >(driverMetrics, " total-arrow-bytes-created" );
@@ -298,14 +302,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
298302 for (size_t di = 0 ; di < specs.size (); di++) {
299303 if (availableSharedMemory < possibleOffer) {
300304 if (lowSharedMemoryCount == 0 ) {
301- LOGP (detail, " We do not have enough shared memory ({}MB) to offer {}MB. Total offerings {}" , availableSharedMemory, possibleOffer, offeredSharedMemory);
305+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " not enough" ,
306+ " We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu" ,
307+ availableSharedMemory, possibleOffer, offeredSharedMemory);
302308 }
303309 lowSharedMemoryCount++;
304310 enoughSharedMemoryCount = 0 ;
305311 break ;
306312 } else {
307313 if (enoughSharedMemoryCount == 0 ) {
308- LOGP (detail, " We are back in a state where we enough shared memory: {}MB" , availableSharedMemory);
314+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " enough" ,
315+ " We are back in a state where we enough shared memory: %{bytes}llu MB" , availableSharedMemory);
309316 }
310317 enoughSharedMemoryCount++;
311318 lowSharedMemoryCount = 0 ;
@@ -323,7 +330,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
323330 continue ;
324331 }
325332 possibleOffer = std::min (MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
326- LOGP (detail, " Offering {}MB out of {} to {}" , possibleOffer, availableSharedMemory, specs[candidate].id );
333+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " offer" ,
334+ " Offering %{bytes}llu MB out of %{bytes}llu to %{public}s" ,
335+ possibleOffer, availableSharedMemory, specs[candidate].id .c_str ());
327336 manager.queueMessage (specs[candidate].id .c_str (), fmt::format (" /shm-offer {}" , possibleOffer).data ());
328337 availableSharedMemory -= possibleOffer;
329338 offeredSharedMemory += possibleOffer;
@@ -341,12 +350,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
341350 static int64_t lastShmOfferConsumed = 0 ;
342351 static int64_t lastUnusedOfferedMemory = 0 ;
343352 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
344- LOGP (detail, " Offer consumed so far {}" , shmOfferBytesConsumed);
353+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " offer" ,
354+ " Offer consumed so far %{bytes}llu" , shmOfferBytesConsumed);
345355 lastShmOfferConsumed = shmOfferBytesConsumed;
346356 }
347357 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000 );
348358 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
349- LOGP (detail, " unusedOfferedMemory:{} = offered:{} - (expired:{} + consumed:{}) / 1000000" , unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000 , shmOfferBytesConsumed / 1000000 );
359+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " offer" ,
360+ " unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000" ,
361+ unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000 , shmOfferBytesConsumed / 1000000 );
350362 lastUnusedOfferedMemory = unusedOfferedMemory;
351363 }
352364 // availableSharedMemory is the amount of memory which we know is available to be offered.
@@ -362,14 +374,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
362374 auto * arrow = reinterpret_cast <ArrowContext*>(service);
363375 auto totalBytes = 0 ;
364376 auto totalMessages = 0 ;
377+ O2_SIGNPOST_ID_FROM_POINTER (sid, rate_limiting, &arrow);
365378 for (auto & input : ctx.inputs ()) {
366379 if (input.header == nullptr ) {
367380 continue ;
368381 }
369382 auto const * dh = DataRefUtils::getHeader<DataHeader*>(input);
370383 auto payloadSize = DataRefUtils::getPayloadSize (input);
371384 if (dh->serialization != o2::header::gSerializationMethodArrow ) {
372- LOGP (debug, " Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory" , dh->dataOrigin , dh->dataDescription );
385+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " offer" ,
386+ " Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory." ,
387+ dh->dataOrigin .str , dh->dataDescription .str );
373388 continue ;
374389 }
375390 bool forwarded = false ;
@@ -380,15 +395,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
380395 }
381396 }
382397 if (forwarded) {
383- LOGP (debug, " Message {}/{} is forwarded so we are not returning its memory." , dh->dataOrigin , dh->dataDescription );
398+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " offer" ,
399+ " Message %{public}.4s/%{public}16.s is forwarded so we are not returning its memory." ,
400+ dh->dataOrigin .str , dh->dataDescription .str );
384401 continue ;
385402 }
386- LOGP (debug, " Message {}/{} is being deleted. We will return {}MB." , dh->dataOrigin , dh->dataDescription , payloadSize / 1000000 .);
403+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " offer" ,
404+ " Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB." ,
405+ dh->dataOrigin .str , dh->dataDescription .str , payloadSize / 1000000 .);
387406 totalBytes += payloadSize;
388407 totalMessages += 1 ;
389408 }
390409 arrow->updateBytesDestroyed (totalBytes);
391- LOGP (debug, " {}MB bytes being given back to reader, totaling {}MB" , totalBytes / 1000000 ., arrow->bytesDestroyed () / 1000000 .);
410+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " give back" ,
411+ " %{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB" ,
412+ totalBytes / 1000000 ., arrow->bytesDestroyed () / 1000000 .);
392413 arrow->updateMessagesDestroyed (totalMessages);
393414 auto & stats = ctx.services ().get <DataProcessingStats>();
394415 stats.updateStats ({static_cast <short >(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast <int64_t >(arrow->bytesDestroyed ())});
@@ -410,7 +431,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
410431 static bool once = false ;
411432 // Until we guarantee this is called only once...
412433 if (!once) {
413- LOGP (info, " Rate limiting set up at {}MB distributed over {} readers" , config->maxMemory , readers);
434+ O2_SIGNPOST_ID_GENERATE (sid, rate_limiting);
435+ O2_SIGNPOST_EVENT_EMIT_INFO (rate_limiting, sid, " setup" ,
436+ " Rate limiting set up at %{bytes}llu MB distributed over %d readers" ,
437+ config->maxMemory , readers);
414438 registry.registerService (ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
415439 once = true ;
416440 } },
0 commit comments