Skip to content

Commit e2ba424

Browse files
committed
DPL: first step to make workflow definition plugins
This is the first step towards having the workflow definition inside plugins rather than in executables. This will allow accumulating plugins which are needed to instantiate a topology and do the option parsing / topology building only once, simplifying the current case. The end goal is to allow the driver to preload certain common services (e.g. ROOT) and share it among the different tasks (which at the moment it's not allowed because different tasks are in different executables). Moreover this will allow us to coalesce strictly coupled dataprocessors and reduce the number of running processes. For now the plugins are embedded in the executables and behave exactly like before.
1 parent ad25169 commit e2ba424

File tree

10 files changed

+394
-150
lines changed

10 files changed

+394
-150
lines changed

Framework/Core/include/Framework/DataProcessorInfo.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ struct DataProcessorInfo {
2525
/// Name of the associated DataProcessorSpec
2626
std::string name = "Unknown";
2727
/// The executable name of the program which holds the DataProcessorSpec
28-
std::string executable = "/bin/false";
28+
std::string executable = "";
29+
/// The plugin spec of the plugin which holds the DataProcessorSpec
30+
std::string plugin = "";
2931
/// The argument passed on the command line for this DataProcessorSpec
3032
std::vector<std::string> cmdLineArgs = {};
3133
/// The workflow options which are available for the associated DataProcessorSpec
@@ -34,6 +36,6 @@ struct DataProcessorInfo {
3436
std::vector<std::string> channels = {};
3537
};
3638

37-
} // namespace o2
39+
} // namespace o2::framework
3840

3941
#endif // O2_FRAMEWORK_CORE_DATAPROCESSORINFO_H_

Framework/Core/include/Framework/DeviceExecution.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,25 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#ifndef FRAMEWORK_DEVICEEXECUTION_H
12-
#define FRAMEWORK_DEVICEEXECUTION_H
11+
#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_
12+
#define O2_FRAMEWORK_DEVICEEXECUTION_H_
1313

1414
#include <vector>
1515

16-
namespace o2
17-
{
18-
namespace framework
16+
namespace o2::framework
1917
{
2018

2119
/// This represent one single execution of a Device. It's meant to hold
2220
/// information which can change between one execution of a Device and the
2321
/// other, e.g. the executable name or the arguments it is started with.
2422
struct DeviceExecution {
23+
std::string plugin;
2524
/// The options passed to a given device
2625
std::vector<char*> args;
2726
/// The environment to be passed to a given device
2827
std::vector<char*> environ;
2928
};
3029

31-
} // namespace framework
32-
} // namespace o2
33-
#endif
30+
} // namespace o2::framework
31+
32+
#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_

Framework/Core/include/Framework/Plugins.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
4444
// using the arrow dataset API
4545
RootObjectReadingImplementation,
4646

