Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Framework/TestWorkflows/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ o2_add_dpl_workflow(simple-sink
SOURCES src/o2SimpleSink.cxx
COMPONENT_NAME TestWorkflows)

o2_add_dpl_workflow(simple-processor
SOURCES src/o2SimpleProcessor.cxx
COMPONENT_NAME TestWorkflows)

o2_add_dpl_workflow(analysis-workflow
SOURCES src/o2AnalysisWorkflow.cxx
COMPONENT_NAME TestWorkflows)
Expand Down
7 changes: 7 additions & 0 deletions Framework/TestWorkflows/scripts/mock-calibration.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#/bin/sh -ex
export DPL_SIGNPOSTS="calibration"
stage/bin/o2-dpl-raw-proxy --exit-transition-timeout 20 --data-processing-timeout 10 --dataspec "tst:TST/A/0" --channel-config "readout-proxy:address=tcp://0.0.0.0:4200,method=connect,type=pair" | \
stage/bin/o2-testworkflows-simple-processor --exit-transition-timeout 20 --data-processing-timeout 10 --name reconstruction --processing-delay 5000 --eos-dataspec tst3:TST/C/0 --in-dataspec "tst2:TST/A/0" --out-dataspec "tst:TST/B/0" | \
stage/bin/o2-testworkflows-simple-processor --exit-transition-timeout 20 --data-processing-timeout 10 --name calibration --processing-delay 1000 --in-dataspec "tst2:TST/C/0?lifetime=sporadic" --out-dataspec "tst:TCL/C/0?lifetime=sporadic" | \
stage/bin/o2-testworkflows-simple-sink --exit-transition-timeout 20 --data-processing-timeout 10 --name calibration-publisher --dataspec "tst2:TCL/C/0?lifetime=sporadic" | \
stage/bin/o2-testworkflows-simple-sink --exit-transition-timeout 20 --data-processing-timeout 10 --dataspec "tst:TST/B/0"
3 changes: 3 additions & 0 deletions Framework/TestWorkflows/scripts/mock-flp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#/bin/sh -ex
stage/bin/o2-testworkflows-simple-source --dataspec tst:TST/A/0 --delay 1000 | \
stage/bin/o2-dpl-output-proxy --dataspec "tst:TST/A/0" --channel-config "downstream:address=tcp://0.0.0.0:4200,method=bind,type=pair"
99 changes: 99 additions & 0 deletions Framework/TestWorkflows/src/o2SimpleProcessor.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/ConfigParamSpec.h"
#include "Framework/RawDeviceService.h"

#include <thread>
#include <chrono>
#include <vector>
#include <fairmq/Device.h>

using namespace o2::framework;

void customize(std::vector<ConfigParamSpec>& workflowOptions)
{
workflowOptions.emplace_back(
ConfigParamSpec{"in-dataspec", VariantType::String, "", {"DataSpec for the outputs"}});
workflowOptions.emplace_back(
ConfigParamSpec{"out-dataspec", VariantType::String, "", {"DataSpec for the outputs"}});
workflowOptions.emplace_back(
ConfigParamSpec{"eos-dataspec", VariantType::String, "", {"DataSpec for the outputs during EoS"}});
workflowOptions.emplace_back(
ConfigParamSpec{"processing-delay", VariantType::Int, 0, {"How long the processing takes"}});
workflowOptions.emplace_back(
ConfigParamSpec{"eos-delay", VariantType::Int, 0, {"How long the takes to do eos"}});
workflowOptions.emplace_back(
ConfigParamSpec{"name", VariantType::String, "test-processor", {"Name of the processor"}});
}
#include "Framework/runDataProcessing.h"

// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const& ctx)
{
// Get the dataspec option and creates OutputSpecs from it
auto inDataspec = ctx.options().get<std::string>("in-dataspec");
auto outDataspec = ctx.options().get<std::string>("out-dataspec");
// For data created at the End-Of-Stream
auto eosDataspec = ctx.options().get<std::string>("eos-dataspec");

auto processingDelay = ctx.options().get<int>("processing-delay");
auto eosDelay = ctx.options().get<int>("eos-delay");

std::vector<InputSpec> inputs = select(inDataspec.c_str());

for (auto& input : inputs) {
LOGP(info, "{} : lifetime {}", DataSpecUtils::describe(input), (int)input.lifetime);
}

std::vector<InputSpec> matchers = select(outDataspec.c_str());
std::vector<std::string> outputRefs;
std::vector<OutputSpec> outputs;

for (auto const& matcher : matchers) {
outputRefs.emplace_back(matcher.binding);
outputs.emplace_back(DataSpecUtils::asOutputSpec(matcher));
}

std::vector<InputSpec> eosMatchers = select(eosDataspec.c_str());
std::vector<std::string> eosRefs;
std::vector<OutputSpec> eosOutputs;

for (auto const& matcher : eosMatchers) {
eosRefs.emplace_back(matcher.binding);
auto eosOut = DataSpecUtils::asOutputSpec(matcher);
eosOut.lifetime = Lifetime::Sporadic;
outputs.emplace_back(eosOut);
}

AlgorithmSpec algo = adaptStateful([outputRefs, eosRefs, processingDelay, eosDelay](CallbackService& service) {
service.set<o2::framework::CallbackService::Id::EndOfStream>([eosRefs, eosDelay](EndOfStreamContext&) {
LOG(info) << "Creating objects on end of stream reception.";
std::this_thread::sleep_for(std::chrono::seconds(eosDelay));
});

return adaptStateless(
[outputRefs, processingDelay](InputRecord& inputs, DataAllocator& outputs) {
LOG(info) << "Received " << inputs.size() << " messages. Converting.";
auto i = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(processingDelay));
for (auto& ref : outputRefs) {
LOGP(info, "Creating {}.", ref);
outputs.make<int>(ref, ++i);
}
});
});

return WorkflowSpec{
{.name = ctx.options().get<std::string>("name"),
.inputs = inputs,
.outputs = outputs,
.algorithm = algo}};
}
9 changes: 7 additions & 2 deletions Framework/TestWorkflows/src/o2SimpleSource.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
ConfigParamSpec{"name", VariantType::String, "test-source", {"Name of the source"}});
workflowOptions.emplace_back(
ConfigParamSpec{"timer", VariantType::String, "", {"What to use as timer intervals. Format is <period>:<validity since start>[, ...]"}});
workflowOptions.emplace_back(
ConfigParamSpec{"delay", VariantType::Int, 0, {"How long it takes to do the processing (in ms)"}});
}

#include "Framework/runDataProcessing.h"
Expand All @@ -39,6 +41,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& ctx)
// Get the dataspec option and creates OutputSpecs from it
auto dataspec = ctx.options().get<std::string>("dataspec");
auto timer = ctx.options().get<std::string>("timer");
auto delay = ctx.options().get<int>("delay");

std::vector<InputSpec> inputs;
std::vector<TimerSpec> timers;
if (timer.empty() == false) {
Expand Down Expand Up @@ -74,13 +78,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& ctx)
.inputs = inputs,
.outputs = outputSpecs,
.algorithm = AlgorithmSpec{adaptStateful(
[outputSpecs](ConfigParamRegistry const& options) {
[outputSpecs, delay](ConfigParamRegistry const& options) {
// the size of the messages is also a workflow option
auto dataSize = options.get<int64_t>("data-size");
return adaptStateless(
[outputSpecs, dataSize](DataAllocator& outputs, ProcessingContext& ctx) {
[outputSpecs, dataSize, delay](DataAllocator& outputs, ProcessingContext& ctx) {
for (auto const& output : outputSpecs) {
auto concrete = DataSpecUtils::asConcreteDataMatcher(output);
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
outputs.make<char>(Output{concrete.origin, concrete.description, concrete.subSpec}, dataSize);
}
});
Expand Down
2 changes: 1 addition & 1 deletion Framework/Utils/src/raw-proxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)

workflowOptions.push_back(
ConfigParamSpec{
"dataspec", VariantType::String, "A:FLP/RAWDATA;B:FLP/DISTSUBTIMEFRAME/0", {"selection string for the data to be proxied"}});
"dataspec", VariantType::String, "tst:TST/A", {"selection string for the data to be proxied"}});

workflowOptions.push_back(
ConfigParamSpec{
Expand Down