Skip to content

Commit eb14b21

Browse files
committed
DPL: cleanup state switching
- Remove duplicate helper - Add signposts to mark streaming states transitions - Notify driver
1 parent 66e56fe commit eb14b21

File tree

1 file changed

+20
-24
lines changed

1 file changed

+20
-24
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,17 @@ void on_transition_requested_expired(uv_timer_t* handle)
141141
state.transitionHandling = TransitionHandlingState::Expired;
142142
}
143143

144+
auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void
145+
{
146+
auto& state = ref.get<DeviceState>();
147+
auto& context = ref.get<DataProcessorContext>();
148+
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
149+
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
150+
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
151+
state.streaming = newState;
152+
ref.get<ControlService>().notifyStreamingState(state.streaming);
153+
};
154+
144155
void on_data_processing_expired(uv_timer_t* handle)
145156
{
146157
auto* ref = (ServiceRegistryRef*)handle->data;
@@ -1236,7 +1247,7 @@ void DataProcessingDevice::PreRun()
12361247
O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
12371248
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
12381249
state.quitRequested = false;
1239-
state.streaming = StreamingState::Streaming;
1250+
switchState(ref, StreamingState::Streaming);
12401251
state.allowedProcessing = DeviceState::Any;
12411252
for (auto& info : state.inputChannelInfos) {
12421253
if (info.state != InputChannelState::Pull) {
@@ -1365,10 +1376,10 @@ void DataProcessingDevice::Run()
13651376
// Check if we only have timers
13661377
auto& spec = ref.get<DeviceSpec const>();
13671378
if (hasOnlyTimers(spec)) {
1368-
state.streaming = StreamingState::EndOfStreaming;
1379+
switchState(ref, StreamingState::EndOfStreaming);
13691380
}
13701381

1371-
// If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively
1382+
// If this is a source device, exitTransitionTimeout and dataProcessingTimeout are effectively
13721383
// the same (because source devices are not allowed to produce any calibration).
13731384
// should be the same.
13741385
if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) {
@@ -1385,7 +1396,8 @@ void DataProcessingDevice::Run()
13851396
state.transitionHandling = TransitionHandlingState::Requested;
13861397
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
13871398
uv_update_time(state.loop);
1388-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
1399+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1400+
deviceContext.exitTransitionTimeout);
13891401
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
13901402
if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
13911403
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
@@ -1728,15 +1740,6 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17281740
{
17291741
auto& context = ref.get<DataProcessorContext>();
17301742
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1731-
auto switchState = [ref](StreamingState newState) {
1732-
auto& state = ref.get<DeviceState>();
1733-
auto& context = ref.get<DataProcessorContext>();
1734-
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1735-
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
1736-
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
1737-
state.streaming = newState;
1738-
ref.get<ControlService>().notifyStreamingState(state.streaming);
1739-
};
17401743
auto& state = ref.get<DeviceState>();
17411744
auto& spec = ref.get<DeviceSpec const>();
17421745

@@ -1772,7 +1775,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17721775
// dependent on the callback, not something which is controlled by the
17731776
// framework itself.
17741777
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1775-
switchState(StreamingState::EndOfStreaming);
1778+
switchState(ref, StreamingState::EndOfStreaming);
17761779
state.lastActiveDataProcessor = &context;
17771780
}
17781781

@@ -1818,7 +1821,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18181821
}
18191822
// This is needed because the transport is deleted before the device.
18201823
relayer.clear();
1821-
switchState(StreamingState::Idle);
1824+
switchState(ref, StreamingState::Idle);
18221825
// In case we should process, note the data processor responsible for it
18231826
if (shouldProcess) {
18241827
state.lastActiveDataProcessor = &context;
@@ -2328,13 +2331,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
23282331
}
23292332
};
23302333

2331-
auto switchState = [ref](StreamingState newState) {
2332-
auto& control = ref.get<ControlService>();
2333-
auto& state = ref.get<DeviceState>();
2334-
state.streaming = newState;
2335-
control.notifyStreamingState(state.streaming);
2336-
};
2337-
23382334
ref.get<DataRelayer>().getReadyToProcess(completed);
23392335
if (completed.empty() == true) {
23402336
LOGP(debug, "No computations available for dispatching.");
@@ -2510,7 +2506,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25102506
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
25112507
} else {
25122508
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2513-
state.streaming = StreamingState::Idle;
2509+
switchState(ref, StreamingState::Idle);
25142510
}
25152511
if (shouldProcess(action)) {
25162512
auto& timingInfo = ref.get<TimingInfo>();
@@ -2598,7 +2594,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25982594
for (auto& channel : spec.outputChannels) {
25992595
DataProcessingHelpers::sendEndOfStream(ref, channel);
26002596
}
2601-
switchState(StreamingState::Idle);
2597+
switchState(ref, StreamingState::Idle);
26022598
}
26032599

26042600
return true;

0 commit comments

Comments
 (0)