Skip to content

Commit 3fbb21c

Browse files
committed
DPL: add dummy workflow mimicking CCDB populator behaviour
1 parent 642289b commit 3fbb21c

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed

Framework/TestWorkflows/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ o2_add_dpl_workflow(diamond-workflow
3333
SOURCES src/o2DiamondWorkflow.cxx
3434
COMPONENT_NAME TestWorkflows)
3535

36+
o2_add_dpl_workflow(dummy-populator-workflow
37+
SOURCES src/o2DummyPopulatorWorkflow.cxx
38+
COMPONENT_NAME TestWorkflows)
39+
3640
o2_add_dpl_workflow(analysis-histograms
3741
SOURCES src/o2TestHistograms.cxx
3842
COMPONENT_NAME TestWorkflows)
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#include "Framework/ConfigParamSpec.h"
12+
#include "Framework/DataTakingContext.h"
13+
#include "Framework/CompletionPolicyHelpers.h"
14+
#include "Framework/DeviceSpec.h"
15+
#include "Framework/RawDeviceService.h"
16+
#include "Framework/ControlService.h"
17+
#include "Framework/Configurable.h"
18+
#include "Framework/RunningWorkflowInfo.h"
19+
#include "Framework/CallbackService.h"
20+
#include "Framework/RateLimiter.h"
21+
#include <fairmq/Device.h>
22+
23+
#include <iostream>
24+
#include <chrono>
25+
#include <thread>
26+
#include <vector>
27+
28+
using namespace o2::framework;
29+
30+
struct WorkflowOptions {
31+
Configurable<int> anInt{"anInt", 1, ""};
32+
Configurable<float> aFloat{"aFloat", 2.0f, {"a float option"}};
33+
Configurable<double> aDouble{"aDouble", 3., {"a double option"}};
34+
Configurable<std::string> aString{"aString", "foobar", {"a string option"}};
35+
Configurable<bool> aBool{"aBool", true, {"a boolean option"}};
36+
};
37+
38+
void customize(std::vector<CompletionPolicy>& policies)
39+
{
40+
auto a = CompletionPolicyHelpers::consumeWhenAll();
41+
a.order = CompletionPolicy::CompletionOrder::Timeslice;
42+
policies.clear();
43+
policies.push_back(a);
44+
}
45+
46+
#include "Framework/runDataProcessing.h"
47+
48+
// This is how you can define your processing in a declarative way
49+
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
50+
{
51+
DataProcessorSpec a{
52+
.name = "A",
53+
.algorithm = AlgorithmSpec{adaptStateless(
54+
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
55+
for (unsigned int i = 0; i < 10; i++) {
56+
outputs.snapshot(Output{"TS1", "A1", i}, i);
57+
outputs.snapshot(Output{"TS2", "A2", i}, i);
58+
}
59+
})},
60+
.options = {
61+
ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}},
62+
}};
63+
64+
a.outputs.emplace_back(ConcreteDataTypeMatcher{"TS1", "A1"}, Lifetime::Sporadic);
65+
a.outputs.emplace_back(ConcreteDataTypeMatcher{"TS2", "A2"}, Lifetime::Sporadic);
66+
67+
DataProcessorSpec d{
68+
.name = "D",
69+
.inputs = {InputSpec{"a", "TS1", Lifetime::Sporadic}, InputSpec{"b", "TS2", Lifetime::Sporadic}},
70+
.algorithm = AlgorithmSpec{adaptStateless(
71+
[](InputRecord& inputs) {
72+
auto ref = inputs.get("b");
73+
if (!ref.header) {
74+
LOG(info) << "Header is not there";
75+
return;
76+
}
77+
auto dph = o2::header::get<const DataProcessingHeader*>(ref.header);
78+
auto dh = o2::header::get<const o2::header::DataHeader*>(ref.header);
79+
LOG(info) << "Start time: " << dph->startTime;
80+
LOG(info) << "Subspec: " << dh->subSpecification;
81+
})},
82+
};
83+
84+
return workflow::concat(WorkflowSpec{a}, WorkflowSpec{d});
85+
}

0 commit comments

Comments
 (0)