Skip to content

Commit eeca1b0

Browse files
committed
DPL: run data processing on separate thread
1 parent cda0619 commit eeca1b0

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ struct TaskStreamInfo {
6363
/// The registry associated to the task being run
6464
ServiceRegistry* registry;
6565
/// The libuv task handle
66-
uv_work_t task;
66+
uv_work_t* task;
6767
/// Wether or not this task is running
6868
bool running = false;
6969
};

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,6 +1264,12 @@ void DataProcessingDevice::Run()
12641264
bool firstLoop = true;
12651265
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
12661266
O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1267+
1268+
bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1269+
if (dplEnableMultithreding) {
1270+
setenv("UV_THREADPOOL_SIZE", "1", 1);
1271+
}
1272+
12671273
while (state.transitionHandling != TransitionHandlingState::Expired) {
12681274
if (state.nextFairMQState.empty() == false) {
12691275
(void)this->ChangeState(state.nextFairMQState.back());
@@ -1448,13 +1454,13 @@ void DataProcessingDevice::Run()
14481454
stream.id = streamRef;
14491455
stream.running = true;
14501456
stream.registry = &mServiceRegistry;
1451-
#ifdef DPL_ENABLE_THREADING
1452-
stream.task.data = &handle;
1453-
uv_queue_work(state.loop, &stream.task, run_callback, run_completion);
1454-
#else
1455-
run_callback(&handle);
1456-
run_completion(&handle, 0);
1457-
#endif
1457+
if (dplEnableMultithreding) [[unlikely]] {
1458+
stream.task = &handle;
1459+
uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1460+
} else {
1461+
run_callback(&handle);
1462+
run_completion(&handle, 0);
1463+
}
14581464
} else {
14591465
auto ref = ServiceRegistryRef{mServiceRegistry};
14601466
ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);

0 commit comments

Comments
 (0)