Skip to content

Commit 2db9446

Browse files
committed
DPL: move topological sort in a separate file
Simplifies testing.
1 parent aaf3d53 commit 2db9446

File tree

5 files changed

+178
-68
lines changed

5 files changed

+178
-68
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ o2_add_library(Framework
133133
src/TableConsumer.cxx
134134
src/TableTreeHelpers.cxx
135135
src/TopologyPolicy.cxx
136+
src/TopologyPolicyHelpers.cxx
136137
src/TextDriverClient.cxx
137138
src/TimesliceIndex.cxx
138139
src/TimingHelpers.cxx
@@ -248,6 +249,7 @@ add_executable(o2-test-framework-core
248249
test/test_TimeParallelPipelining.cxx
249250
test/test_TimesliceIndex.cxx
250251
test/test_TypeTraits.cxx
252+
test/test_TopologyPolicies.cxx
251253
test/test_Variants.cxx
252254
test/test_WorkflowHelpers.cxx
253255
test/test_WorkflowSerialization.cxx
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
13+
#define O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
14+
#include "Framework/WorkflowSpec.h"
15+
#include <vector>
16+
17+
namespace o2::framework
18+
{
19+
struct TopologyPolicyHelpers {
20+
static auto buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector<std::pair<int, int>>;
21+
};
22+
} // namespace o2::framework
23+
#endif // O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/TopologyPolicyHelpers.h"
13+
#include "Framework/TopologyPolicy.h"
14+
15+
namespace o2::framework
16+
{
17+
namespace
18+
{
19+
void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
20+
{
21+
stream << spec.name;
22+
if (!spec.labels.empty()) {
23+
stream << "(";
24+
bool first = false;
25+
for (auto& label : spec.labels) {
26+
stream << (first ? "" : ",") << label.value;
27+
first = true;
28+
}
29+
stream << ")";
30+
}
31+
}
32+
} // namespace
33+
34+
auto TopologyPolicyHelpers::buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector<std::pair<int, int>>
35+
{
36+
std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
37+
std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
38+
dependencyCheckers.reserve(physicalWorkflow.size());
39+
40+
for (auto& spec : physicalWorkflow) {
41+
for (auto& policy : topologyPolicies) {
42+
if (policy.matcher(spec)) {
43+
dependencyCheckers.push_back(policy.checkDependency);
44+
break;
45+
}
46+
}
47+
}
48+
assert(dependencyCheckers.size() == physicalWorkflow.size());
49+
// check if DataProcessorSpec at i depends on j
50+
auto checkDependencies = [&workflow = physicalWorkflow,
51+
&dependencyCheckers](int i, int j) {
52+
TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
53+
return checker(workflow[i], workflow[j]);
54+
};
55+
std::vector<std::pair<int, int>> edges;
56+
for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
57+
for (size_t j = i; j < physicalWorkflow.size(); ++j) {
58+
if (i == j && checkDependencies(i, j)) {
59+
throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
60+
}
61+
bool both = false;
62+
if (checkDependencies(i, j)) {
63+
edges.emplace_back(j, i);
64+
both = true;
65+
}
66+
if (checkDependencies(j, i)) {
67+
edges.emplace_back(i, j);
68+
if (both) {
69+
std::ostringstream str;
70+
describeDataProcessorSpec(str, physicalWorkflow[i]);
71+
str << " has circular dependency with ";
72+
describeDataProcessorSpec(str, physicalWorkflow[j]);
73+
str << ":\n";
74+
for (auto x : {i, j}) {
75+
str << physicalWorkflow[x].name << ":\n";
76+
str << "inputs:\n";
77+
for (auto& input : physicalWorkflow[x].inputs) {
78+
str << "- " << input << " " << (int)input.lifetime << "\n";
79+
}
80+
str << "outputs:\n";
81+
for (auto& output : physicalWorkflow[x].outputs) {
82+
str << "- " << output << " " << (int)output.lifetime << "\n";
83+
}
84+
}
85+
throw std::runtime_error(str.str());
86+
}
87+
}
88+
}
89+
}
90+
return edges;
91+
};
92+
} // namespace o2::framework

Framework/Core/src/runDataProcessing.cxx

Lines changed: 2 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111
#include <memory>
12+
#include "Framework/TopologyPolicyHelpers.h"
1213
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
1314
#include <stdexcept>
1415
#include "Framework/BoostOptionsRetriever.h"
@@ -2835,20 +2836,6 @@ std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
28352836
return std::make_unique<o2::framework::ServiceRegistry>();
28362837
}
28372838

2838-
void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
2839-
{
2840-
stream << spec.name;
2841-
if (!spec.labels.empty()) {
2842-
stream << "(";
2843-
bool first = false;
2844-
for (auto& label : spec.labels) {
2845-
stream << (first ? "" : ",") << label.value;
2846-
first = true;
2847-
}
2848-
stream << ")";
2849-
}
2850-
}
2851-
28522839
// This is a toy executor for the workflow spec
28532840
// What it needs to do is:
28542841
//
@@ -3034,65 +3021,12 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
30343021
[](OutputSpec const& a, OutputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
30353022
}
30363023