47+
// A plugin which defines a whole workflow. This will be used to separate
48+
// workflows in shared libraries and run them via a separate loader.
49+
Workflow,
4750
// A plugin which was not initialised properly.
4851
Unknown
4952
};
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
12+
#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
13+
14+
#include "Framework/ConfigParamSpec.h"
15+
#include "Framework/CompletionPolicy.h"
16+
#include "Framework/DispatchPolicy.h"
17+
#include "Framework/ResourcePolicy.h"
18+
#include "Framework/CallbacksPolicy.h"
19+
#include "Framework/SendingPolicy.h"
20+
#include "Framework/WorkflowSpec.h"
21+
#include "Framework/ChannelConfigurationPolicy.h"
22+
#include <vector>
23+
24+
namespace o2::framework
25+
{
26+
27+
struct WorkflowDefinitionContext {
28+
std::vector<ConfigParamSpec> workflowOptions;
29+
std::vector<CompletionPolicy> completionPolicies;
30+
std::vector<DispatchPolicy> dispatchPolicies;
31+
std::vector<ResourcePolicy> resourcePolicies;
32+
std::vector<CallbacksPolicy> callbacksPolicies;
33+
std::vector<SendingPolicy> sendingPolicies;
34+
std::vector<ConfigParamSpec> extraOptions;
35+
std::vector<ChannelConfigurationPolicy> channelPolicies;
36+
std::unique_ptr<ConfigContext> configContext;
37+
38+
// For the moment, let's put them here. We should
39+
// probably move them to a different place, since these are not really part
40+
// of the workflow definition but will be there also at runtine.
41+
std::unique_ptr<ServiceRegistry> configRegistry{nullptr};
42+
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
43+
44+
o2::framework::WorkflowSpec specs;
45+
};
46+
47+
struct WorkflowDefinition {
48+
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
49+
};
50+
51+
struct WorkflowPlugin {
52+
virtual o2::framework::WorkflowDefinition* create() = 0;
53+
};
54+
55+
} // namespace o2::framework
56+
#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
#include "Framework/CustomWorkflowTerminationHook.h"
2727
#include "Framework/CommonServices.h"
2828
#include "Framework/WorkflowCustomizationHelpers.h"
29+
#include "Framework/WorkflowDefinitionContext.h"
2930
#include "Framework/Logger.h"
31+
#include "Framework/Plugins.h"
3032
#include "Framework/CheckTypes.h"
3133
#include "Framework/StructToTuple.h"
3234
#include "ResourcePolicy.h"
@@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
125127
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
126128

127129
// This comes from the framework itself. This way we avoid code duplication.
128-
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
129-
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
130-
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
131-
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
132-
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
133-
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
134-
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
135-
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
136-
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
137-
o2::framework::ConfigContext& configContext);
130+
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);
138131

139132
void doDefaultWorkflowTerminationHook();
140133

@@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)
167160

168161
void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
169162

170-
o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
171-
o2::framework::ServiceRegistry& configRegistry,
172-
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
173-
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
163+
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
164+
o2::framework::ServiceRegistry& configRegistry,
165+
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
166+
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
174167

175168
std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();
176169

177-
int mainNoCatch(int argc, char** argv)
178-
{
179-
using namespace o2::framework;
170+
char* getIdString(int argc, char** argv);
180171

181-
std::vector<o2::framework::ConfigParamSpec> workflowOptions;
182-
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
183-
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
184-
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
185-
186-
std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
187-
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
188-
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
189-
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
190-
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();
191-
192-
std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
193-
std::vector<ConfigParamSpec> extraOptions;
194-
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
195-
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);
196-
197-
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
198-
overrideAll(configContext, specs);
199-
for (auto& spec : specs) {
200-
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
201-
}
202-
std::vector<ChannelConfigurationPolicy> channelPolicies;
203-
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
204-
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
205-
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
206-
return doMain(argc, argv, specs,
207-
channelPolicies, completionPolicies, dispatchPolicies,
208-
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
172+
#define STRINGIZE_NX(A) #A
173+
#define STRINGIZE(A) STRINGIZE_NX(A)
174+
175+
// This is to allow the old "executable" based behavior
176+
// Each executable will contain a plugin called InternalWorkflow
177+
// In case one wants to use the new DSO based approach, the
178+
// name of the plugin an the library name where it is located
179+
// will have to be specified at build time.
180+
#ifndef DPL_WORKFLOW_PLUGIN_NAME
181+
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
182+
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
183+
#error Missing DPL_WORKFLOW_PLUGIN_NAME
184+
#endif
185+
#define DPL_WORKFLOW_PLUGIN_LIBRARY
186+
#endif
187+
188+
consteval char const* pluginName()
189+
{
190+
return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
209191
}
210192

211-
int callMain(int argc, char** argv, int (*)(int, char**));
212-
char* getIdString(int argc, char** argv);
193+
// Executables behave this way
194+
int callMain(int argc, char** argv, char const* pluginName);
213195

214196
int main(int argc, char** argv)
215197
{
216198
using namespace o2::framework;
217199

218-
int result = callMain(argc, argv, mainNoCatch);
200+
int result = callMain(argc, argv, pluginName());
219201

220202
char* idstring = getIdString(argc, argv);
221203
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
222204
callWorkflowTermination(onWorkflowTerminationHook, idstring);
223205

224206
return result;
225207
}
208+
209+
struct WorkflowDefinition {
210+
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
211+
};
212+
213+
struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
214+
o2::framework::WorkflowDefinition* create() override
215+
{
216+
return new o2::framework::WorkflowDefinition{
217+
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
218+
using namespace o2::framework;
219+
WorkflowDefinitionContext workflowContext;
220+
221+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
222+
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
223+
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
224+
225+
workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
226+
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
227+
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
228+
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
229+
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();
230+
231+
workflowContext.configRegistry = createRegistry();
232+
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);
233+
234+
workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
235+
overrideAll(*workflowContext.configContext, workflowContext.specs);
236+
for (auto& spec : workflowContext.specs) {
237+
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
238+
}
239+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
240+
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
241+
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
242+
return workflowContext;
243+
}};
244+
}
245+
};
246+
247+
// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
248+
extern "C" {
249+
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
250+
{
251+
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
252+
return previous;
253+
}
254+
}
255+
226256
#endif

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,6 +1711,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
17111711
}
17121712
O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s",
17131713
spec.id.c_str(), str.str().c_str());
1714+
// Copy the plugin over from the DataProcessingInfo
1715+
execution.plugin = pi->plugin;
17141716
}
17151717
}
17161718

