Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,13 @@ void DataProcessingDevice::initPollers()
continue;
}

if (channelName.rfind("from_", 0) != 0) {
LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
if (channelName.rfind("from_", 0) != 0 && channelName != "metric-feedback") {
LOGP(detail, "{} is not a DPL input socket. Not polling.", channelName);
continue;
}

if (channelName == "metric-feedback" && spec.name.rfind("internal-dpl-aod-reader", 0) != 0) {
LOGP(detail, "{} is not a DPL input socket. Not polling.", channelName);
continue;
}

Expand Down
45 changes: 33 additions & 12 deletions Framework/Core/src/RateLimiter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
#include "Framework/DeviceContext.h"
#include "Framework/Signpost.h"

#include <fairmq/Channel.h>
#include <fairmq/Device.h>
#include <uv.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Common.h>
#include <uv.h>
#include <chrono>
#include <thread>

Expand All @@ -32,15 +33,31 @@ using namespace o2::framework;
int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
{
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
// Feature is not enabled. Nothing to do.
if (!maxInFlight && !minSHM) {
return 0;
}
auto device = ctx.services().get<RawDeviceService>().device();
auto& deviceState = ctx.services().get<DeviceState>();
if (maxInFlight && device->GetChannels().count("metric-feedback")) {
// Let's check for the metric channel.
InputChannelInfo* channelInfo = nullptr;
for (auto& info : ctx.services().get<DeviceState>().inputChannelInfos) {
if (info.channel->GetName() == "metric-feedback") {
channelInfo = &info;
}
}
if (!channelInfo) {
return 0;
}
// No new data on channel. Nothing to do.
if (!channelInfo->readPolled) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit",
"Socket not polled. No need to check.");

return 0;
}
if (maxInFlight && channelInfo) {
auto& dtc = ctx.services().get<DataTakingContext>();
const auto& device = ctx.services().get<RawDeviceService>().device();
const auto& deviceContext = ctx.services().get<DeviceContext>();
auto& proxy = ctx.services().get<FairMQDeviceProxy>();
bool timeout = deviceContext.exitTransitionTimeout;
bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
bool waitMessage = false;
Expand All @@ -55,17 +72,17 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
maxInFlight, mSentTimeframes, mConsumedTimeframes);
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
"Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
maxInFlight, mSentTimeframes, mConsumedTimeframes);
"Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
maxInFlight, mSentTimeframes, mConsumedTimeframes);
}
waitMessage = true;
timeoutForMessage = false;
}
auto msg = device->NewMessageFor("metric-feedback", 0, 0);
auto msg = channelInfo->channel->NewMessage(0, fair::mq::Alignment{64});
int64_t count = 0;
do {
count = device->Receive(msg, "metric-feedback", 0, recvTimeout);
if (timeout && count <= 0 && device->NewStatePending()) {
count = channelInfo->channel->Receive(msg, recvTimeout);
if (timeout && count <= 0 && proxy.newStateRequested()) {
return 1;
}
} while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);
Expand All @@ -76,6 +93,8 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
}
assert(msg->GetSize() == 8);
mConsumedTimeframes = *(int64_t*)msg->GetData();
// We reset the read polled.
channelInfo->readPolled = false;
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit",
"Received %llu as consumed timeframes",
mConsumedTimeframes);
Expand All @@ -87,8 +106,8 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
"%lli / %d TF in flight, continue to publish",
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
"%lli / %d TF in flight, continue to publish",
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
}
}

Expand Down Expand Up @@ -133,6 +152,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
float elapsed = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mLastTime).count();
if (elapsed < mSmothDelay) {
LOG(debug) << "TF Throttling: Elapsed " << elapsed << " --> Waiting for " << mSmothDelay - elapsed;
auto& deviceState = ctx.services().get<DeviceState>();
uv_run(deviceState.loop, UV_RUN_NOWAIT);
std::this_thread::sleep_for(std::chrono::microseconds((size_t)((mSmothDelay - elapsed) * 1.e6f)));
}
Expand All @@ -144,6 +164,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
if (minSHM) {
int waitMessage = 0;
auto& runningWorkflow = ctx.services().get<RunningWorkflowInfo const>();
auto* device = ctx.services().get<RawDeviceService>().device();
while (true) {
long freeMemory = -1;
try {
Expand Down