Skip to content

Commit 1036f12

Browse files
committed
DPL: add Signposts for some of the DataProcessing / Stream callbacks
1 parent a6b7699 commit 1036f12

File tree

6 files changed

+96
-67
lines changed

6 files changed

+96
-67
lines changed

Framework/Core/include/Framework/DeviceState.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ struct DeviceState {
5959

6060
enum LogStreams : int {
6161
NO_LOG = 0,
62-
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
63-
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
64-
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
62+
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
63+
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
64+
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
65+
DATA_PROCESSOR_CONTEXT_LOG = 1 << 3, // Log for the DataProcessorContext callbacks
66+
STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks
6567
};
6668

6769
std::vector<InputChannelInfo> inputChannelInfos;

Framework/Core/src/CommonServices.cxx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include <Configuration/ConfigurationInterface.h>
5858
#include <Configuration/ConfigurationFactory.h>
5959
#include <Monitoring/MonitoringFactory.h>
60+
#include "Framework/Signpost.h"
6061

6162
#include <fairmq/Device.h>
6263
#include <fairmq/shmem/Monitor.h>
@@ -79,6 +80,9 @@ using Value = o2::monitoring::tags::Value;
7980
#pragma GCC diagnostic push
8081
#pragma GCC diagnostic ignored "-Wpedantic"
8182

83+
O2_DECLARE_DYNAMIC_LOG(data_processor_context);
84+
O2_DECLARE_DYNAMIC_LOG(stream_context);
85+
8286
namespace o2::framework
8387
{
8488

@@ -188,6 +192,8 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
188192
}
189193
}
190194
if (didCreate == false && messageContext.didDispatch() == true) {
195+
O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
196+
O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band");
191197
LOGP(debug, "Data created out of band");
192198
return;
193199
}

Framework/Core/src/DataProcessingContext.cxx

Lines changed: 61 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,145 +10,144 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "Framework/DataProcessingContext.h"
13+
#include "Framework/DataProcessorSpec.h"
14+
#include "Framework/Signpost.h"
1315

16+
O2_DECLARE_DYNAMIC_LOG(data_processor_context);
1417
namespace o2::framework
1518
{
19+
20+
namespace
21+
{
22+
template <typename T, typename... ARGS>
23+
void invokeAll(T& handles, char const* callbackName, o2::framework::DataProcessorSpec* spec, ARGS&... args)
24+
{
25+
assert(callbackName);
26+
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, spec);
27+
// FIXME: for now spec is nullptr because we don't have a list of possible DataProcessorSpecs
28+
// per device.
29+
char const* dataProcessorName = spec ? spec->name.c_str() : "DataProcessorContext";
30+
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting %{public}s::%{public}s", dataProcessorName, callbackName);
31+
for (auto& handle : handles) {
32+
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
33+
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
34+
handle.callback(args..., handle.service);
35+
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
36+
}
37+
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending %{public}s::%{public}s", dataProcessorName, callbackName);
38+
}
39+
} // namespace
40+
1641
/// Invoke callbacks to be executed before every dangling check
1742
void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx)
1843
{
19-
for (auto& handle : preProcessingHandlers) {
20-
LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name);
21-
handle.callback(ctx, handle.service);
22-
}
44+
invokeAll(preProcessingHandlers, "preProcessingCallbacks", spec, ctx);
2345
}
2446

2547
void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx)
2648
{
27-
for (auto& handle : finaliseOutputsHandles) {
28-
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
29-
handle.callback(ctx, handle.service);
30-
}
49+
invokeAll(finaliseOutputsHandles, "finaliseOutputsCallbacks", spec, ctx);
3150
}
3251

3352
/// Invoke callbacks to be executed before every dangling check
3453
void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx)
3554
{
36-
for (auto& handle : postProcessingHandlers) {
37-
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
38-
handle.callback(ctx, handle.service);
39-
}
55+
invokeAll(postProcessingHandlers, "postProcessingCallbacks", spec, ctx);
4056
}
4157

4258
/// Invoke callbacks to be executed before every dangling check
43-
void DataProcessorContext::preDanglingCallbacks(DanglingContext& danglingContext)
59+
void DataProcessorContext::preDanglingCallbacks(DanglingContext& ctx)
4460
{
45-
for (auto& handle : preDanglingHandles) {
46-
LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name);
47-
handle.callback(danglingContext, handle.service);
48-
}
61+
invokeAll(preDanglingHandles, "preDanglingCallbacks", spec, ctx);
4962
}
5063