Framework/Core/src/WorkflowSerializationHelpers.cxx

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
9191
IN_DATAPROCESSOR_INFO,
9292
IN_DATAPROCESSOR_INFO_NAME,
9393
IN_DATAPROCESSOR_INFO_EXECUTABLE,
94+
IN_DATAPROCESSOR_INFO_PLUGIN,
9495
IN_DATAPROCESSOR_INFO_ARGS,
9596
IN_DATAPROCESSOR_INFO_ARG,
9697
IN_DATAPROCESSOR_INFO_CHANNELS,
@@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
263264
case State::IN_DATAPROCESSOR_INFO_EXECUTABLE:
264265
s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
265266
break;
267+
case State::IN_DATAPROCESSOR_INFO_PLUGIN:
268+
s << "IN_DATAPROCESSOR_INFO_PLUGIN";
269+
break;
266270
case State::IN_DATAPROCESSOR_INFO_ARGS:
267271
s << "IN_DATAPROCESSOR_INFO_ARGS";
268272
break;
@@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
706710
push(State::IN_DATAPROCESSOR_INFO_NAME);
707711
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
708712
push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE);
713+
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) {
714+
push(State::IN_DATAPROCESSOR_INFO_PLUGIN);
709715
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
710716
push(State::IN_DATAPROCESSOR_INFO_ARGS);
711717
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
@@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
732738
} else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) {
733739
assert(metadata.size());
734740
metadata.back().executable = s;
741+
} else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) {
742+
assert(metadata.size());
743+
metadata.back().plugin = s;
735744
} else if (in(State::IN_INPUT_BINDING)) {
736745
binding = s;
737746
} else if (in(State::IN_INPUT_ORIGIN)) {
@@ -888,7 +897,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
888897
if (!states.empty()) {
889898
debug << " now in " << states.back();
890899
}
891-
O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str());
900+
O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size() + 1}, "import", "POP: %s", debug.str().c_str());
892901
return result;
893902
}
894903
bool in(State o)
@@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out,
12541263
w.StartObject();
12551264
w.Key("name");
12561265
w.String(info.name.c_str());
1257-
w.Key("executable");
1258-
w.String(info.executable.c_str());
1266+
if (!info.executable.empty()) {
1267+
w.Key("executable");
1268+
w.String(info.executable.c_str());
1269+
}
1270+
if (!info.plugin.empty()) {
1271+
w.Key("plugin");
1272+
w.String(info.plugin.c_str());
1273+
}
12591274
w.Key("cmdLineArgs");
12601275
w.StartArray();
12611276
for (auto& arg : info.cmdLineArgs) {

0 commit comments

Comments
 (0)