Skip to content

Commit 5dafde5

Browse files
committed
DPL: refactor TransitionState calculation
* Move it to a function and make it easily invokable with just a service registry ref. * Drop unneeded helper. * Move the policies to the DeviceContext this should not change the current behavior. The goal is to be able to trigger the timers also within the input proxy busy loop so that the timers start at the appropriate moment.
1 parent b54384b commit 5dafde5

File tree

6 files changed

+63
-64
lines changed

6 files changed

+63
-64
lines changed

Framework/Core/include/Framework/CommonServices.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ struct CommonServices {
5656
return [](InitContext&, void* service) -> void* { return service; };
5757
}
5858

59-
static ServiceSpec deviceContextSpec();
6059
static ServiceSpec dataProcessorContextSpec();
6160
static ServiceSpec driverClientSpec();
6261
static ServiceSpec monitoringSpec();

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers {
7777
class DataProcessingDevice : public fair::mq::Device
7878
{
7979
public:
80-
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies);
80+
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&);
8181
void Init() final;
8282
void InitTask() final;
8383
void PreRun() final;

Framework/Core/include/Framework/DeviceContext.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t;
2121
namespace o2::framework
2222
{
2323
struct ComputingQuotaStats;
24+
struct ProcessingPolicies;
2425

2526
/// Stucture which holds the whole runtime context
2627
/// of a running device which is not stored as
@@ -33,6 +34,7 @@ struct DeviceContext {
3334
int expectedRegionCallbacks = 0;
3435
int exitTransitionTimeout = 0;
3536
int dataProcessingTimeout = 0;
37+
ProcessingPolicies &processingPolicies;
3638
};
3739

3840
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,17 +1237,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec()
12371237
.kind = ServiceKind::Serial};
12381238
}
12391239

1240-
o2::framework::ServiceSpec CommonServices::deviceContextSpec()
1241-
{
1242-
return ServiceSpec{
1243-
.name = "device-context",
1244-
.init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1245-
return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
1246-
},
1247-
.configure = noConfiguration(),
1248-
.kind = ServiceKind::Serial};
1249-
}
1250-
12511240
o2::framework::ServiceSpec CommonServices::dataAllocatorSpec()
12521241
{
12531242
return ServiceSpec{

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "Framework/DataProcessor.h"
1818
#include "Framework/DataSpecUtils.h"
1919
#include "Framework/DeviceState.h"
20+
#include "Framework/DeviceStateEnums.h"
2021
#include "Framework/DispatchPolicy.h"
2122
#include "Framework/DispatchControl.h"
2223
#include "Framework/DanglingContext.h"
@@ -196,11 +197,10 @@ struct locked_execution {
196197
~locked_execution() { ref.unlock(); }
197198
};
198199

199-
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies)
200+
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry)
200201
: mRunningDevice{running},
201202
mConfigRegistry{nullptr},
202-
mServiceRegistry{registry},
203-
mProcessingPolicies{policies}
203+
mServiceRegistry{registry}
204204
{
205205
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
206206
if (key == "cleanup") {
@@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
247247
mHandles.resize(1);
248248

249249
ServiceRegistryRef ref{mServiceRegistry};
250+
250251
mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
251252
auto& state = ref.get<DeviceState>();
252253
assert(state.loop);
@@ -1330,6 +1331,58 @@ void DataProcessingDevice::Reset()
13301331
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
13311332
}
13321333

1334+
TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
1335+
{
1336+
auto& state = ref.get<DeviceState>();
1337+
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
1338+
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
1339+
return state.transitionHandling;
1340+
}
1341+
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1342+
auto& deviceContext = ref.get<DeviceContext>();
1343+
// Check if we only have timers
1344+
auto& spec = ref.get<DeviceSpec const>();
1345+
if (hasOnlyTimers(spec)) {
1346+
switchState(ref, StreamingState::EndOfStreaming);
1347+
}
1348+
1349+
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1350+
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1351+
uv_update_time(state.loop);
1352+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1353+
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1354+
}
1355+
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1356+
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1357+
uv_update_time(state.loop);
1358+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1359+
deviceContext.exitTransitionTimeout);
1360+
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1361+
bool onlyGenerated = hasOnlyGenerated(spec);
1362+
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1363+
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1364+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1365+
} else {
1366+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1367+
"New state requested. Waiting for %d seconds before %{public}s",
1368+
timeout,
1369+
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1370+
}
1371+
return TransitionHandlingState::Requested;
1372+
} else {
1373+
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1374+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1375+
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1376+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1377+
} else if (policies.termination == TerminationPolicy::QUIT) {
1378+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1379+
} else {
1380+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1381+
}
1382+
return TransitionHandlingState::Expired;
1383+
}
1384+
}
1385+
13331386
void DataProcessingDevice::Run()
13341387
{
13351388
ServiceRegistryRef ref{mServiceRegistry};
@@ -1382,51 +1435,7 @@ void DataProcessingDevice::Run()
13821435
shouldNotWait = true;
13831436
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
13841437
}
1385-
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1386-
state.transitionHandling = TransitionHandlingState::Requested;
1387-
auto& deviceContext = ref.get<DeviceContext>();
1388-
// Check if we only have timers
1389-
auto& spec = ref.get<DeviceSpec const>();
1390-
if (hasOnlyTimers(spec)) {
1391-
switchState(ref, StreamingState::EndOfStreaming);
1392-
}
1393-
1394-
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1395-
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1396-
uv_update_time(state.loop);
1397-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1398-
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1399-
}
1400-
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1401-
state.transitionHandling = TransitionHandlingState::Requested;
1402-
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1403-
uv_update_time(state.loop);
1404-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1405-
deviceContext.exitTransitionTimeout);
1406-
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1407-
bool onlyGenerated = hasOnlyGenerated(spec);
1408-
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1409-
if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1410-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1411-
} else {
1412-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1413-
"New state requested. Waiting for %d seconds before %{public}s",
1414-
timeout,
1415-
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1416-
}
1417-
} else {
1418-
state.transitionHandling = TransitionHandlingState::Expired;
1419-
if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1420-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1421-
} else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1422-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1423-
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1424-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1425-
} else {
1426-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1427-
}
1428-
}
1429-
}
1438+
state.transitionHandling = updateStateTransition(ref, mProcessingPolicies);
14301439
// If we are Idle, we can then consider the transition to be expired.
14311440
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
14321441
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
@@ -1560,7 +1569,7 @@ void DataProcessingDevice::Run()
15601569
}
15611570
}
15621571

1563-
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1572+
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
15641573
auto& spec = ref.get<DeviceSpec const>();
15651574
/// Cleanup messages which are still pending on exit.
15661575
for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {

Framework/Core/src/runDataProcessing.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,13 +1092,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10921092
quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
10931093
serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));
10941094

1095-
deviceContext = std::make_unique<DeviceContext>();
1095+
deviceContext = std::make_unique<DeviceContext>(DeviceContext{.processingPolicies = processingPolicies});
10961096
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
10971097
serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
10981098
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
10991099
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
11001100

1101-
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry, processingPolicies);
1101+
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);
11021102

11031103
serviceRef.get<RawDeviceService>().setDevice(device.get());
11041104
r.fDevice = std::move(device);

0 commit comments

Comments
 (0)