Skip to content

Commit 238bc3e

Browse files
committed
DPL: add a poller to check for the metric-feedback channel
In the AOD reader it might happen that the memory rate limiting prevents proper handling of the messages coming on the metric-feedback channel. This obviates to the issue by adding an explicit poller for such channel. While doing so, I also had to change the rate limiter to account for the extra polling information, to avoid having a busy loop.
1 parent eb67b94 commit 238bc3e

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -851,8 +851,13 @@ void DataProcessingDevice::initPollers()
851851
continue;
852852
}
853853

854-
if (channelName.rfind("from_", 0) != 0) {
855-
LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
854+
if (channelName.rfind("from_", 0) != 0 && channelName != "metric-feedback") {
855+
LOGP(detail, "{} is not a DPL input socket. Not polling.", channelName);
856+
continue;
857+
}
858+
859+
if (channelName == "metric-feedback" && spec.name.rfind("internal-dpl-aod-reader", 0) != 0) {
860+
LOGP(detail, "{} is not a DPL input socket. Not polling.", channelName);
856861
continue;
857862
}
858863

Framework/Core/src/RateLimiter.cxx

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
#include "Framework/DeviceContext.h"
1919
#include "Framework/Signpost.h"
2020

21+
#include <fairmq/Channel.h>
2122
#include <fairmq/Device.h>
22-
#include <uv.h>
2323
#include <fairmq/shmem/Monitor.h>
2424
#include <fairmq/shmem/Common.h>
25+
#include <uv.h>
2526
#include <chrono>
2627
#include <thread>
2728

@@ -32,15 +33,31 @@ using namespace o2::framework;
3233
int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
3334
{
3435
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
36+
// Feature is not enabled. Nothing to do.
3537
if (!maxInFlight && !minSHM) {
3638
return 0;
3739
}
38-
auto device = ctx.services().get<RawDeviceService>().device();
39-
auto& deviceState = ctx.services().get<DeviceState>();
40-
if (maxInFlight && device->GetChannels().count("metric-feedback")) {
40+
// Let's check for the metric channel.
41+
InputChannelInfo* channelInfo = nullptr;
42+
for (auto& info : ctx.services().get<DeviceState>().inputChannelInfos) {
43+
if (info.channel->GetName() == "metric-feedback") {
44+
channelInfo = &info;
45+
}
46+
}
47+
if (!channelInfo) {
48+
return 0;
49+
}
50+
// No new data on channel. Nothing to do.
51+
if (!channelInfo->readPolled) {
52+
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit",
53+
"Socket not polled. No need to check.");
54+
55+
return 0;
56+
}
57+
if (maxInFlight && channelInfo) {
4158
auto& dtc = ctx.services().get<DataTakingContext>();
42-
const auto& device = ctx.services().get<RawDeviceService>().device();
4359
const auto& deviceContext = ctx.services().get<DeviceContext>();
60+
auto& proxy = ctx.services().get<FairMQDeviceProxy>();
4461
bool timeout = deviceContext.exitTransitionTimeout;
4562
bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
4663
bool waitMessage = false;
@@ -55,17 +72,17 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
5572
maxInFlight, mSentTimeframes, mConsumedTimeframes);
5673
} else {
5774
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
58-
"Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
59-
maxInFlight, mSentTimeframes, mConsumedTimeframes);
75+
"Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
76+
maxInFlight, mSentTimeframes, mConsumedTimeframes);
6077
}
6178
waitMessage = true;
6279
timeoutForMessage = false;
6380
}
64-
auto msg = device->NewMessageFor("metric-feedback", 0, 0);
81+
auto msg = channelInfo->channel->NewMessage(0, fair::mq::Alignment{64});
6582
int64_t count = 0;
6683
do {
67-
count = device->Receive(msg, "metric-feedback", 0, recvTimeout);
68-
if (timeout && count <= 0 && device->NewStatePending()) {
84+
count = channelInfo->channel->Receive(msg, recvTimeout);
85+
if (timeout && count <= 0 && proxy.newStateRequested()) {
6986
return 1;
7087
}
7188
} while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);
@@ -76,6 +93,8 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
7693
}
7794
assert(msg->GetSize() == 8);
7895
mConsumedTimeframes = *(int64_t*)msg->GetData();
96+
// We reset the read polled.
97+
channelInfo->readPolled = false;
7998
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit",
8099
"Received %llu as consumed timeframes",
81100
mConsumedTimeframes);
@@ -87,8 +106,8 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
87106
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
88107
} else {
89108
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
90-
"%lli / %d TF in flight, continue to publish",
91-
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
109+
"%lli / %d TF in flight, continue to publish",
110+
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
92111
}
93112
}
94113

@@ -133,6 +152,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
133152
float elapsed = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mLastTime).count();
134153
if (elapsed < mSmothDelay) {
135154
LOG(debug) << "TF Throttling: Elapsed " << elapsed << " --> Waiting for " << mSmothDelay - elapsed;
155+
auto& deviceState = ctx.services().get<DeviceState>();
136156
uv_run(deviceState.loop, UV_RUN_NOWAIT);
137157
std::this_thread::sleep_for(std::chrono::microseconds((size_t)((mSmothDelay - elapsed) * 1.e6f)));
138158
}
@@ -144,6 +164,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
144164
if (minSHM) {
145165
int waitMessage = 0;
146166
auto& runningWorkflow = ctx.services().get<RunningWorkflowInfo const>();
167+
auto* device = ctx.services().get<RawDeviceService>().device();
147168
while (true) {
148169
long freeMemory = -1;
149170
try {

0 commit comments

Comments
 (0)