Skip to content

Commit 7044ca3

Browse files
committed
DPL analysis: support timeslice rate limiting in DPL resource manager
Use DPL resource manager rather than the ad-hoc solution for reconstruction.
1 parent 5f66e50 commit 7044ca3

File tree

9 files changed

+167
-25
lines changed

9 files changed

+167
-25
lines changed

Framework/Core/include/Framework/CommonDataProcessors.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ struct CommonDataProcessors {
3737
/// and simply discards them. @a rateLimitingChannelConfig is the configuration
3838
/// for the rate limiting channel, if any required.
3939
static DataProcessorSpec getDummySink(std::vector<InputSpec> const& danglingInputs, std::string rateLimitingChannelConfig);
40+
/// @return a dummy DataProcessorSpec which requires all the passed @a InputSpec
41+
/// and simply discards them. Rate limiting goes through the DPL driver
42+
static DataProcessorSpec getScheduledDummySink(std::vector<InputSpec> const& danglingInputs);
4043
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec);
4144
};
4245

Framework/Core/include/Framework/ComputingQuotaOffer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ struct ComputingQuotaOffer {
4444
int64_t memory = 0;
4545
/// How much shared memory it can allocate
4646
int64_t sharedMemory = 0;
47+
/// How many timeslices it can process without giving back control
48+
int64_t timeslices = 0;
4749
/// How much runtime it can use before giving back the resource
4850
/// in milliseconds.
4951
int64_t runtime = 0;

Framework/Core/include/Framework/ResourcePolicyHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace o2::framework
2222
struct ResourcePolicyHelpers {
2323
static ResourcePolicy trivialTask(char const* taskMatcher);
2424
static ResourcePolicy cpuBoundTask(char const* taskMatcher, int maxCPUs = 1);
25+
static ResourcePolicy rateLimitedSharedMemoryBoundTask(char const* taskMatcher, int maxMemory, int maxTimeslices);
2526
static ResourcePolicy sharedMemoryBoundTask(char const* taskMatcher, int maxMemory);
2627
};
2728

Framework/Core/src/ArrowSupport.cxx

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,24 @@ struct MetricIndices {
7777
size_t shmOfferBytesConsumed = -1;
7878
size_t timeframesRead = -1;
7979
size_t timeframesConsumed = -1;
80+
size_t timeframesExpired = -1;
8081
};
8182

8283
std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
8384
{
8485
std::vector<MetricIndices> results;
8586

8687
for (auto& info : allDevicesMetrics) {
87-
MetricIndices indices;
88-
indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
89-
indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
90-
indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
91-
indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
92-
indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
93-
indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
94-
indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
95-
indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
96-
results.push_back(indices);
88+
results.emplace_back(MetricIndices{
89+
.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
90+
.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
91+
.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
92+
.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
93+
.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
94+
.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
95+
.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
96+
.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
97+
.timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")});
9798
}
9899
return results;
99100
}
@@ -258,6 +259,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
258259
int64_t totalMessagesDestroyed = 0;
259260
int64_t totalTimeframesRead = 0;
260261
int64_t totalTimeframesConsumed = 0;
262+
int64_t totalTimeframesExpired = 0;
261263
auto &driverMetrics = sm.driverMetricsInfo;
262264
auto &allDeviceMetrics = sm.deviceMetricsInfos;
263265
auto &specs = sm.deviceSpecs;
@@ -266,9 +268,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
266268
static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
267269
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
268270
static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
271+
// These are really to monitor the rate limiting
269272
static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
273+
static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
270274
static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
275+
static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
271276
static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
277+
static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
278+
272279
static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
273280
static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
274281
static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
@@ -390,6 +397,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
390397
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
391398
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
392399
}
400+
{
401+
size_t index = indices.timeframesExpired;
402+
assert(index < deviceMetrics.metrics.size());
403+
changed |= deviceMetrics.changed[index];
404+
MetricInfo info = deviceMetrics.metrics[index];
405+
assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
406+
auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
407+
auto value = (int64_t)data[(info.pos - 1) % data.size()];
408+
totalTimeframesExpired += value;
409+
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
410+
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
411+
}
393412
}
394413
static uint64_t unchangedCount = 0;
395414
if (changed) {
@@ -407,26 +426,45 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
407426
unchangedCount++;
408427
}
409428
changedCountMetric(driverMetrics, unchangedCount, timestamp);
410-
auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
411-
if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
412-
return;
413-
}
429+
414430
static const ResourceSpec shmResourceSpec{
415431
.name = "shared memory",
416432
.unit = "MB",
417433
.api = "/shm-offer {}",
418-
.maxAvailable = (int64_t)calculateAvailableSharedMemory(registry),
434+
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
419435
.maxQuantum = 100,
420436
.minQuantum = 50,
421437
.metricOfferScaleFactor = 1000000,
422438
};
439+
static const ResourceSpec timesliceResourceSpec{
440+
.name = "timeslice",
441+
.unit = "timeslices",
442+
.api = "/timeslice-offer {}",
443+
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
444+
.maxQuantum = 2,
445+
.minQuantum = 1,
446+
.metricOfferScaleFactor = 1,
447+
};
423448
static ResourceState shmResourceState{
424449
.available = shmResourceSpec.maxAvailable,
425450
};
451+
static ResourceState timesliceResourceState{
452+
.available = timesliceResourceSpec.maxAvailable,
453+
};
426454
static ResourceStats shmResourceStats{
427455
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
428456
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
429457
};
458+
static ResourceStats timesliceResourceStats{
459+
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
460+
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
461+
};
462+
463+
offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
464+
specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
465+
totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
466+
availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
467+
(void*)&sm);
430468

