Skip to content

Commit 6a3a272

Browse files
committed
DPL: move updateStateTransition() and smaller helper functions to DataProcessingHelpers
1 parent c6f8660 commit 6a3a272

File tree

3 files changed

+145
-116
lines changed

3 files changed

+145
-116
lines changed

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ struct ForwardChannelState;
2121
struct OutputChannelInfo;
2222
struct OutputChannelSpec;
2323
struct OutputChannelState;
24+
struct ProcessingPolicies;
25+
struct DeviceSpec;
26+
enum struct StreamingState;
27+
enum struct TransitionHandlingState;
2428

2529
/// Generic helpers for DataProcessing releated functions.
2630
struct DataProcessingHelpers {
@@ -35,6 +39,12 @@ struct DataProcessingHelpers {
3539
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, OutputChannelInfo const& info, OutputChannelState& state, size_t timeslice);
3640
/// Broadcast the oldest possible timeslice to all channels in output
3741
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const& ref, size_t timeslice);
42+
/// change the device StreamingState to newState
43+
static void switchState(ServiceRegistryRef const& ref, StreamingState newState);
44+
/// check if spec is a source devide
45+
static bool hasOnlyGenerated(DeviceSpec const& spec);
46+
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
47+
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
3848
};
3949

4050
} // namespace o2::framework

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -122,63 +122,6 @@ void on_idle_timer(uv_timer_t* handle)
122122
state->loopReason |= DeviceState::TIMER_EXPIRED;
123123
}
124124

125-
bool hasOnlyTimers(DeviceSpec const& spec)
126-
{
127-
return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
128-
}
129-
130-
bool hasOnlyGenerated(DeviceSpec const& spec)
131-
{
132-
return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
133-
}
134-
135-
void on_transition_requested_expired(uv_timer_t* handle)
136-
{
137-
auto* ref = (ServiceRegistryRef*)handle->data;
138-
auto& state = ref->get<DeviceState>();
139-
state.loopReason |= DeviceState::TIMER_EXPIRED;
140-
// Check if this is a source device
141-
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
142-
auto& spec = ref->get<DeviceSpec const>();
143-
std::string messageOnExpire = hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
144-
if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
145-
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
146-
} else {
147-
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
148-
}
149-
state.transitionHandling = TransitionHandlingState::Expired;
150-
}
151-
152-
auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void
153-
{
154-
auto& state = ref.get<DeviceState>();
155-
auto& context = ref.get<DataProcessorContext>();
156-
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
157-
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
158-
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
159-
state.streaming = newState;
160-
ref.get<ControlService>().notifyStreamingState(state.streaming);
161-
};
162-
163-
void on_data_processing_expired(uv_timer_t* handle)
164-
{
165-
auto* ref = (ServiceRegistryRef*)handle->data;
166-
auto& state = ref->get<DeviceState>();
167-
auto& spec = ref->get<DeviceSpec const>();
168-
state.loopReason |= DeviceState::TIMER_EXPIRED;
169-
170-
// Check if this is a source device
171-
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
172-
173-
if (hasOnlyGenerated(spec)) {
174-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
175-
switchState(*ref, StreamingState::EndOfStreaming);
176-
} else {
177-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
178-
state.allowedProcessing = DeviceState::CalibrationOnly;
179-
}
180-
}
181-
182125
void on_communication_requested(uv_async_t* s)
183126
{
184127
auto* state = (DeviceState*)s->data;
@@ -1260,7 +1203,7 @@ void DataProcessingDevice::PreRun()
12601203
O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
12611204
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
12621205
state.quitRequested = false;
1263-
switchState(ref, StreamingState::Streaming);
1206+
DataProcessingHelpers::switchState(ref, StreamingState::Streaming);
12641207
state.allowedProcessing = DeviceState::Any;
12651208
for (auto& info : state.inputChannelInfos) {
12661209
if (info.state != InputChannelState::Pull) {
@@ -1331,58 +1274,6 @@ void DataProcessingDevice::Reset()
13311274
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
13321275
}
13331276

1334-
TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
1335-
{
1336-
auto& state = ref.get<DeviceState>();
1337-
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
1338-
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
1339-
return state.transitionHandling;
1340-
}
1341-
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1342-
auto& deviceContext = ref.get<DeviceContext>();
1343-
// Check if we only have timers
1344-
auto& spec = ref.get<DeviceSpec const>();
1345-
if (hasOnlyTimers(spec)) {
1346-
switchState(ref, StreamingState::EndOfStreaming);
1347-
}
1348-
1349-
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1350-
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1351-
uv_update_time(state.loop);
1352-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1353-
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1354-
}
1355-
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1356-
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1357-
uv_update_time(state.loop);
1358-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1359-
deviceContext.exitTransitionTimeout);
1360-
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1361-
bool onlyGenerated = hasOnlyGenerated(spec);
1362-
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1363-
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1364-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1365-
} else {
1366-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1367-
"New state requested. Waiting for %d seconds before %{public}s",
1368-
timeout,
1369-
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1370-
}
1371-
return TransitionHandlingState::Requested;
1372-
} else {
1373-
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1374-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1375-
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1376-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1377-
} else if (policies.termination == TerminationPolicy::QUIT) {
1378-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1379-
} else {
1380-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1381-
}
1382-
return TransitionHandlingState::Expired;
1383-
}
1384-
}
1385-
13861277
void DataProcessingDevice::Run()
13871278
{
13881279
ServiceRegistryRef ref{mServiceRegistry};
@@ -1435,7 +1326,7 @@ void DataProcessingDevice::Run()
14351326
shouldNotWait = true;
14361327
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
14371328
}
1438-
state.transitionHandling = updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
1329+
state.transitionHandling = DataProcessingHelpers::updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
14391330
// If we are Idle, we can then consider the transition to be expired.
14401331
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
14411332
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
@@ -1794,7 +1685,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17941685
// dependent on the callback, not something which is controlled by the
17951686
// framework itself.
17961687
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1797-
switchState(ref, StreamingState::EndOfStreaming);
1688+
DataProcessingHelpers::switchState(ref, StreamingState::EndOfStreaming);
17981689
state.lastActiveDataProcessor = &context;
17991690
}
18001691

