1010// or submit itself to any jurisdiction.
1111#include " Framework/AsyncQueue.h"
1212#include " Framework/DataProcessingDevice.h"
13+ #include < atomic>
1314#include " Framework/ControlService.h"
1415#include " Framework/ComputingQuotaEvaluator.h"
1516#include " Framework/DataProcessingHeader.h"
@@ -99,6 +100,8 @@ O2_DECLARE_DYNAMIC_LOG(async_queue);
99100O2_DECLARE_DYNAMIC_LOG (forwarding);
100101// Special log to track CCDB related requests
101102O2_DECLARE_DYNAMIC_LOG (ccdb);
103+ // Special log to track task scheduling
104+ O2_DECLARE_DYNAMIC_LOG (scheduling);
102105
103106using namespace o2 ::framework;
104107using ConfigurationInterface = o2::configuration::ConfigurationInterface;
@@ -1551,10 +1554,22 @@ void DataProcessingDevice::Run()
15511554 auto & spec = ref.get <DeviceSpec const >();
15521555 bool enough = ref.get <ComputingQuotaEvaluator>().selectOffer (streamRef.index , spec.resourcePolicy .request , uv_now (state.loop ));
15531556
1557+ struct SchedulingStats {
1558+ std::atomic<size_t > lastScheduled = 0 ;
1559+ std::atomic<size_t > numberOfUnscheduledSinceLastScheduled = 0 ;
1560+ std::atomic<size_t > numberOfUnscheduled = 0 ;
1561+ std::atomic<size_t > numberOfScheduled = 0 ;
1562+ };
1563+ static SchedulingStats schedulingStats;
1564+ O2_SIGNPOST_ID_GENERATE (sid, scheduling);
15541565 if (enough) {
15551566 stream.id = streamRef;
15561567 stream.running = true ;
15571568 stream.registry = &mServiceRegistry ;
1569+ schedulingStats.lastScheduled = uv_now (state.loop );
1570+ schedulingStats.numberOfScheduled ++;
1571+ schedulingStats.numberOfUnscheduledSinceLastScheduled = 0 ;
1572+ O2_SIGNPOST_EVENT_EMIT (scheduling, sid, " Run" , " Enough resources to schedule computation on stream %d" , streamRef.index );
15581573 if (dplEnableMultithreding) [[unlikely]] {
15591574 stream.task = &handle;
15601575 uv_queue_work (state.loop , stream.task , run_callback, run_completion);
@@ -1563,6 +1578,20 @@ void DataProcessingDevice::Run()
15631578 run_completion (&handle, 0 );
15641579 }
15651580 } else {
1581+ if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1582+ (uv_now (state.loop ) - schedulingStats.lastScheduled ) > 30000 ) {
1583+ O2_SIGNPOST_EVENT_EMIT_WARN (scheduling, sid, " Run" ,
1584+ " Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu." ,
1585+ schedulingStats.numberOfUnscheduledSinceLastScheduled .load (),
1586+ schedulingStats.lastScheduled .load ());
1587+ } else {
1588+ O2_SIGNPOST_EVENT_EMIT (scheduling, sid, " Run" ,
1589+ " Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu." ,
1590+ schedulingStats.numberOfUnscheduledSinceLastScheduled .load (),
1591+ schedulingStats.lastScheduled .load ());
1592+ }
1593+ schedulingStats.numberOfUnscheduled ++;
1594+ schedulingStats.numberOfUnscheduledSinceLastScheduled ++;
15661595 auto ref = ServiceRegistryRef{mServiceRegistry };
15671596 ref.get <ComputingQuotaEvaluator>().handleExpired (reportExpiredOffer);
15681597 }
0 commit comments