Skip to content

Commit babc74c

Browse files
committed
QC-1253 Mergers: Shorter latency with multiple layers
If we run multiple layers of Mergers, the merged object arrival time can be described as: merger cycle duration * number of layers (it can be shorter due to randomized timer shifts at startup). As a consequence, each new layer adds latency to the merger topology. Assuming that the deployed Mergers are not expendable, we can rely on receiving the expected number of input messages to know that each Merger in the lower layer has produced an update, so we can publish the merged object right away. As a result, we get lower latency.
1 parent fb8e068 commit babc74c

File tree

7 files changed

+49
-12
lines changed

7 files changed

+49
-12
lines changed

Utilities/Mergers/include/Mergers/FullHistoryMerger.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class FullHistoryMerger : public framework::Task
7171
void mergeCache();
7272
void publish(framework::DataAllocator& allocator);
7373
void clear();
74+
bool shouldFinishCycle(const framework::InputRecord& inputs) const;
7475
};
7576

7677
} // namespace o2::mergers

Utilities/Mergers/include/Mergers/IntegratingMerger.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class IntegratingMerger : public framework::Task
6060
void publishMovingWindow(framework::DataAllocator& allocator);
6161
static void merge(ObjectStore& mMergedDelta, ObjectStore&& other);
6262
void clear();
63+
bool shouldFinishCycle(const framework::InputRecord&) const;
6364

6465
private:
6566
header::DataHeader::SubSpecificationType mSubSpec;

Utilities/Mergers/include/Mergers/MergerConfig.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ enum class MergedObjectTimespan {
4141
// when InputObjectsTimespan::FullHistory is set.
4242
LastDifference,
4343
// Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite).
44-
// The the above will be removed once we switch to NCycles in QC.
44+
// The above will be removed once we switch to NCycles in QC.
4545
NCycles
4646
};
4747

@@ -52,7 +52,8 @@ enum class PublishMovingWindow {
5252
};
5353

5454
enum class PublicationDecision {
55-
EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2...
55+
EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2...
56+
EachNArrivals, // Merged object is published whenever we receive N new input objects.
5657
};
5758

5859
enum class TopologySize {
@@ -66,6 +67,7 @@ enum class ParallelismType {
6667
RoundRobin // Mergers receive their input messages in round robin order. Useful when there is one InputSpec with a wildcard.
6768
};
6869

70+
// fixme: this way of configuring mergers should be refactored, it does not make sense that we share `param`s across different enum values.
6971
template <typename V, typename P = double>
7072
struct ConfigEntry {
7173
V value;
@@ -82,7 +84,7 @@ class PublicationDecisionParameter
8284
PublicationDecisionParameter(size_t param) : decision({{param, 1}}) {}
8385
PublicationDecisionParameter(const std::vector<std::pair<size_t, size_t>>& decision) : decision(decision) {}
8486

85-
std::vector<std::pair<size_t, size_t>> decision;
87+
std::vector<std::pair<size_t /* cycle duration seconds */, size_t /* validity seconds */>> decision;
8688
};
8789

8890
// todo rework configuration in a way that user cannot create an invalid configuration

Utilities/Mergers/src/FullHistoryMerger.cxx

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
7676
}
7777
}
7878

79-
if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) {
79+
if (shouldFinishCycle(ctx.inputs())) {
8080
mCyclesSinceReset++;
8181
mergeCache();
8282
publish(ctx.outputs());
@@ -88,6 +88,21 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
8888
}
8989
}
9090

91+
bool FullHistoryMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
92+
{
93+
if (mFirstObjectSerialized.first.empty()) {
94+
return false;
95+
}
96+
97+
if (mConfig.publicationDecision.value == PublicationDecision::EachNSeconds) {
98+
return inputs.isValid("timer-publish");
99+
} else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) {
100+
return mUpdatesReceived > 0 && mUpdatesReceived % mConfig.publicationDecision.param.decision.begin()->first == 0;
101+
} else {
102+
throw std::runtime_error("unsupported publication decision parameter");
103+
}
104+
}
105+
91106
void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext)
92107
{
93108
mergeCache();

Utilities/Mergers/src/IntegratingMerger.cxx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,22 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
6868
}
6969
}
7070

