Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Framework/Core/include/Framework/DataProcessorInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ struct DataProcessorInfo {
/// Name of the associated DataProcessorSpec
std::string name = "Unknown";
/// The executable name of the program which holds the DataProcessorSpec
std::string executable = "/bin/false";
std::string executable = "";
/// The plugin spec of the plugin which holds the DataProcessorSpec
std::string plugin = "";
/// The argument passed on the command line for this DataProcessorSpec
std::vector<std::string> cmdLineArgs = {};
/// The workflow options which are available for the associated DataProcessorSpec
Expand All @@ -34,6 +36,6 @@ struct DataProcessorInfo {
std::vector<std::string> channels = {};
};

} // namespace o2
} // namespace o2::framework

#endif // O2_FRAMEWORK_CORE_DATAPROCESSORINFO_H_
15 changes: 7 additions & 8 deletions Framework/Core/include/Framework/DeviceExecution.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,25 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_DEVICEEXECUTION_H
#define FRAMEWORK_DEVICEEXECUTION_H
#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_
#define O2_FRAMEWORK_DEVICEEXECUTION_H_

#include <vector>

namespace o2
{
namespace framework
namespace o2::framework
{

/// This represent one single execution of a Device. It's meant to hold
/// information which can change between one execution of a Device and the
/// other, e.g. the executable name or the arguments it is started with.
struct DeviceExecution {
std::string plugin;
/// The options passed to a given device
std::vector<char*> args;
/// The environment to be passed to a given device
std::vector<char*> environ;
};

} // namespace framework
} // namespace o2
#endif
} // namespace o2::framework

#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/Plugins.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
// using the arrow dataset API
RootObjectReadingImplementation,

// A plugin which defines a whole workflow. This will be used to separate
// workflows in shared libraries and run them via a separate loader.
Workflow,
// A plugin which was not initialised properly.
Unknown
};
Expand Down
56 changes: 56 additions & 0 deletions Framework/Core/include/Framework/WorkflowDefinitionContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_

#include "Framework/ConfigParamSpec.h"
#include "Framework/CompletionPolicy.h"
#include "Framework/DispatchPolicy.h"
#include "Framework/ResourcePolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/SendingPolicy.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/ChannelConfigurationPolicy.h"
#include <vector>

namespace o2::framework
{

struct WorkflowDefinitionContext {
std::vector<ConfigParamSpec> workflowOptions;
std::vector<CompletionPolicy> completionPolicies;
std::vector<DispatchPolicy> dispatchPolicies;
std::vector<ResourcePolicy> resourcePolicies;
std::vector<CallbacksPolicy> callbacksPolicies;
std::vector<SendingPolicy> sendingPolicies;
std::vector<ConfigParamSpec> extraOptions;
std::vector<ChannelConfigurationPolicy> channelPolicies;
std::unique_ptr<ConfigContext> configContext;

// For the moment, let's put them here. We should
// probably move them to a different place, since these are not really part
// of the workflow definition but will be there also at runtine.
std::unique_ptr<ServiceRegistry> configRegistry{nullptr};
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};

o2::framework::WorkflowSpec specs;
};

struct WorkflowDefinition {
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
};

struct WorkflowPlugin {
virtual o2::framework::WorkflowDefinition* create() = 0;
};

} // namespace o2::framework
#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
126 changes: 78 additions & 48 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "Framework/CustomWorkflowTerminationHook.h"
#include "Framework/CommonServices.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "Framework/WorkflowDefinitionContext.h"
#include "Framework/Logger.h"
#include "Framework/Plugins.h"
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "ResourcePolicy.h"
Expand Down Expand Up @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

// This comes from the framework itself. This way we avoid code duplication.
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
o2::framework::ConfigContext& configContext);
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);

void doDefaultWorkflowTerminationHook();

Expand Down Expand Up @@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();

int mainNoCatch(int argc, char** argv)
{
using namespace o2::framework;
char* getIdString(int argc, char** argv);

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
std::vector<ConfigParamSpec> extraOptions;
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);

