Skip to content
Closed
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/DataProcessorInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ struct DataProcessorInfo {
/// Name of the associated DataProcessorSpec
std::string name = "Unknown";
/// The executable name of the program which holds the DataProcessorSpec
std::string executable = "/bin/false";
std::string executable = "";
/// The plugin spec of the plugin which holds the DataProcessorSpec
std::string plugin = "";
/// The argument passed on the command line for this DataProcessorSpec
std::vector<std::string> cmdLineArgs = {};
/// The workflow options which are available for the associated DataProcessorSpec
Expand Down
15 changes: 7 additions & 8 deletions Framework/Core/include/Framework/DeviceExecution.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,25 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_DEVICEEXECUTION_H
#define FRAMEWORK_DEVICEEXECUTION_H
#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_
#define O2_FRAMEWORK_DEVICEEXECUTION_H_

#include <vector>

namespace o2
{
namespace framework
namespace o2::framework
{

/// This represent one single execution of a Device. It's meant to hold
/// information which can change between one execution of a Device and the
/// other, e.g. the executable name or the arguments it is started with.
struct DeviceExecution {
std::string plugin;
/// The options passed to a given device
std::vector<char*> args;
/// The environment to be passed to a given device
std::vector<char*> environ;
};

} // namespace framework
} // namespace o2
#endif
} // namespace o2::framework

#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/PluginManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct PluginManager {
#else
auto libraryName = fmt::format("lib{}.so", loadablePlugin.library);
#endif
auto ret = uv_dlopen(libraryName.c_str(), &handle);
auto ret = uv_dlopen(loadablePlugin.library.empty() ? nullptr : libraryName.c_str(), &handle);
if (ret != 0) {
LOGP(error, "Could not load library {}", loadablePlugin.library);
LOG(error) << uv_dlerror(&handle);
Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/Plugins.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
// using the arrow dataset API
RootObjectReadingImplementation,

// A plugin which defines a whole workflow. This will be used to separate
// workflows in shared libraries and run them via a separate loader.
Workflow,
// A plugin which was not initialised properly.
Unknown
};
Expand Down
56 changes: 56 additions & 0 deletions Framework/Core/include/Framework/WorkflowDefinitionContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_

#include "Framework/ConfigParamSpec.h"
#include "Framework/CompletionPolicy.h"
#include "Framework/DispatchPolicy.h"
#include "Framework/ResourcePolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/SendingPolicy.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/ChannelConfigurationPolicy.h"
#include <vector>

namespace o2::framework
{

struct WorkflowDefinitionContext {
std::vector<ConfigParamSpec> workflowOptions;
std::vector<CompletionPolicy> completionPolicies;
std::vector<DispatchPolicy> dispatchPolicies;
std::vector<ResourcePolicy> resourcePolicies;
std::vector<CallbacksPolicy> callbacksPolicies;
std::vector<SendingPolicy> sendingPolicies;
std::vector<ConfigParamSpec> extraOptions;
std::vector<ChannelConfigurationPolicy> channelPolicies;
std::unique_ptr<ConfigContext> configContext;

// For the moment, let's put them here. We should
// probably move them to a different place, since these are not really part
// of the workflow definition but will be there also at runtine.
std::unique_ptr<ServiceRegistry> configRegistry{nullptr};
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};

o2::framework::WorkflowSpec specs;
};

struct WorkflowDefinition {
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
};

struct WorkflowPlugin {
virtual o2::framework::WorkflowDefinition* create() = 0;
};

} // namespace o2::framework
#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
126 changes: 78 additions & 48 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "Framework/CustomWorkflowTerminationHook.h"
#include "Framework/CommonServices.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "Framework/WorkflowDefinitionContext.h"
#include "Framework/Logger.h"
#include "Framework/Plugins.h"
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "ResourcePolicy.h"
Expand Down Expand Up @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

// This comes from the framework itself. This way we avoid code duplication.
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
o2::framework::ConfigContext& configContext);
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);

void doDefaultWorkflowTerminationHook();

Expand Down Expand Up @@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();

int mainNoCatch(int argc, char** argv)
{
using namespace o2::framework;
char* getIdString(int argc, char** argv);

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
std::vector<ConfigParamSpec> extraOptions;
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);