5164
/// Invoke callbacks to be executed after every dangling check
52-
void DataProcessorContext::postDanglingCallbacks(DanglingContext& danglingContext)
65+
void DataProcessorContext::postDanglingCallbacks(DanglingContext& ctx)
5366
{
54-
for (auto& handle : postDanglingHandles) {
55-
LOGP(debug, "Invoking postDanglingCallback for service {}", handle.spec.name);
56-
handle.callback(danglingContext, handle.service);
57-
}
67+
invokeAll(postDanglingHandles, "postDanglingCallbacks", spec, ctx);
5868
}
5969

6070
/// Invoke callbacks to be executed before every EOS user callback invokation
61-
void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& eosContext)
71+
void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& ctx)
6272
{
63-
for (auto& handle : preEOSHandles) {
64-
LOGP(detail, "Invoking preEosCallback for service {}", handle.spec.name);
65-
handle.callback(eosContext, handle.service);
66-
}
73+
invokeAll(preEOSHandles, "preEOSCallbacks", spec, ctx);
6774
}
6875

6976
/// Invoke callbacks to be executed after every EOS user callback invokation
70-
void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& eosContext)
77+
void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& ctx)
7178
{
72-
for (auto& handle : postEOSHandles) {
73-
LOGP(detail, "Invoking postEoSCallback for service {}", handle.spec.name);
74-
handle.callback(eosContext, handle.service);
75-
}
79+
invokeAll(postEOSHandles, "postEOSCallbacks", spec, ctx);
7680
}
7781

7882
/// Invoke callbacks to be executed after every data Dispatching
79-
void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& processContext)
83+
void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& ctx)
8084
{
81-
for (auto& handle : postDispatchingHandles) {
82-
LOGP(debug, "Invoking postDispatchingCallback for service {}", handle.spec.name);
83-
handle.callback(processContext, handle.service);
84-
}
85+
invokeAll(postDispatchingHandles, "postDispatchingCallbacks", spec, ctx);
8586
}
8687

8788
/// Invoke callbacks to be executed after every data Dispatching
88-
void DataProcessorContext::postForwardingCallbacks(ProcessingContext& processContext)
89+
void DataProcessorContext::postForwardingCallbacks(ProcessingContext& ctx)
8990
{
90-
for (auto& handle : postForwardingHandles) {
91-
LOGP(debug, "Invoking postForwardingCallback for service {}", handle.spec.name);
92-
handle.callback(processContext, handle.service);
93-
}
91+
invokeAll(postForwardingHandles, "postForwardingCallbacks", spec, ctx);
9492
}
9593

9694
/// Callbacks to be called in fair::mq::Device::PreRun()
9795
void DataProcessorContext::preStartCallbacks(ServiceRegistryRef ref)
9896
{
99-
for (auto& handle : preStartHandles) {
100-
LOGP(detail, "Invoking preStartCallback for service {}", handle.spec.name);
101-
handle.callback(ref, handle.service);
102-
}
97+
invokeAll(preStartHandles, "preStartCallbacks", spec, ref);
10398
}
10499

105100
void DataProcessorContext::postStopCallbacks(ServiceRegistryRef ref)
106101
{
107-
// FIXME: we need to call the callback only once for the global services
108-
/// I guess...
109-
for (auto& handle : postStopHandles) {
110-
LOGP(detail, "Invoking postStopCallback for service {}", handle.spec.name);
111-
handle.callback(ref, handle.service);
112-
}
102+
invokeAll(postStopHandles, "postStopCallbacks", spec, ref);
113103
}
114104

115105
/// Invoke callback to be executed on exit, in reverse order.
116106
void DataProcessorContext::preExitCallbacks(std::vector<ServiceExitHandle> handles, ServiceRegistryRef ref)
117107
{
108+
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, &ref);
109+
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preExitCallbacks");
118110
// FIXME: we need to call the callback only once for the global services
119111
/// I guess...
120112
for (auto handle = handles.rbegin(); handle != handles.rend(); ++handle) {
121-
LOGP(detail, "Invoking preExitCallback for service {}", handle->spec.name);
113+
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle->service);
114+
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
122115
handle->callback(ref, handle->service);
116+
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
123117
}
118+
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preExitCallbacks");
124119
}
125120

126121
/// Invoke callback to be executed on exit, in reverse order.
127122
void DataProcessorContext::preLoopCallbacks(ServiceRegistryRef ref)
128123
{
129-
// FIXME: we need to call the callback only once for the global services
130-
/// I guess...
131-
LOGP(debug, "Invoking preLoopCallbacks");
132-
for (auto& handle : preLoopHandles) {
133-
LOGP(debug, "Invoking preLoopCallback for service {}", handle.spec.name);
134-
handle.callback(ref, handle.service);
135-
}
124+
invokeAll(preLoopHandles, "preLoopCallbacks", spec, ref);
136125
}
137126