o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
overrideAll(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
std::vector<ChannelConfigurationPolicy> channelPolicies;
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return doMain(argc, argv, specs,
channelPolicies, completionPolicies, dispatchPolicies,
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
#define STRINGIZE_NX(A) #A
#define STRINGIZE(A) STRINGIZE_NX(A)

// This is to allow the old "executable" based behavior
// Each executable will contain a plugin called InternalWorkflow
// In case one wants to use the new DSO based approach, the
// name of the plugin an the library name where it is located
// will have to be specified at build time.
#ifndef DPL_WORKFLOW_PLUGIN_NAME
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
#error Missing DPL_WORKFLOW_PLUGIN_NAME
#endif
#define DPL_WORKFLOW_PLUGIN_LIBRARY
#endif

consteval char const* pluginName()
{
return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
}

int callMain(int argc, char** argv, int (*)(int, char**));
char* getIdString(int argc, char** argv);
// Executables behave this way
int callMain(int argc, char** argv, char const* pluginName);

int main(int argc, char** argv)
{
using namespace o2::framework;

int result = callMain(argc, argv, mainNoCatch);
int result = callMain(argc, argv, pluginName());

char* idstring = getIdString(argc, argv);
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
callWorkflowTermination(onWorkflowTerminationHook, idstring);

return result;
}

struct WorkflowDefinition {
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
};

struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
o2::framework::WorkflowDefinition* create() override
{
return new o2::framework::WorkflowDefinition{
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
using namespace o2::framework;
WorkflowDefinitionContext workflowContext;

UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();

workflowContext.configRegistry = createRegistry();
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);

workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
overrideAll(*workflowContext.configContext, workflowContext.specs);
for (auto& spec : workflowContext.specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return workflowContext;
}};
}
};

// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
extern "C" {
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
{
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
return previous;
}
}

#endif
4 changes: 4 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
realOdesc.add_options()("early-forward-policy", bpo::value<std::string>());
realOdesc.add_options()("session", bpo::value<std::string>());
realOdesc.add_options()("signposts", bpo::value<std::string>());
realOdesc.add_options()("workflow-plugin", bpo::value<std::string>());
filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc);
wordfree(&expansions);
return;
Expand Down Expand Up @@ -1711,6 +1712,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
}
O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s",
spec.id.c_str(), str.str().c_str());
// Copy the plugin over from the DataProcessingInfo
execution.plugin = pi->plugin;
}
}

Expand Down Expand Up @@ -1755,6 +1758,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("network-interface", bpo::value<std::string>(), "network interface to which to bind tpc fmq ports without specified address") //
("early-forward-policy", bpo::value<EarlyForwardPolicy>()->default_value(EarlyForwardPolicy::NEVER), "when to forward early the messages: never, noraw, always") //
("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
("workflow-plugin", bpo::value<std::string>(), "workflow configuration plugin") //
("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>(), "minimum flushing interval for online metrics (in s)") //
Expand Down
21 changes: 18 additions & 3 deletions Framework/Core/src/WorkflowSerializationHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
IN_DATAPROCESSOR_INFO,
IN_DATAPROCESSOR_INFO_NAME,
IN_DATAPROCESSOR_INFO_EXECUTABLE,
IN_DATAPROCESSOR_INFO_PLUGIN,
IN_DATAPROCESSOR_INFO_ARGS,
IN_DATAPROCESSOR_INFO_ARG,
IN_DATAPROCESSOR_INFO_CHANNELS,
Expand Down Expand Up @@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
case State::IN_DATAPROCESSOR_INFO_EXECUTABLE:
s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
break;
case State::IN_DATAPROCESSOR_INFO_PLUGIN:
s << "IN_DATAPROCESSOR_INFO_PLUGIN";
break;
case State::IN_DATAPROCESSOR_INFO_ARGS:
s << "IN_DATAPROCESSOR_INFO_ARGS";
break;
Expand Down Expand Up @@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
push(State::IN_DATAPROCESSOR_INFO_NAME);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) {
push(State::IN_DATAPROCESSOR_INFO_PLUGIN);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
push(State::IN_DATAPROCESSOR_INFO_ARGS);
} else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
Expand All @@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
} else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) {
assert(metadata.size());
metadata.back().executable = s;
} else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) {
assert(metadata.size());
metadata.back().plugin = s;
} else if (in(State::IN_INPUT_BINDING)) {
binding = s;
} else if (in(State::IN_INPUT_ORIGIN)) {
Expand Down Expand Up @@ -888,7 +897,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
if (!states.empty()) {
debug << " now in " << states.back();
}
O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str());
O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size() + 1}, "import", "POP: %s", debug.str().c_str());
return result;
}
bool in(State o)
Expand Down Expand Up @@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out,
w.StartObject();
w.Key("name");
w.String(info.name.c_str());
w.Key("executable");
w.String(info.executable.c_str());
if (!info.executable.empty()) {
w.Key("executable");
w.String(info.executable.c_str());
}
if (!info.plugin.empty()) {
w.Key("plugin");
w.String(info.plugin.c_str());
}
w.Key("cmdLineArgs");
w.StartArray();
for (auto& arg : info.cmdLineArgs) {
Expand Down
Loading