1616#include " Framework/DataTakingContext.h"
1717#include " Framework/DeviceState.h"
1818#include " Framework/DeviceContext.h"
19+ #include " Framework/Signpost.h"
20+
1921#include < fairmq/Device.h>
2022#include < uv.h>
2123#include < fairmq/shmem/Monitor.h>
2224#include < fairmq/shmem/Common.h>
2325#include < chrono>
2426#include < thread>
2527
28+ O2_DECLARE_DYNAMIC_LOG (rate_limiting);
29+
2630using namespace o2 ::framework;
2731
2832int RateLimiter::check (ProcessingContext& ctx, int maxInFlight, size_t minSHM)
2933{
34+ O2_SIGNPOST_ID_GENERATE (sid, rate_limiting);
3035 if (!maxInFlight && !minSHM) {
3136 return 0 ;
3237 }
@@ -45,9 +50,13 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
4550 while ((mSentTimeframes - mConsumedTimeframes ) >= maxInFlight) {
4651 if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast<std::chrono::duration<float >>(std::chrono::system_clock::now () - startTime).count () > MESSAGE_DELAY_TIME)) {
4752 if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
48- LOG (alarm) << " Maximum number of TF in flight reached (" << maxInFlight << " : published " << mSentTimeframes << " - finished " << mConsumedTimeframes << " ), waiting" ;
53+ O2_SIGNPOST_EVENT_EMIT_ALARM (rate_limiting, sid, " timeframe_ratelimit" ,
54+ " Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting" ,
55+ maxInFlight, mSentTimeframes , mConsumedTimeframes );
4956 } else {
50- LOG (info) << " Maximum number of TF in flight reached (" << maxInFlight << " : published " << mSentTimeframes << " - finished " << mConsumedTimeframes << " ), waiting" ;
57+ O2_SIGNPOST_EVENT_EMIT_INFO (rate_limiting, sid, " timeframe_ratelimit" ,
58+ " Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting" ,
59+ maxInFlight, mSentTimeframes , mConsumedTimeframes );
5160 }
5261 waitMessage = true ;
5362 timeoutForMessage = false ;
@@ -67,12 +76,19 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
6776 }
6877 assert (msg->GetSize () == 8 );
6978 mConsumedTimeframes = *(int64_t *)msg->GetData ();
79+ O2_SIGNPOST_EVENT_EMIT (rate_limiting, sid, " timeframe_ratelimit" ,
80+ " Received %llu as consumed timeframes" ,
81+ mConsumedTimeframes );
7082 }
7183 if (waitMessage) {
7284 if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
73- LOG (important) << (mSentTimeframes - mConsumedTimeframes ) << " / " << maxInFlight << " TF in flight, continuing to publish" ;
85+ O2_SIGNPOST_EVENT_EMIT_IMPORTANT (rate_limiting, sid, " timeframe_ratelimit" ,
86+ " %lli / %d TF in flight, continue to publish" ,
87+ (mSentTimeframes - mConsumedTimeframes ), maxInFlight);
7488 } else {
75- LOG (info) << (mSentTimeframes - mConsumedTimeframes ) << " / " << maxInFlight << " TF in flight, continuing to publish" ;
89+ O2_SIGNPOST_EVENT_EMIT_INFO (rate_limiting, sid, " timeframe_ratelimit" ,
90+ " %lli / %d TF in flight, continue to publish" ,
91+ (mSentTimeframes - mConsumedTimeframes ), maxInFlight);
7692 }
7793 }
7894
0 commit comments