Skip to content

Commit a70f2e2

Browse files
committed
DPL: add signposts to debug task scheduling
1 parent 76a4eed commit a70f2e2

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111
#include "Framework/AsyncQueue.h"
1212
#include "Framework/DataProcessingDevice.h"
13+
#include <atomic>
1314
#include "Framework/ControlService.h"
1415
#include "Framework/ComputingQuotaEvaluator.h"
1516
#include "Framework/DataProcessingHeader.h"
@@ -99,6 +100,8 @@ O2_DECLARE_DYNAMIC_LOG(async_queue);
99100
O2_DECLARE_DYNAMIC_LOG(forwarding);
100101
// Special log to track CCDB related requests
101102
O2_DECLARE_DYNAMIC_LOG(ccdb);
103+
// Special log to track task scheduling
104+
O2_DECLARE_DYNAMIC_LOG(scheduling);
102105

103106
using namespace o2::framework;
104107
using ConfigurationInterface = o2::configuration::ConfigurationInterface;
@@ -1551,10 +1554,22 @@ void DataProcessingDevice::Run()
15511554
auto& spec = ref.get<DeviceSpec const>();
15521555
bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
15531556

1557+
// Bookkeeping for the "scheduling" signposts emitted around this point of Run():
// counts how often a computation could or could not be scheduled after
// selectOffer() reported whether enough resources were available.
struct SchedulingStats {
1558+
// Value of uv_now(state.loop) stored the last time a computation was scheduled.
// Compared against the current uv_now() with a 30000 threshold to decide
// whether the "not enough resources" message is emitted as a warning.
std::atomic<size_t> lastScheduled = 0;
1559+
// Number of failed scheduling attempts since the last successful one; reset to 0
// on success, and once it exceeds 100 the message is emitted as a warning.
std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1560+
// Running total of attempts where there were not enough resources (never reset here).
std::atomic<size_t> numberOfUnscheduled = 0;
1561+
// Running total of attempts where a computation was scheduled (never reset here).
std::atomic<size_t> numberOfScheduled = 0;
1562+
};
1563+
static SchedulingStats schedulingStats;
1564+
O2_SIGNPOST_ID_GENERATE(sid, scheduling);
15541565
if (enough) {
15551566
stream.id = streamRef;
15561567
stream.running = true;
15571568
stream.registry = &mServiceRegistry;
1569+
schedulingStats.lastScheduled = uv_now(state.loop);
1570+
schedulingStats.numberOfScheduled++;
1571+
schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1572+
O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
15581573
if (dplEnableMultithreding) [[unlikely]] {
15591574
stream.task = &handle;
15601575
uv_queue_work(state.loop, stream.task, run_callback, run_completion);
@@ -1563,6 +1578,20 @@ void DataProcessingDevice::Run()
15631578
run_completion(&handle, 0);
15641579
}
15651580
} else {
1581+
if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1582+
(uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) {
1583+
O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1584+
"Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1585+
schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1586+
schedulingStats.lastScheduled.load());
1587+
} else {
1588+
O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1589+
"Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1590+
schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1591+
schedulingStats.lastScheduled.load());
1592+
}
1593+
schedulingStats.numberOfUnscheduled++;
1594+
schedulingStats.numberOfUnscheduledSinceLastScheduled++;
15661595
auto ref = ServiceRegistryRef{mServiceRegistry};
15671596
ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
15681597
}

0 commit comments

Comments
 (0)