1313#include " Framework/DataProcessingStats.h"
1414#include " Framework/ServiceRegistryRef.h"
1515#include " Framework/DeviceState.h"
16- #include " Framework/DriverClient.h"
17- #include " Framework/Monitoring.h"
18- #include " Framework/Logger.h"
16+ #include " Framework/Signpost.h"
1917#include < Monitoring/Monitoring.h>
2018
2119#include < vector>
2220#include < uv.h>
2321#include < cassert>
22+ #include < fmt/core.h>
23+ #include < fmt/format.h>
24+ #include < fmt/ranges.h>
2425
25- #define LOGLEVEL debug
26-
26+ O2_DECLARE_DYNAMIC_LOG (quota);
2727
2828namespace o2 ::framework
2929{
@@ -64,6 +64,8 @@ struct QuotaEvaluatorStats {
6464
6565bool ComputingQuotaEvaluator::selectOffer (int task, ComputingQuotaRequest const & selector, uint64_t now)
6666{
67+ O2_SIGNPOST_ID_GENERATE (qid, quota);
68+
6769 auto selectOffer = [&offers = this ->mOffers , &infos = this ->mInfos , task](int ref, uint64_t now) {
6870 auto & selected = offers[ref];
6971 auto & info = infos[ref];
@@ -89,28 +91,36 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
8991 // LOG(LOGLEVEL) << "No particular resource was requested, so we schedule task anyways";
9092 return enough;
9193 }
94+ O2_SIGNPOST_ID_GENERATE (sid, quota);
9295 if (enough) {
93- LOGP (LOGLEVEL, " {} offers were selected for a total of: cpu {}, memory {}, shared memory {}" , result.size (), totalOffer.cpu , totalOffer.memory , totalOffer.sharedMemory );
94- // LOG(LOGLEVEL) << " The following offers were selected for computation: {} " << fmt::join(result, ", ");
96+ O2_SIGNPOST_START (quota, sid, " summary" , " %zu offers were selected for a total of: cpu %d, memory %lli, shared memory %lli" ,
97+ result.size (), totalOffer.cpu , totalOffer.memory , totalOffer.sharedMemory );
98+ for (auto & offer : result) {
99+ // We pretend each offer id is a pointer, to have a unique id.
100+ O2_SIGNPOST_ID_FROM_POINTER (oid, quota, (void *)(int64_t )(offer*8 ));
101+ O2_SIGNPOST_START (quota, oid, " offers" , " Offer %d has been selected." , offer);
102+ }
95103 dpStats.updateStats ({static_cast <short >(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1 });
96104 } else {
105+ O2_SIGNPOST_START (quota, sid, " summary" , " Not enough resources to select offers." );
97106 dpStats.updateStats ({static_cast <short >(ProcessingStatsId::RESOURCES_MISSING), DataProcessingStats::Op::Add, 1 });
98107 if (result.size ()) {
99108 dpStats.updateStats ({static_cast <short >(ProcessingStatsId::RESOURCES_INSUFFICIENT), DataProcessingStats::Op::Add, 1 });
100109 }
101110 }
102111 if (stats.invalidOffers .size ()) {
103- // LOGP(LOGLEVEL, " The following offers were invalid: {}", fmt::join(stats.invalidOffers, ", "));
112+ O2_SIGNPOST_EVENT_EMIT (quota, sid, " summary " , " The following offers were invalid: %s " , fmt::format ( " {}" , fmt::join (stats.invalidOffers , " , " )). c_str ( ));
104113 }
105114 if (stats.otherUser .size ()) {
106- // LOGP(LOGLEVEL, " The following offers were owned by other users: {}", fmt::join(stats.otherUser, ", "));
115+ O2_SIGNPOST_EVENT_EMIT (quota, sid, " summary " , " The following offers were owned by other users: %s " , fmt::format ( " {}" , fmt::join (stats.otherUser , " , " )). c_str ( ));
107116 }
108117 if (stats.expired .size ()) {
109- // LOGP(LOGLEVEL, " The following offers are expired: {}", fmt::join(stats.expired, ", "));
118+ O2_SIGNPOST_EVENT_EMIT (quota, sid, " summary " , " The following offers are expired: %s " , fmt::format ( " {}" , fmt::join (stats.expired , " , " )). c_str ( ));
110119 }
111120 if (stats.unexpiring .size () > 1 ) {
112- // LOGP(LOGLEVEL, " The following offers will never expire: {}", fmt::join(stats.unexpiring, ", "));
121+ O2_SIGNPOST_EVENT_EMIT (quota, sid, " summary " , " The following offers will never expire: %s " , fmt::format ( " {}" , fmt::join (stats.unexpiring , " , " )). c_str ( ));
113122 }
123+ O2_SIGNPOST_END (quota, sid, " summary" , " Done selecting offers." );
114124
115125 return enough;
116126 };
@@ -139,16 +149,18 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
139149 if (offer.runtime < 0 ) {
140150 stats.unexpiring .push_back (i);
141151 } else if (offer.runtime + info.received < now) {
142- LOGP (LOGLEVEL, " Offer {} expired since {} milliseconds and holds {}MB" , i, now - offer.runtime - info.received , offer.sharedMemory / 1000000 );
152+ O2_SIGNPOST_EVENT_EMIT (quota, qid, " select" , " Offer %d expired since %llu milliseconds and holds %llu MB" ,
153+ i, now - offer.runtime - info.received , offer.sharedMemory / 1000000 );
143154 mExpiredOffers .push_back (ComputingQuotaOfferRef{i});
144155 stats.expired .push_back (i);
145156 continue ;
146157 } else {
147- LOGP (LOGLEVEL, " Offer {} still valid for {} milliseconds, providing {}MB" , i, offer.runtime + info.received - now, offer.sharedMemory / 1000000 );
158+ O2_SIGNPOST_EVENT_EMIT (quota, qid, " select" , " Offer %d still valid for %llu milliseconds, providing %llu MB" ,
159+ i, offer.runtime + info.received - now, offer.sharedMemory / 1000000 );
148160 if (minValidity == 0 ) {
149161 minValidity = offer.runtime + info.received - now;
150162 }
151- minValidity = std::min (minValidity,(int64_t )(offer.runtime + info.received - now));
163+ minValidity = std::min (minValidity, (int64_t )(offer.runtime + info.received - now));
152164 }
153165 // / We then check if the offer is suitable
154166 assert (offer.sharedMemory >= 0 );
@@ -177,11 +189,10 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
177189 }
178190
179191 if (minValidity != 0 ) {
180- LOGP (LOGLEVEL, " Next offer to expire in {} milliseconds" , minValidity);
192+ O2_SIGNPOST_EVENT_EMIT (quota, qid, " select " , " Next offer to expire in %llu milliseconds" , minValidity);
181193 uv_timer_start (mTimer , [](uv_timer_t * handle) {
182- LOGP (LOGLEVEL, " Offer should be expired by now, checking again" );
183- },
184- minValidity + 100 , 0 );
194+ O2_SIGNPOST_ID_GENERATE (tid, quota);
195+ O2_SIGNPOST_EVENT_EMIT (quota, tid, " select" , " Offer should be expired by now, checking again." ); }, minValidity + 100 , 0 );
185196 }
186197 // If we get here it means we never got enough offers, so we return false.
187198 return summarizeWhatHappended (enough, stats.selectedOffers , accumulated, stats);
@@ -213,6 +224,8 @@ void ComputingQuotaEvaluator::dispose(int taskId)
213224 continue ;
214225 }
215226 if (offer.sharedMemory <= 0 ) {
227+ O2_SIGNPOST_ID_FROM_POINTER (oid, quota, (void *)(int64_t )(oi*8 ));
228+ O2_SIGNPOST_END (quota, oid, " offers" , " Offer %d back to not needed." , oi);
216229 offer.valid = false ;
217230 offer.score = OfferScore::Unneeded;
218231 }
@@ -242,34 +255,37 @@ void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pen
242255void ComputingQuotaEvaluator::handleExpired (std::function<void (ComputingQuotaOffer const &, ComputingQuotaStats const & stats)> expirator)
243256{
244257 static int nothingToDoCount = mExpiredOffers .size ();
258+ O2_SIGNPOST_ID_GENERATE (qid, quota);
245259 if (mExpiredOffers .size ()) {
246- LOGP (LOGLEVEL, " Handling {} expired offers" , mExpiredOffers .size ());
260+ O2_SIGNPOST_EVENT_EMIT (quota, qid, " handleExpired " , " Handling %zu expired offers" , mExpiredOffers .size ());
247261 nothingToDoCount = 0 ;
248262 } else {
249263 if (nothingToDoCount == 0 ) {
250264 nothingToDoCount++;
251- LOGP (LOGLEVEL , " No expired offers" );
265+ O2_SIGNPOST_EVENT_EMIT (quota, qid, " handleExpired " , " No expired offers" );
252266 }
253267 }
254268 // / Whenever an offer is expired, we give back the resources
255269 // / to the driver.
256270 for (auto & ref : mExpiredOffers ) {
257271 auto & offer = mOffers [ref.index ];
272+ O2_SIGNPOST_ID_FROM_POINTER (oid, quota, (void *)(int64_t )(ref.index *8 ));
258273 if (offer.sharedMemory < 0 ) {
259- LOGP (LOGLEVEL, " Offer {} does not have any more memory. Marking it as invalid." , ref.index );
274+ O2_SIGNPOST_END (quota, oid, " handleExpired " , " Offer %d does not have any more memory. Marking it as invalid." , ref.index );
260275 offer.valid = false ;
261276 offer.score = OfferScore::Unneeded;
262277 continue ;
263278 }
264279 // FIXME: offers should go through the driver client, not the monitoring
265280 // api.
266- LOGP (LOGLEVEL, " Offer {} expired. Giving back {}MB and {} cores" , ref.index , offer.sharedMemory / 1000000 , offer.cpu );
281+ O2_SIGNPOST_END (quota, oid, " handleExpired" , " Offer %d expired. Giving back %llu MB and %d cores" ,
282+ ref.index , offer.sharedMemory / 1000000 , offer.cpu );
267283 assert (offer.sharedMemory >= 0 );
268284 mStats .totalExpiredBytes += offer.sharedMemory ;
269285 mStats .totalExpiredOffers ++;
270286 expirator (offer, mStats );
271- // driverClient.tell("expired shmem {}", offer.sharedMemory);
272- // driverClient.tell("expired cpu {}", offer.cpu);
287+ // driverClient.tell("expired shmem {}", offer.sharedMemory);
288+ // driverClient.tell("expired cpu {}", offer.cpu);
273289 offer.sharedMemory = -1 ;
274290 offer.valid = false ;
275291 offer.score = OfferScore::Unneeded;
0 commit comments