Skip to content

Commit 1bcf367

Browse files
authored
DPL: fix rate limiting handling (#14255)
On success, FairMQ returns a positive number of bytes, not 0.
1 parent a917b6b commit 1bcf367

File tree

1 file changed

+25
-1
lines changed

1 file changed

+25
-1
lines changed

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,21 @@
3030
#include "Framework/RuntimeError.h"
3131
#include "Framework/RateLimiter.h"
3232
#include "Framework/PluginManager.h"
33+
#include "Framework/Signpost.h"
3334
#include <Monitoring/Monitoring.h>
3435

3536
#include <fairmq/Device.h>
37+
#include <uv.h>
3638
#include <fstream>
3739
#include <functional>
3840
#include <memory>
3941
#include <string>
4042

4143
using namespace o2::framework::data_matcher;
4244

45+
// Special log to track callbacks we know about
46+
O2_DECLARE_DYNAMIC_LOG(callbacks);
47+
4348
namespace o2::framework
4449
{
4550

@@ -145,27 +150,46 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpe
145150

146151
/// libuv async callback which tries to send the oldest possible output
/// timeslice over the "metric-feedback" channel, so that rate-limiting
/// information is propagated.
///
/// If the send does not succeed, the callback re-arms itself through
/// uv_async_send() so that the attempt is repeated later. A failed attempt is
/// padded so that it always takes at least the send timeout, which prevents a
/// tight retry loop when FairMQ returns earlier than the requested timeout.
///
/// @param async the libuv async handle; its `data` member is read as a
///              `ServiceRegistryRef*`.
void retryMetricCallback(uv_async_t* async)
{
  // Timeout (in ms) passed to FairMQ for one send attempt. It is also the
  // minimum duration a failed attempt must last before we retry.
  constexpr int sendTimeoutMs = 100;

  O2_SIGNPOST_ID_FROM_POINTER(cid, callbacks, async);
  O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Attempting again propagating rate-limiting information.");

  // Only ever written in this function; kept to preserve the original state.
  static size_t lastTimeslice = -1;
  auto* services = static_cast<ServiceRegistryRef*>(async->data);
  auto& timesliceIndex = services->get<TimesliceIndex>();
  auto* device = services->get<RawDeviceService>().device();
  auto channel = device->GetChannels().find("metric-feedback");
  auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
  // Without a feedback channel there is nobody to notify.
  if (channel == device->GetChannels().end()) {
    O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel.");
    return;
  }
  // The message wraps the local variable directly (no deleter is passed), so
  // the variable has to stay alive until Send() below has returned.
  // NOTE(review): the payload size is fixed to sizeof(int64_t); this assumes
  // the timeslice value is 8 bytes wide -- confirm against TimesliceIndex.
  fair::mq::MessagePtr payload(device->NewMessage());
  payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr);
  auto consumed = oldestPossingTimeslice;

  // uv_hrtime() reports nanoseconds as uint64_t.
  const uint64_t start = uv_hrtime();
  const int64_t result = channel->second[0].Send(payload, sendTimeoutMs);
  const uint64_t stop = uv_hrtime();

  // On success FairMQ returns a positive number of bytes. Anything else
  // (zero or a negative value) means the send did not go through.
  if (result <= 0) {
    // Forcefully slow down in case FairMQ returns earlier than expected...
    const int64_t elapsedMs = static_cast<int64_t>((stop - start) / 1000000);
    if (elapsedMs < sendTimeoutMs) {
      const int64_t remainingMs = sendTimeoutMs - elapsedMs;
      O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
                             "FairMQ returned %lld earlier than expected. Sleeping %lld ms more before retrying.",
                             static_cast<long long>(result), static_cast<long long>(remainingMs));
      uv_sleep(static_cast<unsigned int>(remainingMs));
    } else {
      O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
                             "FairMQ returned %lld, unable to send last consumed timeslice to source for %lld ms, retrying.",
                             static_cast<long long>(result), static_cast<long long>(elapsedMs));
    }
    // If the sending did not work, we keep trying until it actually works.
    // This will schedule other tasks in the queue, so the processing of the
    // data will still happen.
    uv_async_send(async);
  } else {
    // The sending worked, so we do not retry.
    O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %lld bytes, Last timeslice now set to %zu.",
                           static_cast<long long>(result), consumed);
    lastTimeslice = consumed;
  }
}

0 commit comments

Comments
 (0)