Skip to content

Commit 195b976

Browse files
committed
DPL: improve rate limiting
* Now uses consistently the oldest possible timeframe to as a rate limiting metric. * There is now a callback whenever we receive a DomainInfoHeader. It can be subscribed by the user and it is used e.g. to implement the rate limiting.
1 parent ce549f6 commit 195b976

File tree

12 files changed

+88
-55
lines changed

12 files changed

+88
-55
lines changed

Framework/Core/include/Framework/CallbackService.h

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ class CallbackService
7373
PreProcessing,
7474
/// Invoked after the processing callback,
7575
PostProcessing,
76-
CCDBDeserialised
76+
/// Invoked whenever an object from CCDB is deserialised via ROOT.
77+
/// Use this to finalise the initialisation of the object.
78+
CCDBDeserialised,
79+
/// Invoked when new domain info is available
80+
DomainInfoUpdated
7781
};
7882

7983
using StartCallback = std::function<void()>;
@@ -88,21 +92,23 @@ class CallbackService
8892
using PreProcessingCallback = std::function<void(ServiceRegistry&, int)>;
8993
using PostProcessingCallback = std::function<void(ServiceRegistry&, int)>;
9094
using CCDBDeserializedCallback = std::function<void(ConcreteDataMatcher&, void*)>;
95+
using DomainInfoUpdatedCallback = std::function<void(ServiceRegistry&, size_t timeslice)>;
9196

92-
using Callbacks = CallbackRegistry<Id, //
93-
RegistryPair<Id, Id::Start, StartCallback>, //
94-
RegistryPair<Id, Id::Stop, StopCallback>, //
95-
RegistryPair<Id, Id::Reset, ResetCallback>, //
96-
RegistryPair<Id, Id::Idle, IdleCallback>, //
97-
RegistryPair<Id, Id::ClockTick, ClockTickCallback>, //
98-
RegistryPair<Id, Id::DataConsumed, DataConsumedCallback>, //
99-
RegistryPair<Id, Id::EndOfStream, EndOfStreamCallback>, //
100-
RegistryPair<Id, Id::RegionInfoCallback, RegionInfoCallback>, //
101-
RegistryPair<Id, Id::NewTimeslice, NewTimesliceCallback>, //
102-
RegistryPair<Id, Id::PreProcessing, PreProcessingCallback>, //
103-
RegistryPair<Id, Id::PostProcessing, PostProcessingCallback>, //
104-
RegistryPair<Id, Id::CCDBDeserialised, CCDBDeserializedCallback> //
105-
>; //
97+
using Callbacks = CallbackRegistry<Id, //
98+
RegistryPair<Id, Id::Start, StartCallback>, //
99+
RegistryPair<Id, Id::Stop, StopCallback>, //
100+
RegistryPair<Id, Id::Reset, ResetCallback>, //
101+
RegistryPair<Id, Id::Idle, IdleCallback>, //
102+
RegistryPair<Id, Id::ClockTick, ClockTickCallback>, //
103+
RegistryPair<Id, Id::DataConsumed, DataConsumedCallback>, //
104+
RegistryPair<Id, Id::EndOfStream, EndOfStreamCallback>, //
105+
RegistryPair<Id, Id::RegionInfoCallback, RegionInfoCallback>, //
106+
RegistryPair<Id, Id::NewTimeslice, NewTimesliceCallback>, //
107+
RegistryPair<Id, Id::PreProcessing, PreProcessingCallback>, //
108+
RegistryPair<Id, Id::PostProcessing, PostProcessingCallback>, //
109+
RegistryPair<Id, Id::CCDBDeserialised, CCDBDeserializedCallback>, //
110+
RegistryPair<Id, Id::DomainInfoUpdated, DomainInfoUpdatedCallback> //
111+
>; //
106112

107113
// set callback for specified processing step
108114
template <typename U>

Framework/Core/include/Framework/ExpirationHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ struct TimesliceSlot;
2929
struct InputRecord;
3030

