Skip to content

Commit 5750365

Browse files
committed
DPL: Enforce that dpl pipeline length is at least as long as number of TFs in flight
1 parent bf8a402 commit 5750365

File tree

6 files changed

+42
-14
lines changed

6 files changed

+42
-14
lines changed

Framework/Core/include/Framework/DefaultsHelpers.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,24 @@
1212
#ifndef O2_FRAMEWORK_DEFAULTHELPERS_H_
1313
#define O2_FRAMEWORK_DEFAULTHELPERS_H_
1414

15+
namespace fair::mq
16+
{
17+
class ProgOptions;
18+
}
19+
1520
namespace o2::framework
1621
{
1722
enum struct DeploymentMode;
23+
struct DeviceConfig;
1824

1925
struct DefaultsHelpers {
2026
static DeploymentMode deploymentMode();
2127
/// @true if running online
2228
static bool onlineDeploymentMode();
2329
/// get max number of timeslices in the queue
24-
static unsigned int pipelineLength();
30+
static unsigned int pipelineLength(unsigned int minLength);
31+
static unsigned int pipelineLength(const fair::mq::ProgOptions& options);
32+
static unsigned int pipelineLength(const DeviceConfig& dc);
2533
};
2634
} // namespace o2::framework
2735

Framework/Core/src/ArrowSupport.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
564564
if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
565565
config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
566566
} else {
567-
config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
567+
config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(dc);
568568
}
569569
static bool once = false;
570570
// Until we guarantee this is called only once...

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,7 +1483,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
14831483
auto& infos = state.inputChannelInfos;
14841484

14851485
if (context.balancingInputs) {
1486-
static int pipelineLength = DefaultsHelpers::pipelineLength();
1486+
static int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get<RawDeviceService>().device()->fConfig);
14871487
static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
14881488
auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
14891489
return infos[a].oldestForChannel.value > limitNew;
@@ -2259,12 +2259,14 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22592259
return false;
22602260
}
22612261

2262-
auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2262+
int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get<RawDeviceService>().device()->fConfig);
2263+
2264+
auto postUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
22632265
auto& stats = ref.get<DataProcessingStats>();
22642266
auto& states = ref.get<DataProcessingStates>();
22652267
std::atomic_thread_fence(std::memory_order_release);
22662268
char relayerSlotState[1024];
2267-
int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2269+
int written = snprintf(relayerSlotState, 1024, "%d ", pipelineLength);
22682270
char* buffer = relayerSlotState + written;
22692271
for (size_t ai = 0; ai != record.size(); ai++) {
22702272
buffer[ai] = record.isValid(ai) ? '3' : '0';
@@ -2291,11 +2293,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22912293
count++;
22922294
};
22932295

2294-
auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2296+
auto preUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
22952297
auto& states = ref.get<DataProcessingStates>();
22962298
std::atomic_thread_fence(std::memory_order_release);
22972299
char relayerSlotState[1024];
2298-
snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2300+
snprintf(relayerSlotState, 1024, "%d ", pipelineLength);
22992301
char* buffer = strchr(relayerSlotState, ' ') + 1;
23002302
for (size_t ai = 0; ai != record.size(); ai++) {
23012303
buffer[ai] = record.isValid(ai) ? '2' : '0';

Framework/Core/src/DataRelayer.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "Framework/DataProcessingStates.h"
3838
#include "Framework/DataTakingContext.h"
3939
#include "Framework/DefaultsHelpers.h"
40+
#include "Framework/RawDeviceService.h"
4041

4142
#include "Headers/DataHeaderHelpers.h"
4243
#include "Framework/Formatters.h"
@@ -48,6 +49,7 @@
4849
#include <fairmq/Channel.h>
4950
#include <functional>
5051
#include <fairmq/shmem/Message.h>
52+
#include <fairmq/Device.h>
5153
#include <fmt/format.h>
5254
#include <fmt/ostream.h>
5355
#include <span>
@@ -81,7 +83,7 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy,
8183
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
8284

8385
if (policy.configureRelayer == nullptr) {
84-
static int pipelineLength = DefaultsHelpers::pipelineLength();
86+
static int pipelineLength = DefaultsHelpers::pipelineLength(*services.get<RawDeviceService>().device()->fConfig);
8587
setPipelineLength(pipelineLength);
8688
} else {
8789
policy.configureRelayer(*this);

Framework/Core/src/DefaultsHelpers.cxx

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,45 @@
1111

1212
#include "Framework/DefaultsHelpers.h"
1313
#include "Framework/DataTakingContext.h"
14+
#include "Framework/DeviceConfig.h"
15+
#include <fairmq/ProgOptions.h>
16+
1417
#include <cstdlib>
1518
#include <cstring>
1619
#include <stdexcept>
1720

1821
namespace o2::framework
1922
{
2023

21-
unsigned int DefaultsHelpers::pipelineLength()
24+
unsigned int DefaultsHelpers::pipelineLength(unsigned int minLength)
2225
{
2326
static bool override = getenv("DPL_DEFAULT_PIPELINE_LENGTH");
2427
if (override) {
2528
static unsigned int retval = atoi(getenv("DPL_DEFAULT_PIPELINE_LENGTH"));
26-
return retval;
29+
return std::max(minLength, retval);
2730
}
2831
DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
2932
// just some reasonable numers
3033
// The number should really be tuned at runtime for each processor.
3134
if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS || deploymentMode == DeploymentMode::FST) {
32-
return 512;
35+
return std::max(minLength, 512u);
3336
} else {
34-
return 64;
37+
return std::max(minLength, 64u);
3538
}
3639
}
3740

41+
unsigned int DefaultsHelpers::pipelineLength(const DeviceConfig& dc)
42+
{
43+
static unsigned int minLength = dc.options.count("timeframes-rate-limit") ? std::max(0, atoi(dc.options["timeframes-rate-limit"].as<std::string>().c_str())) : 0;
44+
return pipelineLength(minLength);
45+
}
46+
47+
unsigned int DefaultsHelpers::pipelineLength(const fair::mq::ProgOptions& options)
48+
{
49+
static unsigned int minLength = std::max(0, atoi(options.GetValue<std::string>("timeframes-rate-limit").c_str()));
50+
return pipelineLength(minLength);
51+
}
52+
3853
static DeploymentMode getDeploymentMode_internal()
3954
{
4055
char* explicitMode = getenv("O2_DPL_DEPLOYMENT_MODE");

Framework/Core/src/runDataProcessing.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,8 @@ void spawnDevice(uv_loop_t* loop,
817817
.sendInitialValue = true,
818818
});
819819

820-
for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) {
820+
unsigned int pipelineLength = DefaultsHelpers::pipelineLength(DeviceConfig{varmap});
821+
for (size_t i = 0; i < pipelineLength; ++i) {
821822
allStates.back().registerState(DataProcessingStates::StateSpec{
822823
.name = fmt::format("matcher_variables/{}", i),
823824
.stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
@@ -826,7 +827,7 @@ void spawnDevice(uv_loop_t* loop,
826827
});
827828
}
828829

829-
for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) {
830+
for (size_t i = 0; i < pipelineLength; ++i) {
830831
allStates.back().registerState(DataProcessingStates::StateSpec{
831832
.name = fmt::format("data_relayer/{}", i),
832833
.stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + i),

0 commit comments

Comments
 (0)