Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ struct formatter<o2::framework::CompletionPolicy::CompletionOp> : ostream_format

// A log to use for general device logging
O2_DECLARE_DYNAMIC_LOG(device);
// A log to use for general device logging
O2_DECLARE_DYNAMIC_LOG(sockets);
// Special log to keep track of the lifetime of the parts
O2_DECLARE_DYNAMIC_LOG(parts);
// Stream which keeps track of the calibration lifetime logic
Expand Down Expand Up @@ -339,21 +341,21 @@ void on_socket_polled(uv_poll_t* poller, int status, int events)
{
auto* context = (PollerContext*)poller->data;
assert(context);
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
switch (events) {
case UV_READABLE: {
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
context->state->loopReason |= DeviceState::DATA_INCOMING;
} break;
case UV_WRITABLE: {
O2_SIGNPOST_END(device, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
if (context->read) {
O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
context->state->loopReason |= DeviceState::DATA_CONNECTED;
} else {
O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
context->state->loopReason |= DeviceState::DATA_OUTGOING;
// If the socket is writable, fairmq will handle the rest, so we can stop polling and
// just wait for the disconnect.
Expand All @@ -362,18 +364,18 @@ void on_socket_polled(uv_poll_t* poller, int status, int events)
context->pollerState = PollerContext::PollerState::Connected;
} break;
case UV_DISCONNECT: {
O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
} break;
case UV_PRIORITIZED: {
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
} break;
}
// We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
}

void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
{
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
auto* context = (PollerContext*)poller->data;
context->state->loopReason |= DeviceState::OOB_ACTIVITY;
if (status < 0) {
Expand All @@ -382,27 +384,27 @@ void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
}
switch (events) {
case UV_READABLE: {
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
context->state->loopReason |= DeviceState::DATA_INCOMING;
assert(context->channelInfo);
context->channelInfo->readPolled = true;
} break;
case UV_WRITABLE: {
O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
if (context->read) {
O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
} else {
O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
context->state->loopReason |= DeviceState::DATA_OUTGOING;
}
} break;
case UV_DISCONNECT: {
O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
} break;
case UV_PRIORITIZED: {
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
} break;
}
// We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
Expand Down