Skip to content

Commit 7bbcfad

Browse files
committed
QC-1293 Working prototype of late tasks
1 parent 77d49c5 commit 7bbcfad

27 files changed

+927
-10
lines changed

Framework/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,10 @@ add_library(O2QualityControl
135135
src/RootFileStorage.cxx
136136
src/ReductorHelpers.cxx
137137
src/KafkaPoller.cxx
138-
src/FlagHelpers.cxx)
138+
src/FlagHelpers.cxx
139+
src/LateTaskRunner.cxx
140+
src/LateTaskRunnerFactory.cxx
141+
src/LateTaskInterface.cxx)
139142

140143
target_include_directories(
141144
O2QualityControl
@@ -178,6 +181,7 @@ add_root_dictionary(O2QualityControl
178181
HEADERS
179182
include/QualityControl/CheckInterface.h
180183
include/QualityControl/TaskInterface.h
184+
include/QualityControl/LateTaskInterface.h
181185
include/QualityControl/UserCodeInterface.h
182186
include/QualityControl/AggregatorInterface.h
183187
include/QualityControl/PostProcessingInterface.h

Framework/basic.json

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,40 @@
8989
}
9090
}
9191
}
92+
},
93+
"latecheck": {
94+
"active": "true",
95+
"className": "o2::quality_control_modules::skeleton::SkeletonCheck",
96+
"moduleName": "QcSkeleton",
97+
"policy": "OnAny",
98+
"detectorName": "TST",
99+
"dataSource": [{
100+
"type": "LateTask",
101+
"name": "late",
102+
"MOs": ["graph_example"]
103+
}],
104+
"extendedCheckParameters": {
105+
"physics": {
106+
"pp": {
107+
"myOwnKey1": "myOwnValue1c"
108+
}
109+
}
110+
}
111+
}
112+
},
113+
"lateTasks": {
114+
"late": {
115+
"active": "true",
116+
"className": "o2::quality_control_modules::skeleton::SkeletonLateTask",
117+
"moduleName": "QcSkeleton",
118+
"policy": "OnAny",
119+
"detectorName": "TST",
120+
"dataSources": [{
121+
"type": "Task",
122+
"name": "QcTask",
123+
"MOs": ["example"]
124+
}]
125+
92126
}
93127
}
94128
},
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
//
2+
// Created by pkonopka on 23/06/25.
3+
//
4+
5+
#ifndef ACTOR_H
6+
#define ACTOR_H
7+
8+
#endif //ACTOR_H

Framework/include/QualityControl/DataSourceSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ enum class DataSourceType {
3232
Check,
3333
Aggregator,
3434
PostProcessingTask,
35+
LateTask,
3536
ExternalTask,
3637
Invalid
3738
};

