|
30 | 30 | #include "Framework/RuntimeError.h" |
31 | 31 | #include "Framework/RateLimiter.h" |
32 | 32 | #include "Framework/PluginManager.h" |
33 | | -#include "Framework/Signpost.h" |
34 | 33 | #include <Monitoring/Monitoring.h> |
35 | 34 |
|
36 | 35 | #include <fairmq/Device.h> |
37 | | -#include <uv.h> |
38 | 36 | #include <fstream> |
39 | 37 | #include <functional> |
40 | 38 | #include <memory> |
41 | 39 | #include <string> |
42 | 40 |
|
43 | 41 | using namespace o2::framework::data_matcher; |
44 | 42 |
|
45 | | -// Special log to track callbacks we know about |
46 | | -O2_DECLARE_DYNAMIC_LOG(callbacks); |
47 | | - |
48 | 43 | namespace o2::framework |
49 | 44 | { |
50 | 45 |
|
@@ -150,46 +145,27 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpe |
150 | 145 |
|
151 | 146 | void retryMetricCallback(uv_async_t* async) |
152 | 147 | { |
153 | | - O2_SIGNPOST_ID_FROM_POINTER(cid, callbacks, async); |
154 | | - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Attempting again propagating rate-limiting information."); |
155 | | - |
156 | | - // Check if this is a source device |
157 | 148 | static size_t lastTimeslice = -1; |
158 | 149 | auto* services = (ServiceRegistryRef*)async->data; |
159 | 150 | auto& timesliceIndex = services->get<TimesliceIndex>(); |
160 | 151 | auto* device = services->get<RawDeviceService>().device(); |
161 | 152 | auto channel = device->GetChannels().find("metric-feedback"); |
162 | 153 | auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; |
163 | 154 | if (channel == device->GetChannels().end()) { |
164 | | - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel."); |
165 | 155 | return; |
166 | 156 | } |
167 | 157 | fair::mq::MessagePtr payload(device->NewMessage()); |
168 | 158 | payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr); |
169 | 159 | auto consumed = oldestPossingTimeslice; |
170 | 160 |
|
171 | | - size_t start = uv_hrtime(); |
172 | 161 | int64_t result = channel->second[0].Send(payload, 100); |
173 | | - size_t stop = uv_hrtime(); |
174 | 162 | // If the sending worked, we do not retry. |
175 | | - if (result <= 0) { |
176 | | - // Forcefully slow down in case FairMQ returns earlier than expected... |
177 | | - int64_t ellapsed = (stop - start) / 1000000; |
178 | | - if (ellapsed < 100) { |
179 | | - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", |
180 | | - "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.", |
181 | | - result, ellapsed); |
182 | | - uv_sleep(100 - ellapsed); |
183 | | - } else { |
184 | | - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", |
185 | | - "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.", result, ellapsed); |
186 | | - } |
| 163 | + if (result != 0) { |
187 | 164 | // If the sending did not work, we keep trying until it actually works. |
188 | 165 | // This will schedule other tasks in the queue, so the processing of the |
189 | 166 | // data will still happen. |
190 | 167 | uv_async_send(async); |
191 | 168 | } else { |
192 | | - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %llu bytes, Last timeslice now set to %zu.", result, consumed); |
193 | 169 | lastTimeslice = consumed; |
194 | 170 | } |
195 | 171 | } |
|
0 commit comments