Skip to content

Commit c381e69

Browse files
committed
DPL: first step to make workflow definition plugins
This is the first step towards having the workflow definition inside plugins rather than in executables. This will allow accumulating plugins which are needed to instantiate a topology and do the option parsing / topology building only once, simplifying the current case. The end goal is to allow the driver to preload certain common services (e.g. ROOT) and share it among the different tasks (which at the moment it's not allowed because different tasks are in different executables). Moreover this will allow us to coalesce strictly coupled dataprocessors and reduce the number of running processes. For now the plugins are embedded in the executables and behave exactly like before.
1 parent 02090ec commit c381e69

File tree

3 files changed

+122
-85
lines changed

3 files changed

+122
-85
lines changed

Framework/Core/include/Framework/Plugins.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
4444
// using the arrow dataset API
4545
RootObjectReadingImplementation,
4646

47+
// A plugin which defines a whole workflow. This will be used to separate
48+
// workflows in shared libraries and run them via a separate loader.
49+
Workflow,
4750
// A plugin which was not initialised properly.
4851
Unknown
4952
};

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
#include "Framework/CustomWorkflowTerminationHook.h"
2727
#include "Framework/CommonServices.h"
2828
#include "Framework/WorkflowCustomizationHelpers.h"
29+
#include "Framework/WorkflowDefinitionContext.h"
2930
#include "Framework/Logger.h"
31+
#include "Framework/Plugins.h"
3032
#include "Framework/CheckTypes.h"
3133
#include "Framework/StructToTuple.h"
3234
#include "ResourcePolicy.h"
@@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
125127
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
126128

127129
// This comes from the framework itself. This way we avoid code duplication.
128-
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
129-
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
130-
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
131-
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
132-
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
133-
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
134-
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
135-
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
136-
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
137-
o2::framework::ConfigContext& configContext);
130+
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);
138131

139132
void doDefaultWorkflowTerminationHook();
140133

@@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)
167160

168161
void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
169162

170-
o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
171-
o2::framework::ServiceRegistry& configRegistry,
172-
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
173-
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
163+
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
164+
o2::framework::ServiceRegistry& configRegistry,
165+
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
166+
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
174167

175168
std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();
176169

177-
int mainNoCatch(int argc, char** argv)
178-
{
179-
using namespace o2::framework;
170+
char* getIdString(int argc, char** argv);
180171

181-
std::vector<o2::framework::ConfigParamSpec> workflowOptions;
182-
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
183-
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
184-
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
185-
186-
std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
187-
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
188-
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
189-
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
190-
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();
191-
192-
std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
193-
std::vector<ConfigParamSpec> extraOptions;
194-
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
195-
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);
196-
197-
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
198-
overrideAll(configContext, specs);
199-
for (auto& spec : specs) {
200-
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
201-
}
202-
std::vector<ChannelConfigurationPolicy> channelPolicies;
203-
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
204-
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
205-
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
206-
return doMain(argc, argv, specs,
207-
channelPolicies, completionPolicies, dispatchPolicies,
208-
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
172+
#define STRINGIZE_NX(A) #A
173+
#define STRINGIZE(A) STRINGIZE_NX(A)
174+
175+
// This is to allow the old "executable" based behavior
176+
// Each executable will contain a plugin called InternalWorkflow
177+
// In case one wants to use the new DSO based approach, the
178+
// name of the plugin an the library name where it is located
179+
// will have to be specified at build time.
180+
#ifndef DPL_WORKFLOW_PLUGIN_NAME
181+
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
182+
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
183+
#error Missing DPL_WORKFLOW_PLUGIN_NAME
184+
#endif
185+
#define DPL_WORKFLOW_PLUGIN_LIBRARY ""
186+
#endif
187+
188+
consteval char const* pluginName()
189+
{
190+
return DPL_WORKFLOW_PLUGIN_LIBRARY ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
209191
}
210192

211-
int callMain(int argc, char** argv, int (*)(int, char**));
212-
char* getIdString(int argc, char** argv);
193+
// Executables behave this way
194+
int callMain(int argc, char** argv, char const* pluginName);
213195

214196
int main(int argc, char** argv)
215197
{
216198
using namespace o2::framework;
217199

218-
int result = callMain(argc, argv, mainNoCatch);
200+
int result = callMain(argc, argv, pluginName());
219201

220202
char* idstring = getIdString(argc, argv);
221203
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
222204
callWorkflowTermination(onWorkflowTerminationHook, idstring);
223205

224206
return result;
225207
}
208+
209+
struct WorkflowDefinition {
210+
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
211+
};
212+
213+
struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
214+
o2::framework::WorkflowDefinition* create() override
215+
{
216+
return new o2::framework::WorkflowDefinition{
217+
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
218+
using namespace o2::framework;
219+
WorkflowDefinitionContext workflowContext;
220+
221+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
222+
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
223+
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
224+
225+
workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
226+
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
227+
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
228+
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
229+
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();
230+
231+
workflowContext.configRegistry = createRegistry();
232+
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);
233+
234+
workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
235+
overrideAll(*workflowContext.configContext, workflowContext.specs);
236+
for (auto& spec : workflowContext.specs) {
237+
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
238+
}
239+
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
240+
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
241+
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
242+
return workflowContext;
243+
}};
244+
}
245+
};
246+
247+
// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
248+
extern "C" {
249+
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
250+
{
251+
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
252+
return previous;
253+
}
254+
}
255+
226256
#endif

0 commit comments

Comments
 (0)