Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 18 additions & 23 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define FRAMEWORK_RUN_DATA_PROCESSING_H

#include <fmt/format.h>
#include "Framework/ConfigParamSpec.h"
#include "Framework/ChannelConfigurationPolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/CompletionPolicy.h"
Expand All @@ -22,17 +23,13 @@
#include "Framework/SendingPolicy.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/BoostOptionsRetriever.h"
#include "Framework/CustomWorkflowTerminationHook.h"
#include "Framework/CommonServices.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "Framework/ResourcePolicyHelpers.h"
#include "Framework/Logger.h"
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "Framework/ConfigParamDiscovery.h"
#include "ResourcePolicy.h"
#include "ServiceRegistryRef.h"
#include <vector>

namespace o2::framework
Expand Down Expand Up @@ -120,7 +117,9 @@ struct UserCustomizationsHelper {
namespace o2::framework
{
class ConfigContext;
}
class ConfigParamRegistry;
class ConfigParamSpec;
} // namespace o2::framework
/// Helper used to customize a workflow pipelining options
void overridePipeline(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

Expand Down Expand Up @@ -155,10 +154,18 @@ std::vector<T> injectCustomizations()
return policies;
}

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();

int mainNoCatch(int argc, char** argv)
{
using namespace o2::framework;
using namespace boost::program_options;

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
Expand All @@ -171,24 +178,13 @@ int mainNoCatch(int argc, char** argv)
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::vector<std::unique_ptr<ParamRetriever>> retrievers;
std::unique_ptr<ParamRetriever> retriever{new BoostOptionsRetriever(true, argc, argv)};
retrievers.emplace_back(std::move(retriever));
auto workflowOptionsStore = std::make_unique<ConfigParamStore>(workflowOptions, std::move(retrievers));
workflowOptionsStore->preload();
workflowOptionsStore->activate();
ConfigParamRegistry workflowOptionsRegistry(std::move(workflowOptionsStore));
auto extraOptions = o2::framework::ConfigParamDiscovery::discover(workflowOptionsRegistry, argc, argv);
for (auto& extra : extraOptions) {
workflowOptions.push_back(extra);
}
std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
std::vector<ConfigParamSpec> extraOptions;
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);

ServiceRegistry configRegistry;
ConfigContext configContext(workflowOptionsRegistry, ServiceRegistryRef{configRegistry}, argc, argv);
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
overrideCloning(configContext, specs);
overridePipeline(configContext, specs);
overrideLabels(configContext, specs);
overrideAll(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
Expand All @@ -207,7 +203,6 @@ char* getIdString(int argc, char** argv);
int main(int argc, char** argv)
{
using namespace o2::framework;
using namespace boost::program_options;

int result = callMain(argc, argv, mainNoCatch);

Expand Down
34 changes: 34 additions & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include <memory>
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <stdexcept>
#include "Framework/BoostOptionsRetriever.h"
Expand Down Expand Up @@ -69,6 +70,7 @@
#include "HTTPParser.h"
#include "DPLWebSocket.h"
#include "ArrowSupport.h"
#include "Framework/ConfigParamDiscovery.h"

#include "ComputingResourceHelpers.h"
#include "DataProcessingStatus.h"
Expand Down Expand Up @@ -2806,6 +2808,38 @@ void enableSignposts(std::string const& signpostsToEnable)
}
}

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow)
{
overrideCloning(ctx, workflow);
overridePipeline(ctx, workflow);
overrideLabels(ctx, workflow);
}

o2::framework::ConfigContext createConfigContext(std::unique_ptr<ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv)
{
std::vector<std::unique_ptr<o2::framework::ParamRetriever>> retrievers;
std::unique_ptr<o2::framework::ParamRetriever> retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)};
retrievers.emplace_back(std::move(retriever));
auto workflowOptionsStore = std::make_unique<o2::framework::ConfigParamStore>(workflowOptions, std::move(retrievers));
workflowOptionsStore->preload();
workflowOptionsStore->activate();
workflowOptionsRegistry = std::make_unique<ConfigParamRegistry>(std::move(workflowOptionsStore));
extraOptions = o2::framework::ConfigParamDiscovery::discover(*workflowOptionsRegistry, argc, argv);
for (auto& extra : extraOptions) {
workflowOptions.push_back(extra);
}

return o2::framework::ConfigContext(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv);
}

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
{
return std::make_unique<o2::framework::ServiceRegistry>();
}

// This is a toy executor for the workflow spec
// What it needs to do is:
//
Expand Down
Loading