@@ -1807,7 +1698,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18071698
/// timers as they do not need to be further processed.
18081699
auto& relayer = ref.get<DataRelayer>();
18091700

1810-
bool shouldProcess = hasOnlyGenerated(spec) == false;
1701+
bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
18111702

18121703
while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
18131704
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
@@ -1840,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18401731
}
18411732
// This is needed because the transport is deleted before the device.
18421733
relayer.clear();
1843-
switchState(ref, StreamingState::Idle);
1734+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
18441735
// In case we should process, note the data processor responsible for it
18451736
if (shouldProcess) {
18461737
state.lastActiveDataProcessor = &context;
@@ -2533,7 +2424,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25332424
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
25342425
} else {
25352426
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2536-
switchState(ref, StreamingState::Idle);
2427+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
25372428
}
25382429
if (shouldProcess(action)) {
25392430
auto& timingInfo = ref.get<TimingInfo>();
@@ -2621,7 +2512,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
26212512
for (auto& channel : spec.outputChannels) {
26222513
DataProcessingHelpers::sendEndOfStream(ref, channel);
26232514
}
2624-
switchState(ref, StreamingState::Idle);
2515+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
26252516
}
26262517

26272518
return true;

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,28 @@
2020
#include "Framework/Logger.h"
2121
#include "Framework/SendingPolicy.h"
2222
#include "Framework/RawDeviceService.h"
23+
#include "Framework/DeviceState.h"
24+
#include "Framework/DeviceContext.h"
25+
#include "Framework/ProcessingPolicies.h"
26+
#include "Framework/Signpost.h"
27+
#include "Framework/CallbackService.h"
28+
#include "Framework/DefaultsHelpers.h"
29+
#include "Framework/ServiceRegistryRef.h"
30+
#include "Framework/DeviceSpec.h"
31+
#include "Framework/ControlService.h"
32+
#include "Framework/DataProcessingContext.h"
33+
#include "Framework/DeviceStateEnums.h"
2334

