Skip to content

Commit

Permalink
DPL: reduce bloat in runDataProcessing.h
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jan 8, 2025
1 parent a9ccd58 commit dcad5cb
Showing 1 changed file with 27 additions and 43 deletions.
70 changes: 27 additions & 43 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#ifndef FRAMEWORK_RUN_DATA_PROCESSING_H
#define FRAMEWORK_RUN_DATA_PROCESSING_H

#include <fmt/format.h>
#include "Framework/ChannelConfigurationPolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/CompletionPolicy.h"
Expand All @@ -30,6 +31,7 @@
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "Framework/ConfigParamDiscovery.h"
#include "ResourcePolicy.h"
#include "ServiceRegistryRef.h"
#include <vector>

Expand Down Expand Up @@ -66,9 +68,7 @@ o2::framework::WorkflowSpec defineDataProcessing(o2::framework::ConfigContext co

// By default we leave the channel policies unchanged. Notice that the default still include
// a "match all" policy which uses pub / sub
// FIXME: add a debug statement saying that the default policy was used?

void defaultConfiguration(std::vector<o2::framework::ChannelConfigurationPolicy>& channelPolicies) {}
void defaultConfiguration(std::vector<o2::framework::ConfigParamSpec>& globalWorkflowOptions)
{
o2::framework::call_if_defined<struct WorkflowOptions>([&](auto* ptr) {
Expand All @@ -80,19 +80,13 @@ void defaultConfiguration(std::vector<o2::framework::ConfigParamSpec>& globalWor
});
}

void defaultConfiguration(std::vector<o2::framework::CompletionPolicy>& completionPolicies) {}
void defaultConfiguration(std::vector<o2::framework::DispatchPolicy>& dispatchPolicies) {}
void defaultConfiguration(std::vector<o2::framework::ResourcePolicy>& resourcePolicies) {}
void defaultConfiguration(std::vector<o2::framework::ServiceSpec>& services)
{
if (services.empty()) {
services = o2::framework::CommonServices::defaultServices();
}
}

void defaultConfiguration(std::vector<o2::framework::CallbacksPolicy>& callbacksPolicies) {}
void defaultConfiguration(std::vector<o2::framework::SendingPolicy>& callbacksPolicies) {}

/// Workflow options which are required by DPL in order to work.
std::vector<o2::framework::ConfigParamSpec> requiredWorkflowOptions();

Expand All @@ -101,19 +95,26 @@ void defaultConfiguration(o2::framework::OnWorkflowTerminationHook& hook)
hook = [](const char*) {};
}

template <typename T>
concept WithUserOverride = requires(T& something) { customize(something); };

template <typename T>
concept WithNonTrivialDefault = !WithUserOverride<T> && requires(T& something) { defaultConfiguration(something); };

struct UserCustomizationsHelper {
template <typename T>
static auto userDefinedCustomization(T& something, int preferUser) -> decltype(customize(something), void())
static auto userDefinedCustomization(WithUserOverride auto& something) -> void
{
customize(something);
}

template <typename T>
static auto userDefinedCustomization(T& something, long preferUser)
-> decltype(defaultConfiguration(something), void())
static auto userDefinedCustomization(WithNonTrivialDefault auto& something) -> void
{
defaultConfiguration(something);
}

static auto userDefinedCustomization(auto&) -> void
{
}
};

namespace o2::framework
Expand Down Expand Up @@ -144,12 +145,14 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
void doDefaultWorkflowTerminationHook();

template <typename T>
requires requires(T& policy) { { T::createDefaultPolicies() } -> std::same_as<std::vector<T>>; }
std::vector<T> injectCustomizations()
{
std::vector<T> policies;
UserCustomizationsHelper::userDefinedCustomization(policies, 0);
UserCustomizationsHelper::userDefinedCustomization(policies);
auto defaultPolicies = T::createDefaultPolicies();
policies.insert(std::end(policies), std::begin(policies), std::end(policies));
policies.insert(std::end(policies), std::begin(defaultPolicies), std::end(defaultPolicies));
return policies;
}

int mainNoCatch(int argc, char** argv)
Expand All @@ -158,34 +161,15 @@ int mainNoCatch(int argc, char** argv)
using namespace boost::program_options;

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0);
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

std::vector<CompletionPolicy> completionPolicies;
UserCustomizationsHelper::userDefinedCustomization(completionPolicies, 0);
auto defaultCompletionPolicies = CompletionPolicy::createDefaultPolicies();
completionPolicies.insert(std::end(completionPolicies), std::begin(defaultCompletionPolicies), std::end(defaultCompletionPolicies));

std::vector<DispatchPolicy> dispatchPolicies;
UserCustomizationsHelper::userDefinedCustomization(dispatchPolicies, 0);
auto defaultDispatchPolicies = DispatchPolicy::createDefaultPolicies();
dispatchPolicies.insert(std::end(dispatchPolicies), std::begin(defaultDispatchPolicies), std::end(defaultDispatchPolicies));

std::vector<ResourcePolicy> resourcePolicies;
UserCustomizationsHelper::userDefinedCustomization(resourcePolicies, 0);
auto defaultResourcePolicies = ResourcePolicy::createDefaultPolicies();
resourcePolicies.insert(std::end(resourcePolicies), std::begin(defaultResourcePolicies), std::end(defaultResourcePolicies));

std::vector<CallbacksPolicy> callbacksPolicies;
UserCustomizationsHelper::userDefinedCustomization(callbacksPolicies, 0);
auto defaultCallbacksPolicies = CallbacksPolicy::createDefaultPolicies();
callbacksPolicies.insert(std::end(callbacksPolicies), std::begin(defaultCallbacksPolicies), std::end(defaultCallbacksPolicies));

std::vector<SendingPolicy> sendingPolicies;
UserCustomizationsHelper::userDefinedCustomization(sendingPolicies, 0);
auto defaultSendingPolicies = SendingPolicy::createDefaultPolicies();
sendingPolicies.insert(std::end(sendingPolicies), std::begin(defaultSendingPolicies), std::end(defaultSendingPolicies));
std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::vector<std::unique_ptr<ParamRetriever>> retrievers;
std::unique_ptr<ParamRetriever> retriever{new BoostOptionsRetriever(true, argc, argv)};
Expand All @@ -206,10 +190,10 @@ int mainNoCatch(int argc, char** argv)
overridePipeline(configContext, specs);
overrideLabels(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices, 0);
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
std::vector<ChannelConfigurationPolicy> channelPolicies;
UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0);
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return doMain(argc, argv, specs,
Expand All @@ -229,7 +213,7 @@ int main(int argc, char** argv)

char* idstring = getIdString(argc, argv);
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
UserCustomizationsHelper::userDefinedCustomization(onWorkflowTerminationHook, 0);
UserCustomizationsHelper::userDefinedCustomization(onWorkflowTerminationHook);
onWorkflowTerminationHook(idstring);
doDefaultWorkflowTerminationHook();

Expand Down

0 comments on commit dcad5cb

Please sign in to comment.