1010// or submit itself to any jurisdiction.
1111#include " Framework/AsyncQueue.h"
1212#include " Framework/DataProcessingDevice.h"
13+ #include < atomic>
1314#include " Framework/ControlService.h"
1415#include " Framework/ComputingQuotaEvaluator.h"
1516#include " Framework/DataProcessingHeader.h"
@@ -98,6 +99,8 @@ O2_DECLARE_DYNAMIC_LOG(async_queue);
9899O2_DECLARE_DYNAMIC_LOG (forwarding);
99100// Special log to track CCDB related requests
100101O2_DECLARE_DYNAMIC_LOG (ccdb);
102+ // Special log to track task scheduling
103+ O2_DECLARE_DYNAMIC_LOG (scheduling);
101104
102105using namespace o2 ::framework;
103106using ConfigurationInterface = o2::configuration::ConfigurationInterface;
@@ -1542,10 +1545,22 @@ void DataProcessingDevice::Run()
15421545 auto & spec = ref.get <DeviceSpec const >();
15431546 bool enough = ref.get <ComputingQuotaEvaluator>().selectOffer (streamRef.index , spec.resourcePolicy .request , uv_now (state.loop ));
15441547
1548+ struct SchedulingStats {
1549+ std::atomic<size_t > lastScheduled = 0 ;
1550+ std::atomic<size_t > numberOfUnscheduledSinceLastScheduled = 0 ;
1551+ std::atomic<size_t > numberOfUnscheduled = 0 ;
1552+ std::atomic<size_t > numberOfScheduled = 0 ;
1553+ };
1554+ static SchedulingStats schedulingStats;
1555+ O2_SIGNPOST_ID_GENERATE (sid, scheduling);
15451556 if (enough) {
15461557 stream.id = streamRef;
15471558 stream.running = true ;
15481559 stream.registry = &mServiceRegistry ;
1560+ schedulingStats.lastScheduled = uv_now (state.loop );
1561+ schedulingStats.numberOfScheduled ++;
1562+ schedulingStats.numberOfUnscheduledSinceLastScheduled = 0 ;
1563+ O2_SIGNPOST_EVENT_EMIT (scheduling, sid, " Run" , " Enough resources to schedule computation on stream %d" , streamRef.index );
15491564 if (dplEnableMultithreding) [[unlikely]] {
15501565 stream.task = &handle;
15511566 uv_queue_work (state.loop , stream.task , run_callback, run_completion);
@@ -1554,6 +1569,20 @@ void DataProcessingDevice::Run()
15541569 run_completion (&handle, 0 );
15551570 }
15561571 } else {
1572+ if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1573+ (uv_now (state.loop ) - schedulingStats.lastScheduled ) > 30000 ) {
1574+ O2_SIGNPOST_EVENT_EMIT_WARN (scheduling, sid, " Run" ,
1575+ " Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu." ,
1576+ schedulingStats.numberOfUnscheduledSinceLastScheduled .load (),
1577+ schedulingStats.lastScheduled .load ());
1578+ } else {
1579+ O2_SIGNPOST_EVENT_EMIT (scheduling, sid, " Run" ,
1580+ " Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu." ,
1581+ schedulingStats.numberOfUnscheduledSinceLastScheduled .load (),
1582+ schedulingStats.lastScheduled .load ());
1583+ }
1584+ schedulingStats.numberOfUnscheduled ++;
1585+ schedulingStats.numberOfUnscheduledSinceLastScheduled ++;
15571586 auto ref = ServiceRegistryRef{mServiceRegistry };
15581587 ref.get <ComputingQuotaEvaluator>().handleExpired (reportExpiredOffer);
15591588 }
0 commit comments