2435
#include <fairmq/Device.h>
2536
#include <fairmq/Channel.h>
2637

38+
#include <uv.h>
39+
40+
// A log to use for general device logging
41+
O2_DECLARE_DYNAMIC_LOG(device);
42+
// Stream which keeps track of the calibration lifetime logic
43+
O2_DECLARE_DYNAMIC_LOG(calibration);
44+
2745
namespace o2::framework
2846
{
2947
void DataProcessingHelpers::sendEndOfStream(ServiceRegistryRef const& ref, OutputChannelSpec const& channel)
@@ -89,4 +107,114 @@ void DataProcessingHelpers::broadcastOldestPossibleTimeslice(ServiceRegistryRef
89107
}
90108
}
91109

110+
void DataProcessingHelpers::switchState(ServiceRegistryRef const& ref, StreamingState newState)
111+
{
112+
auto& state = ref.get<DeviceState>();
113+
auto& context = ref.get<DataProcessorContext>();
114+
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
115+
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
116+
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
117+
state.streaming = newState;
118+
ref.get<ControlService>().notifyStreamingState(state.streaming);
119+
};
120+
121+
bool hasOnlyTimers(DeviceSpec const& spec)
122+
{
123+
return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
124+
}
125+
126+
bool DataProcessingHelpers::hasOnlyGenerated(DeviceSpec const& spec)
127+
{
128+
return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
129+
}
130+
131+
void on_data_processing_expired(uv_timer_t* handle)
132+
{
133+
auto* ref = (ServiceRegistryRef*)handle->data;
134+
auto& state = ref->get<DeviceState>();
135+
auto& spec = ref->get<DeviceSpec const>();
136+
state.loopReason |= DeviceState::TIMER_EXPIRED;
137+
138+
// Check if this is a source device
139+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
140+
141+
if (DataProcessingHelpers::hasOnlyGenerated(spec)) {
142+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
143+
DataProcessingHelpers::switchState(*ref, StreamingState::EndOfStreaming);
144+
} else {
145+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
146+
state.allowedProcessing = DeviceState::CalibrationOnly;
147+
}
148+
}
149+
150+
void on_transition_requested_expired(uv_timer_t* handle)
151+
{
152+
auto* ref = (ServiceRegistryRef*)handle->data;
153+
auto& state = ref->get<DeviceState>();
154+
state.loopReason |= DeviceState::TIMER_EXPIRED;
155+
// Check if this is a source device
156+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
157+
auto& spec = ref->get<DeviceSpec const>();
158+
std::string messageOnExpire = DataProcessingHelpers::hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
159+
if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
160+
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
161+
} else {
162+
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
163+
}
164+
state.transitionHandling = TransitionHandlingState::Expired;
165+
}
166+
167+
TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies)
168+
{
169+
auto& state = ref.get<DeviceState>();
170+
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
171+
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
172+
return state.transitionHandling;
173+
}
174+
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
175+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, state.loop);
176+
auto& deviceContext = ref.get<DeviceContext>();
177+
// Check if we only have timers
178+
auto& spec = ref.get<DeviceSpec const>();
179+
if (hasOnlyTimers(spec)) {
180+
DataProcessingHelpers::switchState(ref, StreamingState::EndOfStreaming);
181+
}
182+
183+
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
184+
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
185+
uv_update_time(state.loop);
186+
O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
187+
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
188+
}
189+
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
190+
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
191+
uv_update_time(state.loop);
192+
O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
193+
deviceContext.exitTransitionTimeout);
194+
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
195+
bool onlyGenerated = DataProcessingHelpers::hasOnlyGenerated(spec);
196+
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
197+
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
198+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
199+
} else {
200+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
201+
"New state requested. Waiting for %d seconds before %{public}s",
202+
timeout,
203+
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
204+
}
205+
return TransitionHandlingState::Requested;
206+
} else {
207+
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
208+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
209+
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
210+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
211+
} else if (policies.termination == TerminationPolicy::QUIT) {
212+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
213+
} else {
214+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
215+
}
216+
return TransitionHandlingState::Expired;
217+
}
218+
}
219+
92220
} // namespace o2::framework

0 commit comments

Comments
 (0)