Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
mergerBuilder.setName(mInfrastructureName);
mergerBuilder.setOutputSpecMovingWindow(mOutputSpecMovingWindow);

size_t timePipelinePreviousLayer = 1;
for (size_t layer = 1; layer < mergersPerLayer.size(); layer++) {

size_t numberOfMergers = mergersPerLayer[layer];
Expand Down Expand Up @@ -166,7 +167,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
if (layer > 1 && !expendable) {
// we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
// we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);

// The formula below takes into account both ways of splitting inputs - by consuming a subset of InputSpecs and by using time-pipelined data processors.
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd) * timePipelinePreviousLayer / timePipelineVal;
assert(inputNumber != 0);
layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
}
Expand All @@ -185,6 +188,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
inputsRangeBegin = inputsRangeEnd;
}
layerInputs = nextLayerInputs; // todo: could be optimised with pointers
timePipelinePreviousLayer = timePipelineVal;
}

return workflow;
Expand Down