Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 27 additions & 43 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#ifndef FRAMEWORK_RUN_DATA_PROCESSING_H
#define FRAMEWORK_RUN_DATA_PROCESSING_H

#include <fmt/format.h>
#include "Framework/ChannelConfigurationPolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/CompletionPolicy.h"
Expand All @@ -30,6 +31,7 @@
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "Framework/ConfigParamDiscovery.h"
#include "ResourcePolicy.h"
#include "ServiceRegistryRef.h"
#include <vector>

Expand Down Expand Up @@ -66,9 +68,7 @@ o2::framework::WorkflowSpec defineDataProcessing(o2::framework::ConfigContext co

// By default we leave the channel policies unchanged. Notice that the default still include
// a "match all" policy which uses pub / sub
// FIXME: add a debug statement saying that the default policy was used?

void defaultConfiguration(std::vector<o2::framework::ChannelConfigurationPolicy>& channelPolicies) {}
void defaultConfiguration(std::vector<o2::framework::ConfigParamSpec>& globalWorkflowOptions)
{
o2::framework::call_if_defined<struct WorkflowOptions>([&](auto* ptr) {
Expand All @@ -80,19 +80,13 @@ void defaultConfiguration(std::vector<o2::framework::ConfigParamSpec>& globalWor
});
}

void defaultConfiguration(std::vector<o2::framework::CompletionPolicy>& completionPolicies) {}
void defaultConfiguration(std::vector<o2::framework::DispatchPolicy>& dispatchPolicies) {}
void defaultConfiguration(std::vector<o2::framework::ResourcePolicy>& resourcePolicies) {}
void defaultConfiguration(std::vector<o2::framework::ServiceSpec>& services)
{
if (services.empty()) {
services = o2::framework::CommonServices::defaultServices();
}
}

void defaultConfiguration(std::vector<o2::framework::CallbacksPolicy>& callbacksPolicies) {}
void defaultConfiguration(std::vector<o2::framework::SendingPolicy>& callbacksPolicies) {}

/// Workflow options which are required by DPL in order to work.
std::vector<o2::framework::ConfigParamSpec> requiredWorkflowOptions();

Expand All @@ -101,19 +95,26 @@ void defaultConfiguration(o2::framework::OnWorkflowTerminationHook& hook)
hook = [](const char*) {};
}

template <typename T>
concept WithUserOverride = requires(T& something) { customize(something); };

template <typename T>
concept WithNonTrivialDefault = !WithUserOverride<T> && requires(T& something) { defaultConfiguration(something); };

struct UserCustomizationsHelper {
template <typename T>
static auto userDefinedCustomization(T& something, int preferUser) -> decltype(customize(something), void())
static auto userDefinedCustomization(WithUserOverride auto& something) -> void
{
customize(something);
}

template <typename T>
static auto userDefinedCustomization(T& something, long preferUser)
-> decltype(defaultConfiguration(something), void())
static auto userDefinedCustomization(WithNonTrivialDefault auto& something) -> void
{
defaultConfiguration(something);
}

static auto userDefinedCustomization(auto&) -> void
{
}
};

namespace o2::framework
Expand Down Expand Up @@ -144,12 +145,14 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
void doDefaultWorkflowTerminationHook();

template <typename T>
requires requires(T& policy) { { T::createDefaultPolicies() } -> std::same_as<std::vector<T>>; }
std::vector<T> injectCustomizations()
{
std::vector<T> policies;
UserCustomizationsHelper::userDefinedCustomization(policies, 0);
UserCustomizationsHelper::userDefinedCustomization(policies);
auto defaultPolicies = T::createDefaultPolicies();
policies.insert(std::end(policies), std::begin(policies), std::end(policies));
policies.insert(std::end(policies), std::begin(defaultPolicies), std::end(defaultPolicies));
return policies;
}

int mainNoCatch(int argc, char** argv)
Expand All @@ -158,34 +161,15 @@ int mainNoCatch(int argc, char** argv)
using namespace boost::program_options;

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0);
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

std::vector<CompletionPolicy> completionPolicies;
UserCustomizationsHelper::userDefinedCustomization(completionPolicies, 0);
auto defaultCompletionPolicies = CompletionPolicy::createDefaultPolicies();
completionPolicies.insert(std::end(completionPolicies), std::begin(defaultCompletionPolicies), std::end(defaultCompletionPolicies));

std::vector<DispatchPolicy> dispatchPolicies;
UserCustomizationsHelper::userDefinedCustomization(dispatchPolicies, 0);
auto defaultDispatchPolicies = DispatchPolicy::createDefaultPolicies();
dispatchPolicies.insert(std::end(dispatchPolicies), std::begin(defaultDispatchPolicies), std::end(defaultDispatchPolicies));

std::vector<ResourcePolicy> resourcePolicies;
UserCustomizationsHelper::userDefinedCustomization(resourcePolicies, 0);
auto defaultResourcePolicies = ResourcePolicy::createDefaultPolicies();
resourcePolicies.insert(std::end(resourcePolicies), std::begin(defaultResourcePolicies), std::end(defaultResourcePolicies));

std::vector<CallbacksPolicy> callbacksPolicies;
UserCustomizationsHelper::userDefinedCustomization(callbacksPolicies, 0);
auto defaultCallbacksPolicies = CallbacksPolicy::createDefaultPolicies();
callbacksPolicies.insert(std::end(callbacksPolicies), std::begin(defaultCallbacksPolicies), std::end(defaultCallbacksPolicies));

std::vector<SendingPolicy> sendingPolicies;
UserCustomizationsHelper::userDefinedCustomization(sendingPolicies, 0);
auto defaultSendingPolicies = SendingPolicy::createDefaultPolicies();
sendingPolicies.insert(std::end(sendingPolicies), std::begin(defaultSendingPolicies), std::end(defaultSendingPolicies));
std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::vector<std::unique_ptr<ParamRetriever>> retrievers;
std::unique_ptr<ParamRetriever> retriever{new BoostOptionsRetriever(true, argc, argv)};
Expand All @@ -206,10 +190,10 @@ int mainNoCatch(int argc, char** argv)
overridePipeline(configContext, specs);
overrideLabels(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices, 0);
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
std::vector<ChannelConfigurationPolicy> channelPolicies;
UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0);
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return doMain(argc, argv, specs,
Expand All @@ -229,7 +213,7 @@ int main(int argc, char** argv)

char* idstring = getIdString(argc, argv);
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
UserCustomizationsHelper::userDefinedCustomization(onWorkflowTerminationHook, 0);
UserCustomizationsHelper::userDefinedCustomization(onWorkflowTerminationHook);
onWorkflowTerminationHook(idstring);
doDefaultWorkflowTerminationHook();

Expand Down
Loading