138127
void DataProcessorContext::domainInfoUpdatedCallback(ServiceRegistryRef ref, size_t oldestPossibleTimeslice, ChannelIndex channelIndex)
139128
{
129+
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
130+
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext domainInfoUpdatedCallback");
140131
for (auto& handle : domainInfoHandles) {
141-
LOGP(debug, "Invoking domainInfoHandles for service {}", handle.spec.name);
132+
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
133+
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
142134
handle.callback(ref, oldestPossibleTimeslice, channelIndex);
135+
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
143136
}
137+
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext domainInfoUpdatedCallback");
144138
}
145139

146140
void DataProcessorContext::preSendingMessagesCallbacks(ServiceRegistryRef ref, fair::mq::Parts& parts, ChannelIndex channelIndex)
147141
{
142+
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
143+
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preSendingMessagesCallbacks");
148144
for (auto& handle : preSendingMessagesHandles) {
149-
LOGP(debug, "Invoking preSending for service {}", handle.spec.name);
145+
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
146+
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
150147
handle.callback(ref, parts, channelIndex);
148+
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
151149
}
150+
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preSendingMessagesCallbacks");
152151
}
153152

154153
} // namespace o2::framework

Framework/Core/src/StreamContext.cxx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
#include "Framework/StreamContext.h"
1313

14+
#include "Framework/Signpost.h"
15+
16+
O2_DECLARE_DYNAMIC_LOG(stream_context);
17+
1418
namespace o2::framework
1519
{
1620

@@ -49,12 +53,17 @@ void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx)
4953
/// Invoke callbacks to be executed after every process method invokation
5054
void StreamContext::postProcessingCallbacks(ProcessingContext& pcx)
5155
{
56+
O2_SIGNPOST_ID_FROM_POINTER(dpid, stream_context, &pcx);
57+
O2_SIGNPOST_START(stream_context, dpid, "callbacks", "Starting StreamContext postProcessingCallbacks");
5258
for (auto& handle : postProcessingHandles) {
53-
LOG(debug) << "Invoking postProcessingCallbacks for " << handle.service;
59+
O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, handle.service);
60+
O2_SIGNPOST_START(stream_context, cid, "callbacks", "Starting StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str());
5461
assert(handle.service);
5562
assert(handle.callback);
5663
handle.callback(pcx, handle.service);
64+
O2_SIGNPOST_END(stream_context, cid, "callbacks", "Ending StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str());
5765
}
66+
O2_SIGNPOST_END(stream_context, dpid, "callbacks", "Ending StreamContext postProcessingCallbacks");
5867
}
5968

6069
/// Invoke callbacks to be executed before every EOS user callback invokation

Framework/Core/src/WSDriverClient.cxx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
O2_DECLARE_DYNAMIC_LOG(device);
2525
O2_DECLARE_DYNAMIC_LOG(completion);
2626
O2_DECLARE_DYNAMIC_LOG(monitoring_service);
27+
O2_DECLARE_DYNAMIC_LOG(data_processor_context);
28+
O2_DECLARE_DYNAMIC_LOG(stream_context);
2729

2830
namespace o2::framework
2931
{
@@ -184,12 +186,21 @@ void on_connect(uv_connect_t* connection, int status)
184186
} else {
185187
O2_LOG_DISABLE(completion);
186188
}
187-
188189
if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) {
189190
O2_LOG_ENABLE(monitoring_service);
190191
} else {
191192
O2_LOG_DISABLE(monitoring_service);
192193
}
194+
if ((state.logStreams & DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG) != 0) {
195+
O2_LOG_ENABLE(data_processor_context);
196+
} else {
197+
O2_LOG_DISABLE(data_processor_context);
198+
}
199+
if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) {
200+
O2_LOG_ENABLE(stream_context);
201+
} else {
202+
O2_LOG_DISABLE(stream_context);
203+
}
193204
});
194205

195206
// Client will be filled in the line after. I can probably have a single

Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,8 @@ void displayDeviceInspector(DeviceSpec const& spec,
405405
logsChanged = ImGui::CheckboxFlags("Device", &control.logStreams, DeviceState::LogStreams::DEVICE_LOG);
406406
logsChanged = ImGui::CheckboxFlags("Completion", &control.logStreams, DeviceState::LogStreams::COMPLETION_LOG);
407407
logsChanged = ImGui::CheckboxFlags("Monitoring", &control.logStreams, DeviceState::LogStreams::MONITORING_SERVICE_LOG);
408+
logsChanged = ImGui::CheckboxFlags("DataProcessorContext", &control.logStreams, DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG);
409+
logsChanged = ImGui::CheckboxFlags("StreamContext", &control.logStreams, DeviceState::LogStreams::STREAM_CONTEXT_LOG);
408410
if (logsChanged && control.controller) {
409411
std::string cmd = fmt::format("/log-streams {}", control.logStreams);
410412
control.controller->write(cmd.c_str(), cmd.size());

0 commit comments

Comments
 (0)