Skip to content

Commit a52acfc

Browse files
committed
DPL: move updateStateTransition() and smaller helper functions to DataProcessingHelpers
1 parent 17b6c77 commit a52acfc

File tree

3 files changed

+145
-116
lines changed

3 files changed

+145
-116
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ struct ForwardChannelState;
2121
struct OutputChannelInfo;
2222
struct OutputChannelSpec;
2323
struct OutputChannelState;
24+
struct ProcessingPolicies;
25+
struct DeviceSpec;
26+
enum struct StreamingState;
27+
enum struct TransitionHandlingState;
2428

2529
/// Generic helpers for DataProcessing releated functions.
2630
struct DataProcessingHelpers {
@@ -35,6 +39,12 @@ struct DataProcessingHelpers {
3539
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, OutputChannelInfo const& info, OutputChannelState& state, size_t timeslice);
3640
/// Broadcast the oldest possible timeslice to all channels in output
3741
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const& ref, size_t timeslice);
42+
/// change the device StreamingState to newState
43+
static void switchState(ServiceRegistryRef const& ref, StreamingState newState);
44+
/// check if spec is a source devide
45+
static bool hasOnlyGenerated(DeviceSpec const& spec);
46+
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
47+
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
3848
};
3949

4050
} // namespace o2::framework

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -125,63 +125,6 @@ void on_idle_timer(uv_timer_t* handle)
125125
state->loopReason |= DeviceState::TIMER_EXPIRED;
126126
}
127127

128-
bool hasOnlyTimers(DeviceSpec const& spec)
129-
{
130-
return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
131-
}
132-
133-
bool hasOnlyGenerated(DeviceSpec const& spec)
134-
{
135-
return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
136-
}
137-
138-
void on_transition_requested_expired(uv_timer_t* handle)
139-
{
140-
auto* ref = (ServiceRegistryRef*)handle->data;
141-
auto& state = ref->get<DeviceState>();
142-
state.loopReason |= DeviceState::TIMER_EXPIRED;
143-
// Check if this is a source device
144-
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
145-
auto& spec = ref->get<DeviceSpec const>();
146-
std::string messageOnExpire = hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
147-
if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
148-
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
149-
} else {
150-
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
151-
}
152-
state.transitionHandling = TransitionHandlingState::Expired;
153-
}
154-
155-
auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void
156-
{
157-
auto& state = ref.get<DeviceState>();
158-
auto& context = ref.get<DataProcessorContext>();
159-
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
160-
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
161-
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
162-
state.streaming = newState;
163-
ref.get<ControlService>().notifyStreamingState(state.streaming);
164-
};
165-
166-
void on_data_processing_expired(uv_timer_t* handle)
167-
{
168-
auto* ref = (ServiceRegistryRef*)handle->data;
169-
auto& state = ref->get<DeviceState>();
170-
auto& spec = ref->get<DeviceSpec const>();
171-
state.loopReason |= DeviceState::TIMER_EXPIRED;
172-
173-
// Check if this is a source device
174-
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
175-
176-
if (hasOnlyGenerated(spec)) {
177-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
178-
switchState(*ref, StreamingState::EndOfStreaming);
179-
} else {
180-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
181-
state.allowedProcessing = DeviceState::CalibrationOnly;
182-
}
183-
}
184-
185128
void on_communication_requested(uv_async_t* s)
186129
{
187130
auto* state = (DeviceState*)s->data;
@@ -1267,7 +1210,7 @@ void DataProcessingDevice::PreRun()
12671210
O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
12681211
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
12691212
state.quitRequested = false;
1270-
switchState(ref, StreamingState::Streaming);
1213+
DataProcessingHelpers::switchState(ref, StreamingState::Streaming);
12711214
state.allowedProcessing = DeviceState::Any;
12721215
for (auto& info : state.inputChannelInfos) {
12731216
if (info.state != InputChannelState::Pull) {
@@ -1338,58 +1281,6 @@ void DataProcessingDevice::Reset()
13381281
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
13391282
}
13401283

1341-
TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
1342-
{
1343-
auto& state = ref.get<DeviceState>();
1344-
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
1345-
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
1346-
return state.transitionHandling;
1347-
}
1348-
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1349-
auto& deviceContext = ref.get<DeviceContext>();
1350-
// Check if we only have timers
1351-
auto& spec = ref.get<DeviceSpec const>();
1352-
if (hasOnlyTimers(spec)) {
1353-
switchState(ref, StreamingState::EndOfStreaming);
1354-
}
1355-
1356-
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1357-
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1358-
uv_update_time(state.loop);
1359-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1360-
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1361-
}
1362-
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1363-
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1364-
uv_update_time(state.loop);
1365-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1366-
deviceContext.exitTransitionTimeout);
1367-
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1368-
bool onlyGenerated = hasOnlyGenerated(spec);
1369-
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1370-
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1371-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1372-
} else {
1373-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1374-
"New state requested. Waiting for %d seconds before %{public}s",
1375-
timeout,
1376-
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1377-
}
1378-
return TransitionHandlingState::Requested;
1379-
} else {
1380-
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1381-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1382-
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1383-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1384-
} else if (policies.termination == TerminationPolicy::QUIT) {
1385-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1386-
} else {
1387-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1388-
}
1389-
return TransitionHandlingState::Expired;
1390-
}
1391-
}
1392-
13931284
void DataProcessingDevice::Run()
13941285
{
13951286
ServiceRegistryRef ref{mServiceRegistry};
@@ -1442,7 +1333,7 @@ void DataProcessingDevice::Run()
14421333
shouldNotWait = true;
14431334
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
14441335
}
1445-
state.transitionHandling = updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
1336+
state.transitionHandling = DataProcessingHelpers::updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
14461337
// If we are Idle, we can then consider the transition to be expired.
14471338
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
14481339
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
@@ -1828,7 +1719,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18281719
// dependent on the callback, not something which is controlled by the
18291720
// framework itself.
18301721
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1831-
switchState(ref, StreamingState::EndOfStreaming);
1722+
DataProcessingHelpers::switchState(ref, StreamingState::EndOfStreaming);
18321723
state.lastActiveDataProcessor = &context;
18331724
}
18341725

