Skip to content

Commit 72a2bce

Browse files
authored
DPL: make TimingInfo a service (#7436)
1 parent edf5b1d commit 72a2bce

File tree

8 files changed

+57
-35
lines changed

8 files changed

+57
-35
lines changed

Framework/Core/include/Framework/CommonServices.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ struct CommonServices {
6666
static ServiceSpec tracingSpec();
6767
static ServiceSpec threadPool(int numWorkers);
6868
static ServiceSpec dataProcessingStats();
69+
static ServiceSpec timingInfoSpec();
6970

7071
static std::vector<ServiceSpec> defaultServices(int numWorkers = 0);
7172
static std::vector<ServiceSpec> requiredServices();

Framework/Core/include/Framework/DataAllocator.h

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ class DataAllocator
8686
using value_type = T;
8787
};
8888

89-
DataAllocator(TimingInfo* timingInfo,
90-
ServiceRegistry* contextes,
89+
DataAllocator(ServiceRegistry* contextes,
9190
const AllowedOutputRoutes& routes);
9291

9392
DataChunk& newChunk(const Output&, size_t);
@@ -108,7 +107,8 @@ class DataAllocator
108107
if constexpr (is_specialization<T, UninitializedVector>::value) {
109108
// plain buffer as polymorphic spectator std::vector, which does not run constructors / destructors
110109
using ValueType = typename T::value_type;
111-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
110+
auto& timingInfo = mRegistry->get<TimingInfo>();
111+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
112112
auto& context = mRegistry->get<MessageContext>();
113113

114114
// Note: initial payload size is 0 and will be set by the context before sending
@@ -120,7 +120,8 @@ class DataAllocator
120120
// this catches all std::vector objects with messageable value type before checking if is also
121121
// has a root dictionary, so non-serialized transmission is preferred
122122
using ValueType = typename T::value_type;
123-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
123+
auto& timingInfo = mRegistry->get<TimingInfo>();
124+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
124125
auto& context = mRegistry->get<MessageContext>();
125126

126127
// Note: initial payload size is 0 and will be set by the context before sending
@@ -129,7 +130,8 @@ class DataAllocator
129130
} else if constexpr (has_root_dictionary<T>::value == true && is_messageable<T>::value == false) {
130131
// Extended support for types implementing the Root ClassDef interface, both TObject
131132
// derived types and others
132-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
133+
auto& timingInfo = mRegistry->get<TimingInfo>();
134+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
133135
auto& context = mRegistry->get<MessageContext>();
134136

135137
// Note: initial payload size is 0 and will be set by the context before sending
@@ -171,7 +173,8 @@ class DataAllocator
171173
if constexpr (is_messageable<T>::value == true) {
172174
auto [nElements] = std::make_tuple(args...);
173175
auto size = nElements * sizeof(T);
174-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
176+
auto& timingInfo = mRegistry->get<TimingInfo>();
177+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
175178
auto& context = mRegistry->get<MessageContext>();
176179

177180
FairMQMessagePtr headerMessage = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodNone, size);
@@ -237,7 +240,8 @@ class DataAllocator
237240
using type = T;
238241

239242
char* payload = reinterpret_cast<char*>(ptr);
240-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
243+
auto& timingInfo = mRegistry->get<TimingInfo>();
244+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
241245
// the correct payload size is set later when sending the
242246
// RawBufferContext, see DataProcessor::doSend
243247
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodNone, 0);
@@ -400,7 +404,8 @@ class DataAllocator
400404
//get the memory resource associated with an output
401405
o2::pmr::FairMQMemoryResource* getMemoryResource(const Output& spec)
402406
{
403-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
407+
auto& timingInfo = mRegistry->get<TimingInfo>();
408+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
404409
auto& context = mRegistry->get<MessageContext>();
405410
return *context.proxy().getTransport(channel);
406411
}
@@ -421,7 +426,8 @@ class DataAllocator
421426
{
422427
// Find a matching channel, extract the message for it form the container
423428
// and put it in the queue to be sent at the end of the processing
424-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
429+
auto& timingInfo = mRegistry->get<TimingInfo>();
430+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
425431

426432
auto& context = mRegistry->get<MessageContext>();
427433
FairMQMessagePtr payloadMessage = o2::pmr::getMessage(std::forward<ContainerT>(container), *context.proxy().getTransport(channel));
@@ -460,7 +466,6 @@ class DataAllocator
460466

461467
private:
462468
AllowedOutputRoutes mAllowedOutputRoutes;
463-
TimingInfo* mTimingInfo;
464469
ServiceRegistry* mRegistry;
465470

466471
std::string const& matchDataHeader(const Output& spec, size_t timeframeId);

Framework/Core/src/CommonServices.cxx

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,18 @@ o2::framework::ServiceSpec CommonServices::monitoringSpec()
115115
.kind = ServiceKind::Serial};
116116
}
117117

118+
// Make it a service so that it can be used easily from the analysis
119+
// FIXME: Moreover, it makes sense that this will be duplicated on a per thread
120+
// basis when we get to it.
121+
o2::framework::ServiceSpec CommonServices::timingInfoSpec()
122+
{
123+
return ServiceSpec{
124+
.name = "timing-info",
125+
.init = simpleServiceInit<TimingInfo, TimingInfo>(),
126+
.configure = noConfiguration(),
127+
.kind = ServiceKind::Serial};
128+
}
129+
118130
o2::framework::ServiceSpec CommonServices::datatakingContextSpec()
119131
{
120132
return ServiceSpec{
@@ -588,6 +600,7 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
588600
std::vector<ServiceSpec> CommonServices::defaultServices(int numThreads)
589601
{
590602
std::vector<ServiceSpec> specs{
603+
timingInfoSpec(),
591604
timesliceIndex(),
592605
driverClientSpec(),
593606
datatakingContextSpec(),

Framework/Core/src/DataAllocator.cxx

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@ using DataHeader = o2::header::DataHeader;
3535
using DataDescription = o2::header::DataDescription;
3636
using DataProcessingHeader = o2::framework::DataProcessingHeader;
3737

38-
DataAllocator::DataAllocator(TimingInfo* timingInfo,
39-
ServiceRegistry* contextRegistry,
38+
DataAllocator::DataAllocator(ServiceRegistry* contextRegistry,
4039
const AllowedOutputRoutes& routes)
4140
: mAllowedOutputRoutes{routes},
42-
mTimingInfo{timingInfo},
4341
mRegistry{contextRegistry}
4442
{
4543
}
@@ -62,7 +60,8 @@ std::string const& DataAllocator::matchDataHeader(const Output& spec, size_t tim
6260

6361
DataChunk& DataAllocator::newChunk(const Output& spec, size_t size)
6462
{
65-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
63+
auto& timingInfo = mRegistry->get<TimingInfo>();
64+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
6665
auto& context = mRegistry->get<MessageContext>();
6766

6867
FairMQMessagePtr headerMessage = headerMessageFromOutput(spec, channel, //
@@ -77,7 +76,7 @@ void DataAllocator::adoptChunk(const Output& spec, char* buffer, size_t size, fa
7776
{
7877
// Find a matching channel, create a new message for it and put it in the
7978
// queue to be sent at the end of the processing
80-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
79+
std::string const& channel = matchDataHeader(spec, mRegistry->get<TimingInfo>().timeslice);
8180

8281
FairMQMessagePtr headerMessage = headerMessageFromOutput(spec, channel, //
8382
o2::header::gSerializationMethodNone, //
@@ -94,17 +93,18 @@ FairMQMessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
9493
o2::header::SerializationMethod method, //
9594
size_t payloadSize) //
9695
{
96+
auto& timingInfo = mRegistry->get<TimingInfo>();
9797
DataHeader dh;
9898
dh.dataOrigin = spec.origin;
9999
dh.dataDescription = spec.description;
100100
dh.subSpecification = spec.subSpec;
101101
dh.payloadSize = payloadSize;
102102
dh.payloadSerializationMethod = method;
103-
dh.tfCounter = mTimingInfo->tfCounter;
104-
dh.firstTForbit = mTimingInfo->firstTFOrbit;
105-
dh.runNumber = mTimingInfo->runNumber;
103+
dh.tfCounter = timingInfo.tfCounter;
104+
dh.firstTForbit = timingInfo.firstTFOrbit;
105+
dh.runNumber = timingInfo.runNumber;
106106

107-
DataProcessingHeader dph{mTimingInfo->timeslice, 1};
107+
DataProcessingHeader dph{timingInfo.timeslice, 1};
108108
auto& context = mRegistry->get<MessageContext>();
109109

110110
auto channelAlloc = o2::pmr::getTransportAllocator(context.proxy().getTransport(channel, 0));
@@ -114,7 +114,8 @@ FairMQMessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
114114
void DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage, const Output& spec,
115115
o2::header::SerializationMethod serializationMethod)
116116
{
117-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
117+
auto& timingInfo = mRegistry->get<TimingInfo>();
118+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
118119
auto headerMessage = headerMessageFromOutput(spec, channel, serializationMethod, 0);
119120

120121
// FIXME: this is kind of ugly, we know that we can change the content of the
@@ -132,7 +133,8 @@ void DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage, const Ou
132133
void DataAllocator::adopt(const Output& spec, std::string* ptr)
133134
{
134135
std::unique_ptr<std::string> payload(ptr);
135-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
136+
auto& timingInfo = mRegistry->get<TimingInfo>();
137+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
136138
// the correct payload size is set later when sending the
137139
// StringContext, see DataProcessor::doSend
138140
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodNone, 0);
@@ -189,7 +191,8 @@ void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
189191

190192
void DataAllocator::adopt(const Output& spec, TableBuilder* tb)
191193
{
192-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
194+
auto& timingInfo = mRegistry->get<TimingInfo>();
195+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
193196
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodArrow, 0);
194197
auto& context = mRegistry->get<ArrowContext>();
195198

@@ -210,7 +213,8 @@ void DataAllocator::adopt(const Output& spec, TableBuilder* tb)
210213

211214
void DataAllocator::adopt(const Output& spec, TreeToTable* t2t)
212215
{
213-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
216+
auto& timingInfo = mRegistry->get<TimingInfo>();
217+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
214218

215219
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodArrow, 0);
216220
auto& context = mRegistry->get<ArrowContext>();
@@ -234,7 +238,8 @@ void DataAllocator::adopt(const Output& spec, TreeToTable* t2t)
234238

235239
void DataAllocator::adopt(const Output& spec, std::shared_ptr<arrow::Table> ptr)
236240
{
237-
std::string const& channel = matchDataHeader(spec, mTimingInfo->timeslice);
241+
auto& timingInfo = mRegistry->get<TimingInfo>();
242+
std::string const& channel = matchDataHeader(spec, timingInfo.timeslice);
238243
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodArrow, 0);
239244
auto& context = mRegistry->get<ArrowContext>();
240245

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry
102102
mError{mSpec.algorithm.onError},
103103
mConfigRegistry{nullptr},
104104
mServiceRegistry{registry},
105-
mAllocator{&mTimingInfo, &registry, mSpec.outputs},
105+
mAllocator{&registry, mSpec.outputs},
106106
mQuotaEvaluator{registry.get<ComputingQuotaEvaluator>()},
107107
mAwakeHandle{nullptr},
108108
mProcessingPolicies{policies}
@@ -535,7 +535,7 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
535535
context.registry = &mServiceRegistry;
536536
context.completed = &mCompleted;
537537
context.expirationHandlers = &mExpirationHandlers;
538-
context.timingInfo = &mTimingInfo;
538+
context.timingInfo = &mServiceRegistry.get<TimingInfo>();
539539
context.allocator = &mAllocator;
540540
context.statefulProcess = &mStatefulProcess;
541541
context.statelessProcess = &mStatelessProcess;

Framework/Core/test/test_DataAllocator.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,9 @@ using namespace o2::framework;
4343
// this function is only used to do the static checks for API return types
4444
void doTypeChecks()
4545
{
46-
TimingInfo* timingInfo = nullptr;
4746
ServiceRegistry* contextes = nullptr;
4847
std::vector<OutputRoute> routes;
49-
DataAllocator allocator(timingInfo, contextes, routes);
48+
DataAllocator allocator(contextes, routes);
5049
const Output output{"TST", "DUMMY", 0, Lifetime::Timeframe};
5150
// we require references to objects owned by allocator context
5251
static_assert(std::is_lvalue_reference<decltype(allocator.make<int>(output))>::value);

Framework/Core/test/test_TableBuilder.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ BOOST_AUTO_TEST_CASE(TestSoAIntegration)
347347
BOOST_AUTO_TEST_CASE(TestDataAllocatorReturnType)
348348
{
349349
std::vector<OutputRoute> routes;
350-
DataAllocator allocator(nullptr, nullptr, routes);
350+
DataAllocator allocator(nullptr, routes);
351351
const Output output{"TST", "DUMMY", 0, Lifetime::Timeframe};
352352
// we require reference to object owned by allocator context
353353
static_assert(std::is_lvalue_reference<decltype(allocator.make<TableBuilder>(output))>::value);

Framework/TestWorkflows/src/o2AnalysisTaskExample.cxx

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ using namespace o2::framework;
1919

2020
// This is a stateful task, where we send the state downstream.
2121
struct ATask {
22-
explicit ATask(int state)
23-
: mSomeState{state} {}
22+
Service<TimingInfo> info;
23+
24+
// explicit ATask(int state)
25+
// : mSomeState{state} {}
2426

2527
void init(InitContext& ic)
2628
{
27-
mSomeState += 1;
2829
}
2930

3031
void run(ProcessingContext& pc)
@@ -47,12 +48,10 @@ struct ATask {
4748
hEta->SetName("Eta");
4849
hEta->Write();
4950
}
50-
51-
int mSomeState;
5251
};
5352

5453
WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
5554
{
5655
return WorkflowSpec{
57-
adaptAnalysisTask<ATask>(cfgc, TaskName{"mySimpleTrackAnalysis"}, 0)};
56+
adaptAnalysisTask<ATask>(cfgc, TaskName{"mySimpleTrackAnalysis"})};
5857
}

0 commit comments

Comments
 (0)