71-
if (ctx.inputs().isValid("timer-publish")) {
71+
if (shouldFinishCycle(ctx.inputs())) {
7272
finishCycle(ctx.outputs());
7373
}
7474
}
7575

76+
bool IntegratingMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
77+
{
78+
if (mConfig.publicationDecision.value == PublicationDecision::EachNSeconds) {
79+
return inputs.isValid("timer-publish");
80+
} else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) {
81+
return mDeltasMerged > 0 && mDeltasMerged % mConfig.publicationDecision.param.decision.begin()->first == 0;
82+
} else {
83+
throw std::runtime_error("unsupported publication decision parameter");
84+
}
85+
}
86+
7687
void IntegratingMerger::finishCycle(DataAllocator& outputs)
7788
{
7889
mCyclesSinceReset++;

Utilities/Mergers/src/MergerInfrastructureBuilder.cxx

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
128128
auto layerInputs = mInputs;
129129

130130
// preparing some numbers
131-
auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
131+
const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
132+
const bool expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; });
132133

133134
// topology generation
134135
MergerBuilder mergerBuilder;
@@ -150,7 +151,6 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
150151
// we also expect moving windows to be published only by the last layer
151152
layerConfig.publishMovingWindow = {PublishMovingWindow::No};
152153
}
153-
mergerBuilder.setConfig(layerConfig);
154154

155155
framework::Inputs nextLayerInputs;
156156
auto inputsRangeBegin = layerInputs.begin();
@@ -162,20 +162,27 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
162162

163163
auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder);
164164
mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd));
165-
inputsRangeBegin = inputsRangeEnd;
166165

166+
if (layer > 1 && !expendable) {
167+
// we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
168+
// we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
169+
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);
170+
assert(inputNumber != 0);
171+
layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
172+
}
167173
if (layer == mergersPerLayer.size() - 1) {
168174
// the last layer => use the specified external OutputSpec
169175
mergerBuilder.setOutputSpec(mOutputSpecIntegral);
170176
}
171-
177+
mergerBuilder.setConfig(layerConfig);
172178
auto merger = mergerBuilder.buildSpec();
173179

174180
auto input = DataSpecUtils::matchingInput(merger.outputs.at(0));
175181
input.binding = "in";
176182
nextLayerInputs.push_back(input);
177183

178184
workflow.emplace_back(std::move(merger));
185+
inputsRangeBegin = inputsRangeEnd;
179186
}
180187
layerInputs = nextLayerInputs; // todo: could be optimised with pointers
181188
}

Utilities/Mergers/test/mergersBenchmarkTopology.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
7171
for (size_t p = 0; p < objectsProducers; p++) {
7272
mergersInputs.push_back({ "mo", "TST",
7373
"HISTO", static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
74-
Lifetime::Timeframe });
74+
Lifetime::Sporadic });
7575
DataProcessorSpec producer{
7676
"producer-histo" + std::to_string(p), Inputs{},
7777
Outputs{ { { "mo" },
7878
"TST",
7979
"HISTO",
8080
static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
81-
Lifetime::Timeframe } },
81+
Lifetime::Sporadic } },
8282
AlgorithmSpec{
8383
(AlgorithmSpec::ProcessCallback)[ p, periodus = int(1000000 / objectsRate), objectsBins, objectsProducers ](
8484
ProcessingContext& processingContext) mutable { static auto lastTime = steady_clock::now();
@@ -115,7 +115,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
115115
DataProcessorSpec printer{
116116
"printer-bins",
117117
Inputs{
118-
{ "histo", "TST", "HISTO", 0 }
118+
{ "histo", "TST", "HISTO", 0, Lifetime::Sporadic }
119119
},
120120
Outputs{},
121121
AlgorithmSpec{

0 commit comments

Comments
 (0)