@@ -1841,7 +1732,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18411732
/// timers as they do not need to be further processed.
18421733
auto& relayer = ref.get<DataRelayer>();
18431734

1844-
bool shouldProcess = hasOnlyGenerated(spec) == false;
1735+
bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
18451736

18461737
while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
18471738
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
@@ -1874,7 +1765,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18741765
}
18751766
// This is needed because the transport is deleted before the device.
18761767
relayer.clear();
1877-
switchState(ref, StreamingState::Idle);
1768+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
18781769
// In case we should process, note the data processor responsible for it
18791770
if (shouldProcess) {
18801771
state.lastActiveDataProcessor = &context;
@@ -2567,7 +2458,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25672458
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
25682459
} else {
25692460
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2570-
switchState(ref, StreamingState::Idle);
2461+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
25712462
}
25722463
if (shouldProcess(action)) {
25732464
auto& timingInfo = ref.get<TimingInfo>();
@@ -2655,7 +2546,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
26552546
for (auto& channel : spec.outputChannels) {
26562547
DataProcessingHelpers::sendEndOfStream(ref, channel);
26572548
}
2658-
switchState(ref, StreamingState::Idle);
2549+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
26592550
}
26602551

26612552
return true;

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,28 @@
2020
#include "Framework/Logger.h"
2121
#include "Framework/SendingPolicy.h"
2222
#include "Framework/RawDeviceService.h"
23+
#include "Framework/DeviceState.h"
24+
#include "Framework/DeviceContext.h"
25+
#include "Framework/ProcessingPolicies.h"
26+
#include "Framework/Signpost.h"
27+
#include "Framework/CallbackService.h"
28+
#include "Framework/DefaultsHelpers.h"
29+
#include "Framework/ServiceRegistryRef.h"
30+
#include "Framework/DeviceSpec.h"
31+
#include "Framework/ControlService.h"
32+
#include "Framework/DataProcessingContext.h"
33+
#include "Framework/DeviceStateEnums.h"
2334

