Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ o2_add_library(Framework
src/TableConsumer.cxx
src/TableTreeHelpers.cxx
src/TopologyPolicy.cxx
src/TopologyPolicyHelpers.cxx
src/TextDriverClient.cxx
src/TimesliceIndex.cxx
src/TimingHelpers.cxx
Expand Down Expand Up @@ -248,6 +249,7 @@ add_executable(o2-test-framework-core
test/test_TimeParallelPipelining.cxx
test/test_TimesliceIndex.cxx
test/test_TypeTraits.cxx
test/test_TopologyPolicies.cxx
test/test_Variants.cxx
test/test_WorkflowHelpers.cxx
test/test_WorkflowSerialization.cxx
Expand Down
23 changes: 23 additions & 0 deletions Framework/Core/include/Framework/TopologyPolicyHelpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
#define O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
#include "Framework/WorkflowSpec.h"
#include <vector>

namespace o2::framework
{
struct TopologyPolicyHelpers {
static auto buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector<std::pair<int, int>>;
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
92 changes: 92 additions & 0 deletions Framework/Core/src/TopologyPolicyHelpers.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/TopologyPolicyHelpers.h"
#include "Framework/TopologyPolicy.h"

namespace o2::framework
{
namespace
{
void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
{
stream << spec.name;
if (!spec.labels.empty()) {
stream << "(";
bool first = false;
for (auto& label : spec.labels) {
stream << (first ? "" : ",") << label.value;
first = true;
}
stream << ")";
}
}
} // namespace

auto TopologyPolicyHelpers::buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector<std::pair<int, int>>
{
std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
dependencyCheckers.reserve(physicalWorkflow.size());

for (auto& spec : physicalWorkflow) {
for (auto& policy : topologyPolicies) {
if (policy.matcher(spec)) {
dependencyCheckers.push_back(policy.checkDependency);
break;
}
}
}
assert(dependencyCheckers.size() == physicalWorkflow.size());
// check if DataProcessorSpec at i depends on j
auto checkDependencies = [&workflow = physicalWorkflow,
&dependencyCheckers](int i, int j) {
TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
return checker(workflow[i], workflow[j]);
};
std::vector<std::pair<int, int>> edges;
for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
for (size_t j = i; j < physicalWorkflow.size(); ++j) {
if (i == j && checkDependencies(i, j)) {
throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
}
bool both = false;
if (checkDependencies(i, j)) {
edges.emplace_back(j, i);
both = true;
}
if (checkDependencies(j, i)) {
edges.emplace_back(i, j);
if (both) {
std::ostringstream str;
describeDataProcessorSpec(str, physicalWorkflow[i]);
str << " has circular dependency with ";
describeDataProcessorSpec(str, physicalWorkflow[j]);
str << ":\n";
for (auto x : {i, j}) {
str << physicalWorkflow[x].name << ":\n";
str << "inputs:\n";
for (auto& input : physicalWorkflow[x].inputs) {
str << "- " << input << " " << (int)input.lifetime << "\n";
}
str << "outputs:\n";
for (auto& output : physicalWorkflow[x].outputs) {
str << "- " << output << " " << (int)output.lifetime << "\n";
}
}
throw std::runtime_error(str.str());
}
}
}
}
return edges;
};
} // namespace o2::framework
70 changes: 2 additions & 68 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include <memory>
#include "Framework/TopologyPolicyHelpers.h"
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <stdexcept>
#include "Framework/BoostOptionsRetriever.h"
Expand Down Expand Up @@ -2835,20 +2836,6 @@ std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
return std::make_unique<o2::framework::ServiceRegistry>();
}

void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
{
stream << spec.name;
if (!spec.labels.empty()) {
stream << "(";
bool first = false;
for (auto& label : spec.labels) {
stream << (first ? "" : ",") << label.value;
first = true;
}
stream << ")";
}
}

// This is a toy executor for the workflow spec
// What it needs to do is:
//
Expand Down Expand Up @@ -3034,65 +3021,12 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
[](OutputSpec const& a, OutputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
}

std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
dependencyCheckers.reserve(physicalWorkflow.size());

for (auto& spec : physicalWorkflow) {
for (auto& policy : topologyPolicies) {
if (policy.matcher(spec)) {
dependencyCheckers.push_back(policy.checkDependency);
break;
}
}
}
assert(dependencyCheckers.size() == physicalWorkflow.size());
// check if DataProcessorSpec at i depends on j
auto checkDependencies = [&workflow = physicalWorkflow,
&dependencyCheckers](int i, int j) {
TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
return checker(workflow[i], workflow[j]);
};

// Create a list of all the edges, so that we can do a topological sort
// before we create the graph.
std::vector<std::pair<int, int>> edges;

if (physicalWorkflow.size() > 1) {
for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
for (size_t j = i; j < physicalWorkflow.size(); ++j) {
if (i == j && checkDependencies(i, j)) {
throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
}
bool both = false;
if (checkDependencies(i, j)) {
edges.emplace_back(j, i);
both = true;
}
if (checkDependencies(j, i)) {
edges.emplace_back(i, j);
if (both) {
std::ostringstream str;
describeDataProcessorSpec(str, physicalWorkflow[i]);
str << " has circular dependency with ";
describeDataProcessorSpec(str, physicalWorkflow[j]);
str << ":\n";
for (auto x : {i, j}) {
str << physicalWorkflow[x].name << ":\n";
str << "inputs:\n";
for (auto& input : physicalWorkflow[x].inputs) {
str << "- " << input << " " << (int)input.lifetime << "\n";
}
str << "outputs:\n";
for (auto& output : physicalWorkflow[x].outputs) {
str << "- " << output << " " << (int)output.lifetime << "\n";
}
}
throw std::runtime_error(str.str());
}
}
}
}
edges = TopologyPolicyHelpers::buildEdges(physicalWorkflow);

auto topoInfos = WorkflowHelpers::topologicalSort(physicalWorkflow.size(), &edges[0].first, &edges[0].second, sizeof(std::pair<int, int>), edges.size());
if (topoInfos.size() != physicalWorkflow.size()) {
Expand Down
59 changes: 59 additions & 0 deletions Framework/Core/test/test_TopologyPolicies.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Mocking.h"
#include <catch_amalgamated.hpp>
#include "Framework/ChannelSpecHelpers.h"
#include "../src/DeviceSpecHelpers.h"
#include "../src/GraphvizHelpers.h"
#include "../src/WorkflowHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DataSpecUtils.h"
#include "../src/SimpleResourceManager.h"
#include "../src/ComputingResourceHelpers.h"
#include "test_HelperMacros.h"
#include "Framework/TopologyPolicyHelpers.h"

using namespace o2::framework;

// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessingWithSporadic()
{
return {
{.name = "input-proxy", .outputs = {OutputSpec{"QEMC", "CELL", 1}, OutputSpec{"CTF", "DONE", 0}}},
{.name = "EMC-Cell-proxy", .inputs = Inputs{InputSpec{"a", "QEMC", "CELL", 1, Lifetime::Sporadic}}},
{.name = "calib-output-proxy-barrel-tf", .inputs = {InputSpec{"a", "CTF", "DONE", 0}}}};
}

TEST_CASE("TestBrokenSporadic")
{
auto workflow = defineDataProcessingWithSporadic();
auto configContext = makeEmptyConfigContext();
auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
auto completionPolicies = CompletionPolicy::createDefaultPolicies();
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
REQUIRE(channelPolicies.empty() == false);
REQUIRE(completionPolicies.empty() == false);
std::vector<DeviceSpec> devices;

std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
REQUIRE(resources.size() == 1);
REQUIRE(resources[0].startPort == 22000);
SimpleResourceManager rm(resources);
auto offers = rm.getAvailableOffers();
REQUIRE(offers.size() == 1);
REQUIRE(offers[0].startPort == 22000);
REQUIRE(offers[0].rangeSize == 5000);

DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
TopologyPolicyHelpers::buildEdges(workflow);
}