Skip to content

Commit 1bac664

Browse files
committed
DPL: allow to disable oldest possible timeframe propagation with a label
This allows to disable all DomainInfoHeader propagation with a corresponding DataProcessorLabel. It addresses the issue reported in QC-1320, where remote QC workflows were getting flooded with a DIH for each QC task instance in the setup.
1 parent 1bcfeed commit 1bac664

File tree

6 files changed

+62
-0
lines changed

6 files changed

+62
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ o2_add_library(Framework
108108
src/SimpleOptionsRetriever.cxx
109109
src/O2ControlHelpers.cxx
110110
src/O2ControlLabels.cxx
111+
src/CommonLabels.cxx
111112
src/O2ControlParameters.cxx
112113
src/O2DataModelHelpers.cxx
113114
src/OutputSpec.cxx
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_COMMONLABELS_H
13+
#define O2_FRAMEWORK_COMMONLABELS_H
14+
15+
#include "Framework/DataProcessorLabel.h"
16+
17+
namespace o2::framework
18+
{
19+
20+
// Label to disable forwarding/advertising of DomainInfoHeader (oldest possible outputs)
21+
// When present on a DataProcessor, no DomainInfoHeader messages will be sent downstream.
22+
const extern DataProcessorLabel suppressDomainInfoLabel;
23+
24+
} // namespace o2::framework
25+
26+
#endif // O2_FRAMEWORK_COMMONLABELS_H
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/CommonLabels.h"
13+
14+
namespace o2::framework
15+
{
16+
17+
const DataProcessorLabel suppressDomainInfoLabel = {"suppress-domain-info"};
18+
19+
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "Framework/DefaultsHelpers.h"
4646
#include "Framework/Signpost.h"
4747
#include "Framework/DriverConfig.h"
48+
#include "Framework/CommonLabels.h"
4849

4950
#include "TextDriverClient.h"
5051
#include "WSDriverClient.h"
@@ -604,6 +605,12 @@ o2::framework::ServiceSpec
604605
break;
605606
}
606607
}
608+
for (const auto& label : services.get<DeviceSpec const>().labels) {
609+
if (label == suppressDomainInfoLabel) {
610+
decongestion->suppressDomainInfo = true;
611+
break;
612+
}
613+
}
607614
auto& queue = services.get<AsyncQueue>();
608615
decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
609616
return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "Framework/DeviceStateEnums.h"
3535
#include "Headers/DataHeader.h"
3636
#include "Framework/DataProcessingHeader.h"
37+
#include "DecongestionService.h"
3738

3839
#include <fairmq/Device.h>
3940
#include <fairmq/Channel.h>
@@ -83,6 +84,9 @@ void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFa
8384

8485
bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, ForwardChannelInfo const& info, ForwardChannelState& state, size_t timeslice)
8586
{
87+
if (ref.get<DecongestionService>().suppressDomainInfo) {
88+
return false;
89+
}
8690
if (state.oldestForChannel.value >= timeslice) {
8791
return false;
8892
}
@@ -93,6 +97,9 @@ bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const
9397

9498
bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, OutputChannelInfo const& info, OutputChannelState& state, size_t timeslice)
9599
{
100+
if (ref.get<DecongestionService>().suppressDomainInfo) {
101+
return false;
102+
}
96103
if (state.oldestForChannel.value >= timeslice) {
97104
return false;
98105
}

Framework/Core/src/DecongestionService.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ namespace o2::framework
1818
struct DecongestionService {
1919
/// Wether we are a source in the processing chain
2020
bool isFirstInTopology = true;
21+
/// do not advertise/forward DomainInfoHeader from this device
22+
bool suppressDomainInfo = false;
2123
/// The last timeslice which the ExpirationHandler::Creator callback
2224
/// created. This can be used to skip dummy iterations.
2325
size_t nextEnumerationTimeslice = 0;

0 commit comments

Comments
 (0)