2435
#include <fairmq/Device.h>
2536
#include <fairmq/Channel.h>
2637

38+
#include <uv.h>
39+
40+
// A log to use for general device logging
41+
O2_DECLARE_DYNAMIC_LOG(device);
42+
// Stream which keeps track of the calibration lifetime logic
43+
O2_DECLARE_DYNAMIC_LOG(calibration);
44+
2745
namespace o2::framework
2846
{
2947
void DataProcessingHelpers::sendEndOfStream(ServiceRegistryRef const& ref, OutputChannelSpec const& channel)
@@ -89,4 +107,114 @@ void DataProcessingHelpers::broadcastOldestPossibleTimeslice(ServiceRegistryRef
89107
}
90108
}
91109

110+
void DataProcessingHelpers::switchState(ServiceRegistryRef const& ref, StreamingState newState)
111+
{
112+
auto& state = ref.get<DeviceState>();
113+
auto& context = ref.get<DataProcessorContext>();
114+
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
115+
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
116+
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
117+
state.streaming = newState;
118+
ref.get<ControlService>().notifyStreamingState(state.streaming);
119+
};
120+
121+
bool hasOnlyTimers(DeviceSpec const& spec)
122+
{
123+
return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
124+
}
125+
126+
bool DataProcessingHelpers::hasOnlyGenerated(DeviceSpec const& spec)
127+
{
128+
return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
129+
}
130+
131+
void on_data_processing_expired(uv_timer_t* handle)
132+
{
133+
auto* ref = (ServiceRegistryRef*)handle->data;
134+
auto& state = ref->get<DeviceState>();
135+
auto& spec = ref->get<DeviceSpec const>();
136+
state.loopReason |= DeviceState::TIMER_EXPIRED;
137+
138+
// Check if this is a source device
139+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
140+
141+
if (DataProcessingHelpers::hasOnlyGenerated(spec)) {
142+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
143+
DataProcessingHelpers::switchState(*ref, StreamingState::EndOfStreaming);
144+
} else {
145+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
146+
state.allowedProcessing = DeviceState::CalibrationOnly;
147+
}
148+
}
149+
150+
void on_transition_requested_expired(uv_timer_t* handle)
151+
{
152+
auto* ref = (ServiceRegistryRef*)handle->data;
153+
auto& state = ref->get<DeviceState>();
154+
state.loopReason |= DeviceState::TIMER_EXPIRED;
155+
// Check if this is a source device
156+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
157+
auto& spec = ref->get<DeviceSpec const>();
158+
std::string messageOnExpire = DataProcessingHelpers::hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
159+
if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
160+
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
161+
} else {
162+
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
163+
}
164+
state.transitionHandling = TransitionHandlingState::Expired;
165+
}
166+
167+
TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies)
168+
{
169+
auto& state = ref.get<DeviceState>();
170+
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
171+
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
172+
return state.transitionHandling;
173+
}
174+
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
175+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, state.loop);
176+
auto& deviceContext = ref.get<DeviceContext>();
177+
// Check if we only have timers
178+
auto& spec = ref.get<DeviceSpec const>();
179+
if (hasOnlyTimers(spec)) {
180+
DataProcessingHelpers::switchState(ref, StreamingState::EndOfStreaming);
181+
}
182+
183+
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
184+
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
185+
uv_update_time(state.loop);
186+
O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
187+
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
188+
}
189+
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
190+
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
191+
uv_update_time(state.loop);
192+
O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
193+
deviceContext.exitTransitionTimeout);
194+
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
195+
bool onlyGenerated = DataProcessingHelpers::hasOnlyGenerated(spec);
196+
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
197+
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
198+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
199+
} else {
200+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
201+
"New state requested. Waiting for %d seconds before %{public}s",
202+
timeout,
203+
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
204+
}
205+
return TransitionHandlingState::Requested;
206+
} else {
207+
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
208+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
209+
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
210+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
211+
} else if (policies.termination == TerminationPolicy::QUIT) {
212+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
213+
} else {
214+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
215+
}
216+
return TransitionHandlingState::Expired;
217+
}
218+
}
219+
92220
} // namespace o2::framework

0 commit comments

Comments
 (0)