|
| 1 | +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. |
| 2 | +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. |
| 3 | +// All rights not expressly granted are reserved. |
| 4 | +// |
| 5 | +// This software is distributed under the terms of the GNU General Public |
| 6 | +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". |
| 7 | +// |
| 8 | +// In applying this license CERN does not waive the privileges and immunities |
| 9 | +// granted to it by virtue of its status as an Intergovernmental Organization |
| 10 | +// or submit itself to any jurisdiction. |
| 11 | +#include "Framework/ConfigParamSpec.h" |
| 12 | +#include "Framework/RawDeviceService.h" |
| 13 | + |
| 14 | +#include <thread> |
| 15 | +#include <chrono> |
| 16 | +#include <vector> |
| 17 | +#include <fairmq/Device.h> |
| 18 | + |
| 19 | +using namespace o2::framework; |
| 20 | + |
| 21 | +void customize(std::vector<ConfigParamSpec>& workflowOptions) |
| 22 | +{ |
| 23 | + workflowOptions.emplace_back( |
| 24 | + ConfigParamSpec{"in-dataspec", VariantType::String, "", {"DataSpec for the outputs"}}); |
| 25 | + workflowOptions.emplace_back( |
| 26 | + ConfigParamSpec{"out-dataspec", VariantType::String, "", {"DataSpec for the outputs"}}); |
| 27 | + workflowOptions.emplace_back( |
| 28 | + ConfigParamSpec{"eos-dataspec", VariantType::String, "", {"DataSpec for the outputs during EoS"}}); |
| 29 | + workflowOptions.emplace_back( |
| 30 | + ConfigParamSpec{"processing-delay", VariantType::Int, 0, {"How long the processing takes"}}); |
| 31 | + workflowOptions.emplace_back( |
| 32 | + ConfigParamSpec{"eos-delay", VariantType::Int, 0, {"How long the takes to do eos"}}); |
| 33 | + workflowOptions.emplace_back( |
| 34 | + ConfigParamSpec{"name", VariantType::String, "test-processor", {"Name of the processor"}}); |
| 35 | +} |
| 36 | +#include "Framework/runDataProcessing.h" |
| 37 | + |
| 38 | +// This is how you can define your processing in a declarative way |
| 39 | +WorkflowSpec defineDataProcessing(ConfigContext const& ctx) |
| 40 | +{ |
| 41 | + // Get the dataspec option and creates OutputSpecs from it |
| 42 | + auto inDataspec = ctx.options().get<std::string>("in-dataspec"); |
| 43 | + auto outDataspec = ctx.options().get<std::string>("out-dataspec"); |
| 44 | + // For data created at the End-Of-Stream |
| 45 | + auto eosDataspec = ctx.options().get<std::string>("eos-dataspec"); |
| 46 | + |
| 47 | + auto processingDelay = ctx.options().get<int>("processing-delay"); |
| 48 | + auto eosDelay = ctx.options().get<int>("eos-delay"); |
| 49 | + |
| 50 | + std::vector<InputSpec> inputs = select(inDataspec.c_str()); |
| 51 | + |
| 52 | + for (auto& input : inputs) { |
| 53 | + LOGP(info, "{} : lifetime {}", DataSpecUtils::describe(input), (int)input.lifetime); |
| 54 | + } |
| 55 | + |
| 56 | + std::vector<InputSpec> matchers = select(outDataspec.c_str()); |
| 57 | + std::vector<std::string> outputRefs; |
| 58 | + std::vector<OutputSpec> outputs; |
| 59 | + |
| 60 | + for (auto const& matcher : matchers) { |
| 61 | + outputRefs.emplace_back(matcher.binding); |
| 62 | + outputs.emplace_back(DataSpecUtils::asOutputSpec(matcher)); |
| 63 | + } |
| 64 | + |
| 65 | + std::vector<InputSpec> eosMatchers = select(eosDataspec.c_str()); |
| 66 | + std::vector<std::string> eosRefs; |
| 67 | + std::vector<OutputSpec> eosOutputs; |
| 68 | + |
| 69 | + for (auto const& matcher : eosMatchers) { |
| 70 | + eosRefs.emplace_back(matcher.binding); |
| 71 | + auto eosOut = DataSpecUtils::asOutputSpec(matcher); |
| 72 | + eosOut.lifetime = Lifetime::Sporadic; |
| 73 | + outputs.emplace_back(eosOut); |
| 74 | + } |
| 75 | + |
| 76 | + AlgorithmSpec algo = adaptStateful([outputRefs, eosRefs, processingDelay, eosDelay](CallbackService& service) { |
| 77 | + service.set<o2::framework::CallbackService::Id::EndOfStream>([eosRefs, eosDelay](EndOfStreamContext&) { |
| 78 | + LOG(info) << "Creating objects on end of stream reception."; |
| 79 | + std::this_thread::sleep_for(std::chrono::seconds(eosDelay)); |
| 80 | + }); |
| 81 | + |
| 82 | + return adaptStateless( |
| 83 | + [outputRefs, processingDelay](InputRecord& inputs, DataAllocator& outputs) { |
| 84 | + LOG(info) << "Received " << inputs.size() << " messages. Converting."; |
| 85 | + auto i = 0; |
| 86 | + std::this_thread::sleep_for(std::chrono::milliseconds(processingDelay)); |
| 87 | + for (auto& ref : outputRefs) { |
| 88 | + LOGP(info, "Creating {}.", ref); |
| 89 | + outputs.make<int>(ref, ++i); |
| 90 | + } |
| 91 | + }); |
| 92 | + }); |
| 93 | + |
| 94 | + return WorkflowSpec{ |
| 95 | + {.name = ctx.options().get<std::string>("name"), |
| 96 | + .inputs = inputs, |
| 97 | + .outputs = outputs, |
| 98 | + .algorithm = algo}}; |
| 99 | +} |
0 commit comments