Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 27 additions & 35 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -141,20 +141,34 @@ void on_transition_requested_expired(uv_timer_t* handle)
state.transitionHandling = TransitionHandlingState::Expired;
}

auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void
{
auto& state = ref.get<DeviceState>();
auto& context = ref.get<DataProcessorContext>();
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
state.streaming = newState;
ref.get<ControlService>().notifyStreamingState(state.streaming);
};

void on_data_processing_expired(uv_timer_t* handle)
{
auto* ref = (ServiceRegistryRef*)handle->data;
auto& state = ref->get<DeviceState>();
auto& spec = ref->get<DeviceSpec const>();
state.loopReason |= DeviceState::TIMER_EXPIRED;

// Check if this is a source device
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);

// Source devices should never end up in this callback, since the exitTransitionTimeout should
// be reset to the dataProcessingTimeout and the timers cohalesced.
assert(hasOnlyGenerated(ref->get<DeviceSpec const>()) == false);
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
state.allowedProcessing = DeviceState::CalibrationOnly;
if (hasOnlyGenerated(spec)) {
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
switchState(*ref, StreamingState::EndOfStreaming);
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
state.allowedProcessing = DeviceState::CalibrationOnly;
}
}

void on_communication_requested(uv_async_t* s)
Expand Down Expand Up @@ -1236,7 +1250,7 @@ void DataProcessingDevice::PreRun()
O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
state.quitRequested = false;
state.streaming = StreamingState::Streaming;
switchState(ref, StreamingState::Streaming);
state.allowedProcessing = DeviceState::Any;
for (auto& info : state.inputChannelInfos) {
if (info.state != InputChannelState::Pull) {
Expand Down Expand Up @@ -1365,14 +1379,7 @@ void DataProcessingDevice::Run()
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
state.streaming = StreamingState::EndOfStreaming;
}

// If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively
// the same (because source devices are not allowed to produce any calibration).
// should be the same.
if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) {
deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout;
switchState(ref, StreamingState::EndOfStreaming);
}

// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
Expand All @@ -1385,7 +1392,8 @@ void DataProcessingDevice::Run()
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
Expand Down Expand Up @@ -1728,15 +1736,6 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
{
auto& context = ref.get<DataProcessorContext>();
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
auto switchState = [ref](StreamingState newState) {
auto& state = ref.get<DeviceState>();
auto& context = ref.get<DataProcessorContext>();
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
state.streaming = newState;
ref.get<ControlService>().notifyStreamingState(state.streaming);
};
auto& state = ref.get<DeviceState>();
auto& spec = ref.get<DeviceSpec const>();

Expand Down Expand Up @@ -1772,7 +1771,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// dependent on the callback, not something which is controlled by the
// framework itself.
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
switchState(StreamingState::EndOfStreaming);
switchState(ref, StreamingState::EndOfStreaming);
state.lastActiveDataProcessor = &context;
}

Expand Down Expand Up @@ -1818,7 +1817,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
}
// This is needed because the transport is deleted before the device.
relayer.clear();
switchState(StreamingState::Idle);
switchState(ref, StreamingState::Idle);
// In case we should process, note the data processor responsible for it
if (shouldProcess) {
state.lastActiveDataProcessor = &context;
Expand Down Expand Up @@ -2328,13 +2327,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
}
};

auto switchState = [ref](StreamingState newState) {
auto& control = ref.get<ControlService>();
auto& state = ref.get<DeviceState>();
state.streaming = newState;
control.notifyStreamingState(state.streaming);
};

ref.get<DataRelayer>().getReadyToProcess(completed);
if (completed.empty() == true) {
LOGP(debug, "No computations available for dispatching.");
Expand Down Expand Up @@ -2510,7 +2502,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
} else {
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
state.streaming = StreamingState::Idle;
switchState(ref, StreamingState::Idle);
}
if (shouldProcess(action)) {
auto& timingInfo = ref.get<TimingInfo>();
Expand Down Expand Up @@ -2598,7 +2590,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
for (auto& channel : spec.outputChannels) {
DataProcessingHelpers::sendEndOfStream(ref, channel);
}
switchState(StreamingState::Idle);
switchState(ref, StreamingState::Idle);
}

return true;
Expand Down
11 changes: 10 additions & 1 deletion Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DataProcessingHeader.h"
#include "Framework/DataProcessingContext.h"
#include "Framework/DataRef.h"
#include "Framework/InputRecord.h"
#include "Framework/InputSpan.h"
Expand Down Expand Up @@ -46,7 +47,6 @@
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <gsl/span>
#include <numeric>
#include <string>

using namespace o2::framework::data_matcher;
Expand All @@ -55,6 +55,8 @@ using DataProcessingHeader = o2::framework::DataProcessingHeader;
using Verbosity = o2::monitoring::Verbosity;

O2_DECLARE_DYNAMIC_LOG(data_relayer);
// Stream which keeps track of the calibration lifetime logic
O2_DECLARE_DYNAMIC_LOG(calibration);

namespace o2::framework
{
Expand Down Expand Up @@ -480,6 +482,13 @@ DataRelayer::RelayChoice
// We are in calibration mode and the data does not have the calibration bit set.
// We do not store it.
if (services.get<DeviceState>().allowedProcessing == DeviceState::ProcessingType::CalibrationOnly && !isCalibrationData(messages[mi])) {
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
"Dropping incoming %zu messages because they are data processing.", nPayloads);
// Actually dropping messages.
for (size_t i = mi; i < mi + nPayloads + 1; i++) {
auto discard = std::move(messages[i]);
}
mi += nPayloads;
continue;
}
Expand Down