3037-
std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
3038-
std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
3039-
dependencyCheckers.reserve(physicalWorkflow.size());
3040-
3041-
for (auto& spec : physicalWorkflow) {
3042-
for (auto& policy : topologyPolicies) {
3043-
if (policy.matcher(spec)) {
3044-
dependencyCheckers.push_back(policy.checkDependency);
3045-
break;
3046-
}
3047-
}
3048-
}
3049-
assert(dependencyCheckers.size() == physicalWorkflow.size());
3050-
// check if DataProcessorSpec at i depends on j
3051-
auto checkDependencies = [&workflow = physicalWorkflow,
3052-
&dependencyCheckers](int i, int j) {
3053-
TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
3054-
return checker(workflow[i], workflow[j]);
3055-
};
3056-
30573024
// Create a list of all the edges, so that we can do a topological sort
30583025
// before we create the graph.
30593026
std::vector<std::pair<int, int>> edges;
30603027

30613028
if (physicalWorkflow.size() > 1) {
3062-
for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
3063-
for (size_t j = i; j < physicalWorkflow.size(); ++j) {
3064-
if (i == j && checkDependencies(i, j)) {
3065-
throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
3066-
}
3067-
bool both = false;
3068-
if (checkDependencies(i, j)) {
3069-
edges.emplace_back(j, i);
3070-
both = true;
3071-
}
3072-
if (checkDependencies(j, i)) {
3073-
edges.emplace_back(i, j);
3074-
if (both) {
3075-
std::ostringstream str;
3076-
describeDataProcessorSpec(str, physicalWorkflow[i]);
3077-
str << " has circular dependency with ";
3078-
describeDataProcessorSpec(str, physicalWorkflow[j]);
3079-
str << ":\n";
3080-
for (auto x : {i, j}) {
3081-
str << physicalWorkflow[x].name << ":\n";
3082-
str << "inputs:\n";
3083-
for (auto& input : physicalWorkflow[x].inputs) {
3084-
str << "- " << input << " " << (int)input.lifetime << "\n";
3085-
}
3086-
str << "outputs:\n";
3087-
for (auto& output : physicalWorkflow[x].outputs) {
3088-
str << "- " << output << " " << (int)output.lifetime << "\n";
3089-
}
3090-
}
3091-
throw std::runtime_error(str.str());
3092-
}
3093-
}
3094-
}
3095-
}
3029+
edges = TopologyPolicyHelpers::buildEdges(physicalWorkflow);
30963030

30973031
auto topoInfos = WorkflowHelpers::topologicalSort(physicalWorkflow.size(), &edges[0].first, &edges[0].second, sizeof(std::pair<int, int>), edges.size());
30983032
if (topoInfos.size() != physicalWorkflow.size()) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Mocking.h"
13+
#include <catch_amalgamated.hpp>
14+
#include "Framework/ChannelSpecHelpers.h"
15+
#include "../src/DeviceSpecHelpers.h"
16+
#include "../src/GraphvizHelpers.h"
17+
#include "../src/WorkflowHelpers.h"
18+
#include "Framework/DeviceSpec.h"
19+
#include "Framework/WorkflowSpec.h"
20+
#include "Framework/DataSpecUtils.h"
21+
#include "../src/SimpleResourceManager.h"
22+
#include "../src/ComputingResourceHelpers.h"
23+
#include "test_HelperMacros.h"
24+
#include "Framework/TopologyPolicyHelpers.h"
25+
26+
using namespace o2::framework;
27+
28+
// This is how you can define your processing in a declarative way
29+
WorkflowSpec defineDataProcessingWithSporadic()
30+
{
31+
return {
32+
{.name = "input-proxy", .outputs = {OutputSpec{"QEMC", "CELL", 1}, OutputSpec{"CTF", "DONE", 0}}},
33+
{.name = "EMC-Cell-proxy", .inputs = Inputs{InputSpec{"a", "QEMC", "CELL", 1, Lifetime::Sporadic}}},
34+
{.name = "calib-output-proxy-barrel-tf", .inputs = {InputSpec{"a", "CTF", "DONE", 0}}}};
35+
}
36+
37+
TEST_CASE("TestBrokenSporadic")
38+
{
39+
auto workflow = defineDataProcessingWithSporadic();
40+
auto configContext = makeEmptyConfigContext();
41+
auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
42+
auto completionPolicies = CompletionPolicy::createDefaultPolicies();
43+
auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
44+
REQUIRE(channelPolicies.empty() == false);
45+
REQUIRE(completionPolicies.empty() == false);
46+
std::vector<DeviceSpec> devices;
47+
48+
std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
49+
REQUIRE(resources.size() == 1);
50+
REQUIRE(resources[0].startPort == 22000);
51+
SimpleResourceManager rm(resources);
52+
auto offers = rm.getAvailableOffers();
53+
REQUIRE(offers.size() == 1);
54+
REQUIRE(offers[0].startPort == 22000);
55+
REQUIRE(offers[0].rangeSize == 5000);
56+
57+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
58+
TopologyPolicyHelpers::buildEdges(workflow);
59+
}

0 commit comments

Comments
 (0)