3131
struct ExpirationHandler {
32-
using Creator = std::function<TimesliceSlot(TimesliceIndex&)>;
32+
using Creator = std::function<TimesliceSlot(ChannelIndex, TimesliceIndex&)>;
3333
/// Callback type to check if the record must be expired
3434
using Checker = std::function<bool(ServiceRegistry&, uint64_t timestamp, InputSpan const& record)>;
3535
/// Callback type to actually materialise a given record

Framework/Core/include/Framework/InputRecord.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,11 @@ class InputRecord
682682
return {this, size()};
683683
}
684684

685+
InputSpan& span()
686+
{
687+
return mSpan;
688+
}
689+
685690
private:
686691
ServiceRegistry& mRegistry;
687692
std::vector<InputRoute> const& mInputsSchema;

Framework/Core/include/Framework/MessageContext.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
namespace o2::framework
4141
{
4242

43-
class Output;
43+
struct Output;
4444

4545
class MessageContext
4646
{

Framework/Core/include/Framework/ProcessingContext.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ class ProcessingContext
3030
{
3131
}
3232

33+
/// The inputs associated with this processing context.
3334
InputRecord& inputs() { return mInputs; }
35+
/// The services registry associated with this processing context.
3436
ServiceRegistry& services() { return mServices; }
37+
/// The data allocator is used to allocate memory for the output data.
3538
DataAllocator& outputs() { return mAllocator; }
3639

3740
InputRecord& mInputs;

Framework/Core/src/CallbacksPolicy.cxx

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,17 @@ CallbacksPolicy epnProcessReporting()
3333
.policy = [](CallbackService& callbacks, InitContext& context) -> void {
3434
callbacks.set(CallbackService::Id::PreProcessing, [](ServiceRegistry& registry, int op) {
3535
auto& info = registry.get<TimingInfo>();
36-
LOGP(info, "Processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}",
37-
info.timeslice, info.tfCounter, info.firstTFOrbit, op);
36+
if ((int)info.firstTFOrbit != -1) {
37+
LOGP(info, "Processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}",
38+
info.timeslice, info.tfCounter, info.firstTFOrbit, op);
39+
}
3840
});
3941
callbacks.set(CallbackService::Id::PostProcessing, [](ServiceRegistry& registry, int op) {
4042
auto& info = registry.get<TimingInfo>();
41-
LOGP(info, "Done processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}",
42-
info.timeslice, info.tfCounter, info.firstTFOrbit, op);
43+
if ((int)info.firstTFOrbit != -1) {
44+
LOGP(info, "Done processing timeslice:{}, tfCounter:{}, firstTFOrbit:{}, action:{}",
45+
info.timeslice, info.tfCounter, info.firstTFOrbit, op);
46+
}
4347
});
4448
}};
4549
}

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "Framework/Logger.h"
2929
#include "Framework/OutputSpec.h"
3030
#include "Framework/RawDeviceService.h"
31+
#include "Framework/TimesliceIndex.h"
3132
#include "Framework/Variant.h"
3233
#include "../../../Algorithm/include/Algorithm/HeaderStack.h"
3334
#include "Framework/OutputObjHeader.h"
@@ -508,19 +509,19 @@ DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> cons
508509
.name = "internal-dpl-injected-dummy-sink",
509510
.inputs = danglingOutputInputs,
510511
.algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks) {
511-
auto dataConsumed = [](ServiceRegistry& services) {
512-
services.get<DataProcessingStats>().consumedTimeframes++;
512+
auto domainInfoUpdated = [](ServiceRegistry& services, size_t timeslice) {
513+
auto& timesliceIndex = services.get<TimesliceIndex>();
513514
auto device = services.get<RawDeviceService>().device();
514515
auto channel = device->fChannels.find("metric-feedback");
515516
if (channel != device->fChannels.end()) {
516517
FairMQMessagePtr payload(device->NewMessage());
517-
int64_t* consumed = (int64_t*)malloc(sizeof(int64_t));
518-
*consumed = services.get<DataProcessingStats>().consumedTimeframes;
518+
size_t* consumed = (size_t*)malloc(sizeof(size_t));
519+
*consumed = timesliceIndex.getOldestPossibleOutput().timeslice.value;
519520
payload->Rebuild(consumed, sizeof(int64_t), nullptr, nullptr);
520521
channel->second[0].Send(payload);
521522
}
522523
};
523-
callbacks.set(CallbackService::Id::DataConsumed, dataConsumed);
524+
callbacks.set(CallbackService::Id::DomainInfoUpdated, domainInfoUpdated);
524525

525526
return adaptStateless([]() {
526527
});

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1326,7 +1326,14 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
13261326
return;
13271327
}
13281328
if (oldestPossibleTimeslice != (size_t)-1) {
1329-
context.relayer->setOldestPossibleInput({oldestPossibleTimeslice}, info.id);
1329+
TimesliceIndex& timesliceIndex = context.registry->get<TimesliceIndex>();
1330+
auto r = timesliceIndex.setOldestPossibleInput({oldestPossibleTimeslice}, info.id);
1331+
timesliceIndex.updateOldestPossibleOutput();
1332+
auto& proxy = context.registry->get<FairMQDeviceProxy>();
1333+
auto oldestPossibleOutput = context.relayer->getOldestPossibleOutput();
1334+
LOGP(detail, "Broadcasting possible output {}", oldestPossibleOutput.timeslice.value);
1335+
context.registry->get<CallbackService>()(CallbackService::Id::DomainInfoUpdated, *(context.registry), (size_t)oldestPossibleOutput.timeslice.value);
1336+
DataProcessingHelpers::broadcastOldestPossibleTimeslice(proxy, oldestPossibleOutput.timeslice.value);
13301337
}
13311338
handleValidMessages(*inputTypes);
13321339
return;

Framework/Core/src/DataRelayer.cxx

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "Framework/Signpost.h"
2626
#include "Framework/RoutingIndices.h"
2727
#include "Framework/VariableContextHelpers.h"
28+
#include "Framework/FairMQDeviceProxy.h"
2829
#include "DataProcessingStatus.h"
2930
#include "DataRelayerHelpers.h"
3031
#include "InputRouteHelpers.h"
@@ -59,8 +60,8 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy,
5960
std::vector<InputRoute> const& routes,
6061
monitoring::Monitoring& metrics,
6162
TimesliceIndex& index)
62-
: mTimesliceIndex{index},
63-
mMetrics{metrics},
63+
: mMetrics{metrics},
64+
mTimesliceIndex{index},
6465
mCompletionPolicy{policy},
6566
mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
6667
mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
@@ -102,6 +103,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
102103
{
103104
LOGP(debug, "DataRelayer::processDanglingInputs");
104105
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
106+
auto& deviceProxy = services.get<FairMQDeviceProxy>();
105107

106108
ActivityStats activity;
107109
/// Nothing to do if nothing can expire.
@@ -115,7 +117,8 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
115117
LOGP(debug, "Creating new slot");
116118
for (auto& handler : expirationHandlers) {
117119
LOGP(debug, "handler.creator for {}", handler.name);
118-
slotsCreatedByHandlers.push_back(handler.creator(mTimesliceIndex));
120+
auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
121+
slotsCreatedByHandlers.push_back(handler.creator(channelIndex, mTimesliceIndex));
119122
}
120123
}
121124
if (slotsCreatedByHandlers.empty() == false) {

Framework/Core/src/DataSender.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ void DataSender::send(FairMQParts& parts, ChannelIndex channelIndex)
8686
/// FIXME: throttling this information?
8787
/// FIXME: do it only if it changes?
8888
TimesliceIndex& index = mRegistry.get<TimesliceIndex>();
89+
index.updateOldestPossibleOutput();
8990

9091
auto oldest = index.getOldestPossibleOutput();
9192
if (oldest.timeslice.value == -1) {

0 commit comments

Comments
 (0)