Skip to content

Commit 6e6eb7b

Browse files
committed
DPL: Move the processing policies to the DeviceContext
* Drop unneeded helper. * Move the policies to the DeviceContext
1 parent 67a1c64 commit 6e6eb7b

File tree

6 files changed

+16
-26
lines changed

6 files changed

+16
-26
lines changed

Framework/Core/include/Framework/CommonServices.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ struct CommonServices {
5656
return [](InitContext&, void* service) -> void* { return service; };
5757
}
5858

59-
static ServiceSpec deviceContextSpec();
6059
static ServiceSpec dataProcessorContextSpec();
6160
static ServiceSpec driverClientSpec();
6261
static ServiceSpec monitoringSpec();

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers {
7777
class DataProcessingDevice : public fair::mq::Device
7878
{
7979
public:
80-
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies);
80+
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&);
8181
void Init() final;
8282
void InitTask() final;
8383
void PreRun() final;
@@ -112,7 +112,6 @@ class DataProcessingDevice : public fair::mq::Device
112112
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
113113
std::vector<fair::mq::RegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
114114
std::mutex mRegionInfoMutex;
115-
ProcessingPolicies mProcessingPolicies; /// User policies related to data processing
116115
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
117116
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
118117
/// Handle to wake up the main loop from other threads

Framework/Core/include/Framework/DeviceContext.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t;
2121
namespace o2::framework
2222
{
2323
struct ComputingQuotaStats;
24+
struct ProcessingPolicies;
2425

2526
/// Stucture which holds the whole runtime context
2627
/// of a running device which is not stored as
@@ -33,6 +34,7 @@ struct DeviceContext {
3334
int expectedRegionCallbacks = 0;
3435
int exitTransitionTimeout = 0;
3536
int dataProcessingTimeout = 0;
37+
ProcessingPolicies& processingPolicies;
3638
};
3739

3840
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,17 +1237,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec()
12371237
.kind = ServiceKind::Serial};
12381238
}
12391239

1240-
o2::framework::ServiceSpec CommonServices::deviceContextSpec()
1241-
{
1242-
return ServiceSpec{
1243-
.name = "device-context",
1244-
.init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1245-
return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
1246-
},
1247-
.configure = noConfiguration(),
1248-
.kind = ServiceKind::Serial};
1249-
}
1250-
12511240
o2::framework::ServiceSpec CommonServices::dataAllocatorSpec()
12521241
{
12531242
return ServiceSpec{

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "Framework/DataProcessor.h"
1818
#include "Framework/DataSpecUtils.h"
1919
#include "Framework/DeviceState.h"
20+
#include "Framework/DeviceStateEnums.h"
2021
#include "Framework/DispatchPolicy.h"
2122
#include "Framework/DispatchControl.h"
2223
#include "Framework/DanglingContext.h"
@@ -196,11 +197,10 @@ struct locked_execution {
196197
~locked_execution() { ref.unlock(); }
197198
};
198199

199-
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies)
200+
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry)
200201
: mRunningDevice{running},
201202
mConfigRegistry{nullptr},
202-
mServiceRegistry{registry},
203-
mProcessingPolicies{policies}
203+
mServiceRegistry{registry}
204204
{
205205
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
206206
if (key == "cleanup") {
@@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
247247
mHandles.resize(1);
248248

249249
ServiceRegistryRef ref{mServiceRegistry};
250+
250251
mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
251252
auto& state = ref.get<DeviceState>();
252253
assert(state.loop);
@@ -1189,18 +1190,18 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
11891190
errorCallback(errorContext);
11901191
};
11911192
} else {
1192-
context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
1193-
&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1193+
context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
11941194
auto& err = error_from_ref(e);
11951195
/// FIXME: we should pass the salt in, so that the message
11961196
/// can access information which were stored in the stream.
11971197
ServiceRegistryRef ref{serviceRegistry, ServiceRegistry::globalDeviceSalt()};
11981198
auto& context = ref.get<DataProcessorContext>();
1199+
auto& deviceContext = ref.get<DeviceContext>();
11991200
O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
12001201
BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
12011202
auto& stats = ref.get<DataProcessingStats>();
12021203
stats.updateStats({(int)ProcessingStatsId::EXCEPTION_COUNT, DataProcessingStats::Op::Add, 1});
1203-
switch (errorPolicy) {
1204+
switch (deviceContext.processingPolicies.error) {
12041205
case TerminationPolicy::QUIT:
12051206
O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
12061207
throw e;
@@ -1211,10 +1212,10 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
12111212
};
12121213
}
12131214

1214-
auto decideEarlyForward = [&context, &spec, this]() -> bool {
1215+
auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
12151216
/// We must make sure there is no optional
12161217
/// if we want to optimize the forwarding
1217-
bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1218+
bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
12181219
bool onlyConditions = true;
12191220
bool overriddenEarlyForward = false;
12201221
for (auto& forwarded : spec.forwards) {
@@ -1229,7 +1230,7 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
12291230
break;
12301231
}
12311232
#endif
1232-
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1233+
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && deviceContext.processingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
12331234
context.canForwardEarly = false;
12341235
overriddenEarlyForward = true;
12351236
LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
@@ -1560,7 +1561,7 @@ void DataProcessingDevice::Run()
15601561
}
15611562
}
15621563

1563-
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1564+
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
15641565
auto& spec = ref.get<DeviceSpec const>();
15651566
/// Cleanup messages which are still pending on exit.
15661567
for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {

Framework/Core/src/runDataProcessing.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,13 +1092,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10921092
quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
10931093
serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));
10941094

1095-
deviceContext = std::make_unique<DeviceContext>();
1095+
deviceContext = std::make_unique<DeviceContext>(DeviceContext{.processingPolicies = processingPolicies});
10961096
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
10971097
serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
10981098
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
10991099
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
11001100

1101-
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry, processingPolicies);
1101+
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);
11021102

11031103
serviceRef.get<RawDeviceService>().setDevice(device.get());
11041104
r.fDevice = std::move(device);

0 commit comments

Comments
 (0)