Framework/include/QualityControl/InfrastructureGenerator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ class InfrastructureGenerator
230230
static void generateAggregator(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
231231
static void generatePostProcessing(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
232232
static void generateBookkeepingQualitySink(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
233+
static void generateLateTasks(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
233234
};
234235

235236
} // namespace core

Framework/include/QualityControl/InfrastructureSpec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "QualityControl/AggregatorSpec.h"
2424
#include "QualityControl/PostProcessingTaskSpec.h"
2525
#include "QualityControl/ExternalTaskSpec.h"
26+
#include "QualityControl/LateTaskSpec.h"
2627

2728
#include <vector>
2829

@@ -37,6 +38,7 @@ struct InfrastructureSpec {
3738
std::vector<checker::AggregatorSpec> aggregators;
3839
std::vector<postprocessing::PostProcessingTaskSpec> postProcessingTasks;
3940
std::vector<ExternalTaskSpec> externalTasks;
41+
std::vector<LateTaskSpec> lateTasks;
4042
};
4143

4244
} // namespace o2::quality_control::core

Framework/include/QualityControl/InfrastructureSpecReader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "QualityControl/DataSourceSpec.h"
2323
#include "QualityControl/CheckSpec.h"
2424
#include "QualityControl/PostProcessingTaskSpec.h"
25+
#include "QualityControl/LateTaskSpec.h"
2526
#include "QualityControl/RecoRequestSpecs.h"
2627
#include <boost/property_tree/ptree_fwd.hpp>
2728

@@ -52,6 +53,8 @@ checker::AggregatorSpec readSpecEntry<checker::AggregatorSpec>(const std::string
5253
template <>
5354
postprocessing::PostProcessingTaskSpec readSpecEntry<postprocessing::PostProcessingTaskSpec>(const std::string& entryID, const boost::property_tree::ptree& entryTree, const boost::property_tree::ptree& wholeTree);
5455
template <>
56+
LateTaskSpec readSpecEntry<LateTaskSpec>(const std::string& entryID, const boost::property_tree::ptree& entryTree, const boost::property_tree::ptree& wholeTree);
57+
template <>
5558
ExternalTaskSpec readSpecEntry<ExternalTaskSpec>(const std::string& entryID, const boost::property_tree::ptree& entryTree, const boost::property_tree::ptree& wholeTree);
5659
template <>
5760
GRPGeomRequestSpec readSpecEntry<GRPGeomRequestSpec>(const std::string& entryID, const boost::property_tree::ptree& entryTree, const boost::property_tree::ptree& wholeTree);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//
2+
// Created by pkonopka on 23/06/25.
3+
//
4+
5+
#ifndef LATETASKFACTORY_H
6+
#define LATETASKFACTORY_H
7+
8+
// STL
9+
#include <memory>
10+
// QC
11+
#include "QualityControl/LateTaskRunnerConfig.h"
12+
#include "QualityControl/LateTaskInterface.h"
13+
#include "QualityControl/RootClassFactory.h"
14+
15+
namespace o2::quality_control::core
16+
{
17+
18+
class LateTaskInterface;
19+
class ObjectsManager;
20+
21+
/// \brief Factory in charge of creating late tasks
22+
///
23+
/// The factory needs a library name and a class name provided as an object of type LateTaskConfig.
24+
/// The class loaded in the library must inherit from LateTaskInterface.
25+
class LateTaskFactory
26+
{
27+
public:
28+
LateTaskFactory() = default;
29+
virtual ~LateTaskFactory() = default;
30+
31+
/// \brief Create a new instance of a LateTaskInterface.
32+
/// The LateTaskInterface actual class is decided based on the parameters passed.
33+
static LateTaskInterface* create(const LateTaskRunnerConfig& taskConfig, std::shared_ptr<ObjectsManager> objectsManager)
34+
{
35+
auto* result = root_class_factory::create<LateTaskInterface>(taskConfig.moduleName, taskConfig.className);
36+
result->setName(taskConfig.taskName);
37+
result->setObjectsManager(objectsManager);
38+
// result->setCustomParameters(taskConfig.customParameters);
39+
// result->setCcdbUrl(taskConfig.ccdbUrl);
40+
41+
return result;
42+
}
43+
};
44+
45+
} // namespace o2::quality_control::core
46+
47+
#endif //LATETASKFACTORY_H

Framework/include/QualityControl/LateTaskInterface.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
/// \author Piotr Konopka
1515
///
1616

17-
#ifndef LATETASKINTERFACE_H
18-
#define LATETASKINTERFACE_H
17+
#ifndef QC_CORE_LATETASKINTERFACE_H
18+
#define QC_CORE_LATETASKINTERFACE_H
1919

2020
#include <memory>
2121
// O2
@@ -89,7 +89,14 @@ class LateTaskInterface : public UserCodeInterface
8989
// Definition of the methods for the template method pattern
9090
virtual void initialize(o2::framework::InitContext& ctx) = 0;
9191
virtual void startOfActivity(const Activity& activity) = 0;
92-
virtual void process(std::map<std::string, std::shared_ptr<const core::MonitorObject>>& moMap, std::map<std::string, std::shared_ptr<const core::QualityObject>>& qoMap) = 0;
92+
// todo:
93+
// we could come up with a dedicated QC data interface which supports our data sources.
94+
// similarly to InputRecord, it could provide a facade to MOs and QOs cached by us in our internal format and expose
95+
// methods to check if a requested resource is there, to get it, and to iterate over all available resources.
96+
// optionally, it could also hide DPL's InputRecord or decorate it with a method which allows to access sampled
97+
// and unsampled data in a unified way.
98+
// virtual void process(std::map<std::string, std::shared_ptr<const core::MonitorObject>>& moMap, std::map<std::string, std::shared_ptr<const core::QualityObject>>& qoMap) = 0;
99+
virtual void process(o2::framework::ProcessingContext& ctx) = 0;
93100
virtual void endOfActivity(const Activity& activity) = 0;
94101
virtual void reset() = 0;
95102

@@ -99,7 +106,6 @@ class LateTaskInterface : public UserCodeInterface
99106
// Setters and getters
100107
void setObjectsManager(std::shared_ptr<ObjectsManager> objectsManager);
101108
void setMonitoring(const std::shared_ptr<o2::monitoring::Monitoring>& mMonitoring);
102-
103109
protected:
104110
std::shared_ptr<ObjectsManager> getObjectsManager();
105111
std::shared_ptr<o2::monitoring::Monitoring> mMonitoring;
@@ -110,4 +116,4 @@ class LateTaskInterface : public UserCodeInterface
110116

111117
} // namespace o2::quality_control::core
112118

113-
#endif // LATETASKINTERFACE_H
119+
#endif // QC_CORE_LATETASKINTERFACE_H
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef QC_CORE_LATETASKRUNNER_H
13+
#define QC_CORE_LATETASKRUNNER_H
14+
15+
///
16+
/// \file LateTaskRunner.h
17+
/// \author Piotr Konopka
18+
///
19+
20+
#include <Framework/Task.h>
21+
#include <Framework/DataProcessorSpec.h>
22+
#include <Framework/CompletionPolicy.h>
23+
24+
#include "QualityControl/LateTaskRunnerConfig.h"
25+
26+
namespace o2::quality_control::core {
27+
28+
class LateTaskInterface;
29+
class ObjectsManager;
30+
31+
class LateTaskRunner : public framework::Task
32+
{
33+
/// \brief Number of bytes in data description used for hashing of Task names. See HashDataDescription.h for details
34+
static constexpr size_t taskDescriptionHashLength = 4;
35+
static_assert(taskDescriptionHashLength <= o2::header::DataDescription::size);
36+
37+
public:
38+
explicit LateTaskRunner(const LateTaskRunnerConfig& config);
39+
~LateTaskRunner() override = default;
40+
41+
/// \brief LateTaskRunner's init callback
42+
void init(framework::InitContext& iCtx) override;
43+
/// \brief LateTaskRunner's process callback
44+
void run(framework::ProcessingContext& pCtx) override;
45+
/// \brief LateTaskRunner's finaliseCCDB callback
46+
// void finaliseCCDB(framework::ConcreteDataMatcher& matcher, void* obj) override;
47+
48+
std::string getDeviceName() const { return mTaskConfig.deviceName; };
49+
const framework::Inputs& getInputsSpecs() const { return mTaskConfig.inputSpecs; };
50+
const framework::OutputSpec& getOutputSpec() const { return mTaskConfig.moSpec; };
51+
const framework::Options& getOptions() const { return mTaskConfig.options; };
52+
53+
/// \brief Data Processor Label to identify all Late Task Runners
54+
static framework::DataProcessorLabel getLabel() { return { "qc-late-task" }; }
55+
/// \brief ID string for all LateTaskRunner devices
56+
static std::string createIdString();
57+
/// \brief Unified DataOrigin for Quality Control tasks
58+
static header::DataOrigin createDataOrigin(const std::string& detectorCode);
59+
/// \brief Unified DataDescription naming scheme for all tasks
60+
static header::DataDescription createDataDescription(const std::string& taskName);
61+
62+
/// \brief Callback for CallbackService::Id::EndOfStream
63+
// void endOfStream(framework::EndOfStreamContext& eosContext) override;
64+
65+
private:
66+
/// \brief Callback for CallbackService::Id::Start (DPL) a.k.a. RUN transition (FairMQ)
67+
// void start(framework::ServiceRegistryRef services);
68+
/// \brief Callback for CallbackService::Id::Stop (DPL) a.k.a. STOP transition (FairMQ)
69+
// void stop(framework::ServiceRegistryRef services);
70+
/// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ)
71+
// void reset();
72+
73+
void startOfActivity();
74+
void endOfActivity();
75+
int publish(framework::DataAllocator& outputs);
76+
// void publishCycleStats();
77+
// void updateMonitoringStats(framework::ProcessingContext& pCtx);
78+
// void registerToBookkeeping();
79+
80+
private:
81+
LateTaskRunnerConfig mTaskConfig;
82+
// std::shared_ptr<monitoring::Monitoring> mCollector;
83+
std::shared_ptr<LateTaskInterface> mTask;
84+
std::shared_ptr<ObjectsManager> mObjectsManager;
85+
// std::shared_ptr<Timekeeper> mTimekeeper;
86+
Activity mActivity;
87+
ValidityInterval mValidity;
88+
89+
// stats
90+
// int mNumberMessagesReceivedInCycle = 0;
91+
// int mNumberObjectsPublishedInCycle = 0;
92+
// int mTotalNumberObjectsPublished = 0; // over a run
93+
// double mLastPublicationDuration = 0;
94+
// uint64_t mDataReceivedInCycle = 0;
95+
};
96+
97+
98+
}
99+
#endif //QC_CORE_LATETASKRUNNER_H

0 commit comments

Comments
 (0)