Skip to content

Commit 4138baf

Browse files
authored
DPL Analysis: add option to artificially slow down AOD reader (#13549)
1 parent f49a578 commit 4138baf

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
#include <arrow/table.h>
4242
#include <arrow/util/key_value_metadata.h>
4343

44-
#include <thread>
45-
4644
using namespace o2;
4745
using namespace o2::aod;
4846

@@ -140,6 +138,8 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
140138

141139
auto filename = options.get<std::string>("aod-file-private");
142140

141+
auto maxRate = options.get<float>("aod-max-io-rate");
142+
143143
std::string parentFileReplacement;
144144
if (options.isSet("aod-parent-base-path-replacement")) {
145145
parentFileReplacement = options.get<std::string>("aod-parent-base-path-replacement");
@@ -192,6 +192,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
192192
fileCounter,
193193
numTF,
194194
watchdog,
195+
maxRate,
195196
didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
196197
// Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
197198
// the TF to read is numTF
@@ -222,6 +223,8 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
222223
return;
223224
}
224225

226+
int64_t startTime = uv_hrtime();
227+
int64_t startSize = totalSizeCompressed;
225228
for (auto& route : requestedTables) {
226229
if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
227230
continue;
@@ -278,6 +281,18 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
278281
}
279282
first = false;
280283
}
284+
int64_t stopSize = totalSizeCompressed;
285+
int64_t bytesDelta = stopSize - startSize;
286+
int64_t stopTime = uv_hrtime();
287+
float currentDelta = float(stopTime - startTime) / 1000000000; // in s
288+
if (ceil(maxRate) > 0.) {
289+
float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
290+
// We only sleep if we read faster than the max-read-rate.
291+
if (extraTime > 0.) {
292+
LOGP(info, "Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
293+
uv_sleep(extraTime * 1000); // in milliseconds
294+
}
295+
}
281296
totalDFSent++;
282297
monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
283298
monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "AnalysisSupportHelpers.h"
1313
#include "Framework/AlgorithmSpec.h"
1414
#include "Framework/AODReaderHelpers.h"
15+
#include "Framework/ConfigParamSpec.h"
1516
#include "Framework/ConfigParamsHelper.h"
1617
#include "Framework/CommonDataProcessors.h"
1718
#include "Framework/ConfigContext.h"
@@ -27,14 +28,14 @@
2728
#include "Framework/DefaultsHelpers.h"
2829
#include "Framework/Signpost.h"
2930

31+
#include "Framework/Variant.h"
3032
#include "Headers/DataHeader.h"
3133
#include <algorithm>
3234
#include <list>
3335
#include <set>
3436
#include <utility>
3537
#include <vector>
3638
#include <climits>
37-
#include <thread>
3839

3940
O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
4041

@@ -213,6 +214,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
213214
aodLifetime}},
214215
.algorithm = AlgorithmSpec::dummyAlgorithm(),
215216
.options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
217+
ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
216218
ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
217219
ConfigParamSpec{"aod-parent-access-level", VariantType::String, {"Allow parent file access up to specified level. Default: no (0)"}},
218220
ConfigParamSpec{"aod-parent-base-path-replacement", VariantType::String, {R"(Replace base path of parent files. Syntax: FROM;TO. E.g. "alien:///path/in/alien;/local/path". Enclose in "" on the command line.)"}},

Framework/Core/src/runDataProcessing.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2026,6 +2026,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
20262026
"--aod-writer-resmode",
20272027
"--aod-writer-maxfilesize",
20282028
"--aod-writer-keep",
2029+
"--aod-max-io-rate",
20292030
"--aod-parent-access-level",
20302031
"--aod-parent-base-path-replacement",
20312032
"--driver-client-backend",

0 commit comments

Comments
 (0)