Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Utilities/Mergers/include/Mergers/FullHistoryMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class FullHistoryMerger : public framework::Task
void mergeCache();
void publish(framework::DataAllocator& allocator);
void clear();
bool shouldFinishCycle(const framework::InputRecord& inputs) const;
};

} // namespace o2::mergers
Expand Down
1 change: 1 addition & 0 deletions Utilities/Mergers/include/Mergers/IntegratingMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class IntegratingMerger : public framework::Task
void publishMovingWindow(framework::DataAllocator& allocator);
static void merge(ObjectStore& mMergedDelta, ObjectStore&& other);
void clear();
bool shouldFinishCycle(const framework::InputRecord&) const;

private:
header::DataHeader::SubSpecificationType mSubSpec;
Expand Down
8 changes: 5 additions & 3 deletions Utilities/Mergers/include/Mergers/MergerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum class MergedObjectTimespan {
// when InputObjectsTimespan::FullHistory is set.
LastDifference,
// Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite).
// The the above will be removed once we switch to NCycles in QC.
// The above will be removed once we switch to NCycles in QC.
NCycles
};

Expand All @@ -52,7 +52,8 @@ enum class PublishMovingWindow {
};

enum class PublicationDecision {
EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2...
EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2...
EachNArrivals, // Merged object is published whenever we receive N new input objects.
};

enum class TopologySize {
Expand All @@ -66,6 +67,7 @@ enum class ParallelismType {
RoundRobin // Mergers receive their input messages in round robin order. Useful when there is one InputSpec with a wildcard.
};

// fixme: this way of configuring mergers should be refactored, it does not make sense that we share `param`s across different enum values.
template <typename V, typename P = double>
struct ConfigEntry {
V value;
Expand All @@ -82,7 +84,7 @@ class PublicationDecisionParameter
PublicationDecisionParameter(size_t param) : decision({{param, 1}}) {}
PublicationDecisionParameter(const std::vector<std::pair<size_t, size_t>>& decision) : decision(decision) {}

std::vector<std::pair<size_t, size_t>> decision;
std::vector<std::pair<size_t /* cycle duration seconds */, size_t /* validity seconds */>> decision;
};

// todo rework configuration in a way that user cannot create an invalid configuration
Expand Down
17 changes: 16 additions & 1 deletion Utilities/Mergers/src/FullHistoryMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
}
}

if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) {
if (shouldFinishCycle(ctx.inputs())) {
mCyclesSinceReset++;
mergeCache();
publish(ctx.outputs());
Expand All @@ -88,6 +88,21 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
}
}

/// Decides whether the current merge cycle should be concluded (merge + publish).
///
/// Returns false until at least one input object has been received. Afterwards,
/// the decision depends on the configured publication policy:
/// - EachNSeconds:   finish when the "timer-publish" input is valid this iteration.
/// - EachNArrivals:  finish when the number of received updates is a positive
///                   multiple of N (taken from the first configured decision pair).
/// Any other policy value is an unsupported configuration and throws.
bool FullHistoryMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
{
  // Nothing was received yet — there is nothing to merge or publish.
  if (mFirstObjectSerialized.first.empty()) {
    return false;
  }

  switch (mConfig.publicationDecision.value) {
    case PublicationDecision::EachNSeconds:
      return inputs.isValid("timer-publish");
    case PublicationDecision::EachNArrivals: {
      // N is stored as the first element of the first {N, duration} pair.
      const auto publishEveryN = mConfig.publicationDecision.param.decision.begin()->first;
      return mUpdatesReceived > 0 && mUpdatesReceived % publishEveryN == 0;
    }
    default:
      throw std::runtime_error("unsupported publication decision parameter");
  }
}

void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext)
{
mergeCache();
Expand Down
13 changes: 12 additions & 1 deletion Utilities/Mergers/src/IntegratingMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,22 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
}
}

if (ctx.inputs().isValid("timer-publish")) {
if (shouldFinishCycle(ctx.inputs())) {
finishCycle(ctx.outputs());
}
}

/// Decides whether the current integration cycle should be concluded (publish).
///
/// The decision depends on the configured publication policy:
/// - EachNSeconds:   finish when the "timer-publish" input is valid this iteration.
/// - EachNArrivals:  finish when the number of merged deltas is a positive
///                   multiple of N (taken from the first configured decision pair).
/// Any other policy value is an unsupported configuration and throws.
bool IntegratingMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
{
  switch (mConfig.publicationDecision.value) {
    case PublicationDecision::EachNSeconds:
      return inputs.isValid("timer-publish");
    case PublicationDecision::EachNArrivals: {
      // N is stored as the first element of the first {N, duration} pair.
      const auto publishEveryN = mConfig.publicationDecision.param.decision.begin()->first;
      return mDeltasMerged > 0 && mDeltasMerged % publishEveryN == 0;
    }
    default:
      throw std::runtime_error("unsupported publication decision parameter");
  }
}

void IntegratingMerger::finishCycle(DataAllocator& outputs)
{
mCyclesSinceReset++;
Expand Down
15 changes: 11 additions & 4 deletions Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
auto layerInputs = mInputs;

// preparing some numbers
auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
const bool expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; });

// topology generation
MergerBuilder mergerBuilder;
Expand All @@ -150,7 +151,6 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
// we also expect moving windows to be published only by the last layer
layerConfig.publishMovingWindow = {PublishMovingWindow::No};
}
mergerBuilder.setConfig(layerConfig);

framework::Inputs nextLayerInputs;
auto inputsRangeBegin = layerInputs.begin();
Expand All @@ -162,20 +162,27 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()

auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder);
mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd));
inputsRangeBegin = inputsRangeEnd;

if (layer > 1 && !expendable) {
// we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
// we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);
assert(inputNumber != 0);
layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
}
if (layer == mergersPerLayer.size() - 1) {
// the last layer => use the specified external OutputSpec
mergerBuilder.setOutputSpec(mOutputSpecIntegral);
}

mergerBuilder.setConfig(layerConfig);
auto merger = mergerBuilder.buildSpec();

auto input = DataSpecUtils::matchingInput(merger.outputs.at(0));
input.binding = "in";
nextLayerInputs.push_back(input);

workflow.emplace_back(std::move(merger));
inputsRangeBegin = inputsRangeEnd;
}
layerInputs = nextLayerInputs; // todo: could be optimised with pointers
}
Expand Down
6 changes: 3 additions & 3 deletions Utilities/Mergers/test/mergersBenchmarkTopology.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
for (size_t p = 0; p < objectsProducers; p++) {
mergersInputs.push_back({ "mo", "TST",
"HISTO", static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
Lifetime::Timeframe });
Lifetime::Sporadic });
DataProcessorSpec producer{
"producer-histo" + std::to_string(p), Inputs{},
Outputs{ { { "mo" },
"TST",
"HISTO",
static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
Lifetime::Timeframe } },
Lifetime::Sporadic } },
AlgorithmSpec{
(AlgorithmSpec::ProcessCallback)[ p, periodus = int(1000000 / objectsRate), objectsBins, objectsProducers ](
ProcessingContext& processingContext) mutable { static auto lastTime = steady_clock::now();
Expand Down Expand Up @@ -115,7 +115,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
DataProcessorSpec printer{
"printer-bins",
Inputs{
{ "histo", "TST", "HISTO", 0 }
{ "histo", "TST", "HISTO", 0, Lifetime::Sporadic }
},
Outputs{},
AlgorithmSpec{
Expand Down
Loading