Skip to content

Commit 2d7a0bd

Browse files
authored
QC-1287 Fix Mergers latency optimization with RoundRobin processing (#14328)
The optimization was correctly working when we split a range of InputSpecs between different Mergers in a layer, but not when they are time-pipelined. In the commit, we modify the expected number of input messages per cycle to include time-pipeline parameter in the previous and current layer.
1 parent 807a6b1 commit 2d7a0bd

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

Utilities/Mergers/src/MergerInfrastructureBuilder.cxx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
136136
mergerBuilder.setName(mInfrastructureName);
137137
mergerBuilder.setOutputSpecMovingWindow(mOutputSpecMovingWindow);
138138

139+
size_t timePipelinePreviousLayer = 1;
139140
for (size_t layer = 1; layer < mergersPerLayer.size(); layer++) {
140141

141142
size_t numberOfMergers = mergersPerLayer[layer];
@@ -166,7 +167,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
166167
if (layer > 1 && !expendable) {
167168
// we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
168169
// we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
169-
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);
170+
171+
// The formula below takes into account both ways of splitting inputs - by consuming a subset of InputSpecs and by using time-pipelined data processors.
172+
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd) * timePipelinePreviousLayer / timePipelineVal;
170173
assert(inputNumber != 0);
171174
layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
172175
}
@@ -185,6 +188,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
185188
inputsRangeBegin = inputsRangeEnd;
186189
}
187190
layerInputs = nextLayerInputs; // todo: could be optimised with pointers
191+
timePipelinePreviousLayer = timePipelineVal;
188192
}
189193

190194
return workflow;

0 commit comments

Comments
 (0)