431469
offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
432470
specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
@@ -497,8 +535,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
497535
if (!once) {
498536
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
499537
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
500-
"Rate limiting set up at %{bytes}llu MB distributed over %d readers",
501-
config->maxMemory, readers);
538+
"Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
539+
config->maxMemory, config->maxTimeframes, readers);
502540
registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
503541
once = true;
504542
} },

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ using namespace o2::framework::data_matcher;
4444

4545
// Special log to track callbacks we know about
4646
O2_DECLARE_DYNAMIC_LOG(callbacks);
47+
O2_DECLARE_DYNAMIC_LOG(rate_limiting);
4748

4849
namespace o2::framework
4950
{
@@ -224,6 +225,35 @@ DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> cons
224225
.labels = {{"resilient"}}};
225226
}
226227

228+
// For the cases were the driver is guaranteed to be there (e.g. in analysis) we can use a
229+
// more sophisticated controller which can get offers for timeslices so that we can rate limit
230+
// across multiple input devices and rate limit shared memory usage without race conditions
231+
DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector<InputSpec> const& danglingOutputInputs)
232+
{
233+
return DataProcessorSpec{
234+
.name = "internal-dpl-injected-dummy-sink",
235+
.inputs = danglingOutputInputs,
236+
.algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks, DeviceState& deviceState, InitContext& ic) {
237+
// We update the number of consumed timeframes based on the oldestPossingTimeslice
238+
// this information will be aggregated in the driver which will then decide wether or not a new offer for
239+
// a timeslice should be done and to which device
240+
auto domainInfoUpdated = [](ServiceRegistryRef services, size_t timeslice, ChannelIndex channelIndex) {
241+
LOGP(debug, "Domain info updated with timeslice {}", timeslice);
242+
auto& timesliceIndex = services.get<TimesliceIndex>();
243+
auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
244+
auto& stats = services.get<DataProcessingStats>();
245+
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
246+
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes to be set to %zu.", oldestPossingTimeslice);
247+
stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
248+
};
249+
callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
250+
251+
return adaptStateless([]() {
252+
});
253+
})},
254+
.labels = {{"resilient"}}};
255+
}
256+
227257
AlgorithmSpec CommonDataProcessors::wrapWithRateLimiting(AlgorithmSpec spec)
228258
{
229259
return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void {

Framework/Core/src/ComputingQuotaEvaluator.cxx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ ComputingQuotaEvaluator::ComputingQuotaEvaluator(ServiceRegistryRef ref)
4040
0,
4141
0,
4242
0,
43+
0,
4344
-1,
4445
-1,
4546
OfferScore::Unneeded,
@@ -97,7 +98,7 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
9798
result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
9899
for (auto& offer : result) {
99100
// We pretend each offer id is a pointer, to have a unique id.
100-
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(offer*8));
101+
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(offer * 8));
101102
O2_SIGNPOST_START(quota, oid, "offers", "Offer %d has been selected.", offer);
102103
}
103104
dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1});
@@ -168,6 +169,7 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
168169
tmp.cpu += offer.cpu;
169170
tmp.memory += offer.memory;
170171
tmp.sharedMemory += offer.sharedMemory;
172+
tmp.timeslices += offer.timeslices;
171173
offer.score = selector(offer, tmp);
172174
switch (offer.score) {
173175
case OfferScore::Unneeded:
@@ -224,7 +226,7 @@ void ComputingQuotaEvaluator::dispose(int taskId)
224226
continue;
225227
}
226228
if (offer.sharedMemory <= 0) {
227-
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi*8));
229+
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi * 8));
228230
O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed.", oi);
229231
offer.valid = false;
230232
offer.score = OfferScore::Unneeded;
@@ -269,7 +271,7 @@ void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOff
269271
/// to the driver.
270272
for (auto& ref : mExpiredOffers) {
271273
auto& offer = mOffers[ref.index];
272-
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(ref.index*8));
274+
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(ref.index * 8));
273275
if (offer.sharedMemory < 0) {
274276
O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d does not have any more memory. Marking it as invalid.", ref.index);
275277
offer.valid = false;

Framework/Core/src/ResourcePolicyHelpers.cxx

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111

1212
#include "Framework/ResourcePolicyHelpers.h"
1313
#include "Framework/DeviceSpec.h"
14-
#include "ResourcesMonitoringHelper.h"
1514

16-
#include <string>
1715
#include <regex>
1816

1917
namespace o2::framework
@@ -41,6 +39,36 @@ ResourcePolicy ResourcePolicyHelpers::cpuBoundTask(char const* s, int requestedC
4139
[requestedCPUs](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { return accumulated.cpu >= requestedCPUs ? OfferScore::Enough : OfferScore::More; }};
4240
}
4341

42+
ResourcePolicy ResourcePolicyHelpers::rateLimitedSharedMemoryBoundTask(char const* s, int requestedSharedMemory, int requestedTimeslices)
43+
{
44+
return ResourcePolicy{
45+
"ratelimited-shm-bound",
46+
[matcher = std::regex(s)](DeviceSpec const& spec) -> bool {
47+
return std::regex_match(spec.name, matcher);
48+
},
49+
[requestedSharedMemory, requestedTimeslices](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore {
50+
// If we have enough memory and not enough timeslices,
51+
// ignore further shared memory.
52+
if (accumulated.sharedMemory >= requestedSharedMemory && offer.timeslices == 0) {
53+
return OfferScore::Unneeded;
54+
}
55+
// If we have enough timeslices and not enough shared memory
56+
// ignore further timeslices.
57+
if (accumulated.timeslices >= requestedTimeslices && offer.sharedMemory == 0) {
58+
return OfferScore::Unneeded;
59+
}
60+
// If it does not offer neither shared memory nor timeslices, mark it as unneeded.
61+
if (offer.sharedMemory == 0 && offer.timeslices == 0) {
62+
return OfferScore::Unneeded;
63+
}
64+
// We have enough to process.
65+
if (accumulated.sharedMemory >= requestedSharedMemory && accumulated.timeslices >= requestedTimeslices) {
66+
return OfferScore::Enough;
67+
}
68+
// We need more resources
69+
return OfferScore::More; }};
70+
}
71+
4472
ResourcePolicy ResourcePolicyHelpers::sharedMemoryBoundTask(char const* s, int requestedSharedMemory)
4573
{
4674
return ResourcePolicy{

0 commit comments

Comments
 (0)