@@ -1331,6 +1331,58 @@ void DataProcessingDevice::Reset()
13311331 ref.get <CallbackService>().call <CallbackService::Id::Reset>();
13321332}
13331333
1334+ TransitionHandlingState updateStateTransition (ServiceRegistryRef& ref, ProcessingPolicies const & policies)
1335+ {
1336+ auto & state = ref.get <DeviceState>();
1337+ auto & deviceProxy = ref.get <FairMQDeviceProxy>();
1338+ if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested () == false ) {
1339+ return state.transitionHandling ;
1340+ }
1341+ O2_SIGNPOST_ID_FROM_POINTER (lid, device, state.loop );
1342+ auto & deviceContext = ref.get <DeviceContext>();
1343+ // Check if we only have timers
1344+ auto & spec = ref.get <DeviceSpec const >();
1345+ if (hasOnlyTimers (spec)) {
1346+ switchState (ref, StreamingState::EndOfStreaming);
1347+ }
1348+
1349+ // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1350+ if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout ) {
1351+ uv_update_time (state.loop );
1352+ O2_SIGNPOST_EVENT_EMIT (calibration, lid, " timer_setup" , " Starting %d s timer for dataProcessingTimeout." , deviceContext.dataProcessingTimeout );
1353+ uv_timer_start (deviceContext.dataProcessingGracePeriodTimer , on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000 , 0 );
1354+ }
1355+ if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1356+ ref.get <CallbackService>().call <CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1357+ uv_update_time (state.loop );
1358+ O2_SIGNPOST_EVENT_EMIT (calibration, lid, " timer_setup" , " Starting %d s timer for exitTransitionTimeout." ,
1359+ deviceContext.exitTransitionTimeout );
1360+ uv_timer_start (deviceContext.gracePeriodTimer , on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000 , 0 );
1361+ bool onlyGenerated = hasOnlyGenerated (spec);
1362+ int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout ;
1363+ if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode () == false ) {
1364+ O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state requested. Waiting for %d seconds before quitting." , timeout);
1365+ } else {
1366+ O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" ,
1367+ " New state requested. Waiting for %d seconds before %{public}s" ,
1368+ timeout,
1369+ onlyGenerated ? " dropping remaining input and switching to READY state." : " switching to READY state." );
1370+ }
1371+ return TransitionHandlingState::Requested;
1372+ } else {
1373+ if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1374+ O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state requested. No timeout set, quitting immediately as per --completion-policy" );
1375+ } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1376+ O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state requested. No timeout set, switching to READY state immediately" );
1377+ } else if (policies.termination == TerminationPolicy::QUIT) {
1378+ O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state pending and we are already idle, quitting immediately as per --completion-policy" );
1379+ } else {
1380+ O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state pending and we are already idle, switching to READY immediately." );
1381+ }
1382+ return TransitionHandlingState::Expired;
1383+ }
1384+ }
1385+
13341386void DataProcessingDevice::Run ()
13351387{
13361388 ServiceRegistryRef ref{mServiceRegistry };
@@ -1383,51 +1435,7 @@ void DataProcessingDevice::Run()
13831435 shouldNotWait = true ;
13841436 state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
13851437 }
1386- if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending ()) {
1387- state.transitionHandling = TransitionHandlingState::Requested;
1388- auto & deviceContext = ref.get <DeviceContext>();
1389- // Check if we only have timers
1390- auto & spec = ref.get <DeviceSpec const >();
1391- if (hasOnlyTimers (spec)) {
1392- switchState (ref, StreamingState::EndOfStreaming);
1393- }
1394-
1395- // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1396- if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout ) {
1397- uv_update_time (state.loop );
1398- O2_SIGNPOST_EVENT_EMIT (calibration, lid, " timer_setup" , " Starting %d s timer for dataProcessingTimeout." , deviceContext.dataProcessingTimeout );
1399- uv_timer_start (deviceContext.dataProcessingGracePeriodTimer , on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000 , 0 );
1400- }
1401- if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1402- state.transitionHandling = TransitionHandlingState::Requested;
1403- ref.get <CallbackService>().call <CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1404- uv_update_time (state.loop );
1405- O2_SIGNPOST_EVENT_EMIT (calibration, lid, " timer_setup" , " Starting %d s timer for exitTransitionTimeout." ,
1406- deviceContext.exitTransitionTimeout );
1407- uv_timer_start (deviceContext.gracePeriodTimer , on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000 , 0 );
1408- bool onlyGenerated = hasOnlyGenerated (spec);
1409- int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout ;
1410- if (mProcessingPolicies .termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode () == false ) {
1411- O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state requested. Waiting for %d seconds before quitting." , timeout);
1412- } else {
1413- O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" ,
1414- " New state requested. Waiting for %d seconds before %{public}s" ,
1415- timeout,
1416- onlyGenerated ? " dropping remaining input and switching to READY state." : " switching to READY state." );
1417- }
1418- } else {
1419- state.transitionHandling = TransitionHandlingState::Expired;
1420- if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies .termination == TerminationPolicy::QUIT) {
1421- O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state requested. No timeout set, quitting immediately as per --completion-policy" );
1422- } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies .termination != TerminationPolicy::QUIT) {
1423- O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state requested. No timeout set, switching to READY state immediately" );
1424- } else if (mProcessingPolicies .termination == TerminationPolicy::QUIT) {
1425- O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state pending and we are already idle, quitting immediately as per --completion-policy" );
1426- } else {
1427- O2_SIGNPOST_EVENT_EMIT_INFO (device, lid, " run_loop" , " New state pending and we are already idle, switching to READY immediately." );
1428- }
1429- }
1430- }
1438+ state.transitionHandling = updateStateTransition (ref, ref.get <DeviceContext>().processingPolicies );
14311439 // If we are Idle, we can then consider the transition to be expired.
14321440 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
14331441 O2_SIGNPOST_EVENT_EMIT (device, lid, " run_loop" , " State transition requested and we are now in Idle. We can consider it to be completed." );
0 commit comments