o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
overrideAll(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
std::vector<ChannelConfigurationPolicy> channelPolicies;
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return doMain(argc, argv, specs,
channelPolicies, completionPolicies, dispatchPolicies,
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
#define STRINGIZE_NX(A) #A
#define STRINGIZE(A) STRINGIZE_NX(A)

// This is to allow the old "executable" based behavior
// Each executable will contain a plugin called InternalWorkflow
// In case one wants to use the new DSO based approach, the
// name of the plugin an the library name where it is located
// will have to be specified at build time.
#ifndef DPL_WORKFLOW_PLUGIN_NAME
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
#error Missing DPL_WORKFLOW_PLUGIN_NAME
#endif
#define DPL_WORKFLOW_PLUGIN_LIBRARY
#endif

consteval char const* pluginName()
{
return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
}

int callMain(int argc, char** argv, int (*)(int, char**));
char* getIdString(int argc, char** argv);
// Executables behave this way
int callMain(int argc, char** argv, char const* pluginName);

int main(int argc, char** argv)
{
using namespace o2::framework;

int result = callMain(argc, argv, mainNoCatch);
int result = callMain(argc, argv, pluginName());

char* idstring = getIdString(argc, argv);
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
callWorkflowTermination(onWorkflowTerminationHook, idstring);

return result;
}

struct WorkflowDefinition {
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
};

struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
o2::framework::WorkflowDefinition* create() override
{
return new o2::framework::WorkflowDefinition{
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
using namespace o2::framework;
WorkflowDefinitionContext workflowContext;

UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();

workflowContext.configRegistry = createRegistry();
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);

workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
overrideAll(*workflowContext.configContext, workflowContext.specs);
for (auto& spec : workflowContext.specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return workflowContext;
}};
}
};

// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
extern "C" {
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
{
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
return previous;
}
}

#endif
4 changes: 2 additions & 2 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,8 @@ void DataProcessingDevice::InitTask()
if (deviceContext.sigusr1Handle == nullptr) {
deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
deviceContext.sigusr1Handle->data = &mServiceRegistry;
uv_signal_init(state.loop, deviceContext.sigusr1Handle);
uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
// uv_signal_init(state.loop, deviceContext.sigusr1Handle);
// uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
}
// If there is any signal, we want to make sure they are active
for (auto& handle : state.activeSignals) {
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
}
O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s",
spec.id.c_str(), str.str().c_str());
// Copy the plugin over from the DataProcessingInfo
execution.plugin = pi->plugin;
}
}

Expand Down
19 changes: 17 additions & 2 deletions Framework/Core/src/WorkflowSerializationHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
IN_DATAPROCESSOR_INFO,
IN_DATAPROCESSOR_INFO_NAME,
IN_DATAPROCESSOR_INFO_EXECUTABLE,
IN_DATAPROCESSOR_INFO_PLUGIN,
IN_DATAPROCESSOR_INFO_ARGS,
IN_DATAPROCESSOR_INFO_ARG,
IN_DATAPROCESSOR_INFO_CHANNELS,
Expand Down Expand Up @@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
case State::IN_DATAPROCESSOR_INFO_EXECUTABLE:
s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
break;
case State::IN_DATAPROCESSOR_INFO_PLUGIN:
s << "IN_DATAPROCESSOR_INFO_PLUGIN";
break;
case State::IN_DATAPROCESSOR_INFO_ARGS:
s << "IN_DATAPROCESSOR_INFO_ARGS";
break;
Expand Down Expand Up @@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
push(State::IN_DATAPROCESSOR_INFO_NAME);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) {
push(State::IN_DATAPROCESSOR_INFO_PLUGIN);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
push(State::IN_DATAPROCESSOR_INFO_ARGS);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
Expand All @@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
} else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) {
assert(metadata.size());
metadata.back().executable = s;
} else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) {
assert(metadata.size());
metadata.back().plugin = s;
} else if (in(State::IN_INPUT_BINDING)) {
binding = s;
} else if (in(State::IN_INPUT_ORIGIN)) {
Expand Down Expand Up @@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out,
w.StartObject();
w.Key("name");
w.String(info.name.c_str());
w.Key("executable");
w.String(info.executable.c_str());
if (!info.executable.empty()) {
w.Key("executable");
w.String(info.executable.c_str());
}
if (!info.plugin.empty()) {
w.Key("plugin");
w.String(info.plugin.c_str());
}
w.Key("cmdLineArgs");
w.StartArray();
for (auto& arg : info.cmdLineArgs) {
Expand Down
Loading