Skip to content

Commit a4605d0

Browse files
aferrero2707 shahor02
authored and committed
[MCH] improvements to the pedestal calibrator
The pedestal processing code is modified to run in multi-threaded mode, to improve the processing speed on the calibrator node. The periodic logging of pedestal statistics is also improved, to better monitor and debug the pedestal data taking.
1 parent 26acde4 commit a4605d0

File tree

7 files changed

+179
-44
lines changed

7 files changed

+179
-44
lines changed

Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibrator.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,17 @@ class BadChannelCalibrator final : public o2::calibration::TimeSlotCalibration<o
6868
const BadChannelsVector& getBadChannelsVector() const { return mBadChannelsVector; }
6969
const PedestalsVector& getPedestalsVector() const { return mPedestalsVector; }
7070

71+
void setLoggingInterval(int loggingInterval) { mLoggingInterval = loggingInterval; }
72+
7173
private:
7274
TFType mTFStart;
7375

7476
BadChannelsVector mBadChannelsVector; ///< vector containing the unique IDs of the bad/noisy channels
7577
PedestalsVector mPedestalsVector; ///< vector containing the source pedestal information used for bad channel decision
7678

77-
ClassDefOverride(BadChannelCalibrator, 1);
79+
int mLoggingInterval = 0; ///< time interval between statistics logging messages, in milliseconds (statistics are only logged when > 0)
80+
81+
ClassDefOverride(BadChannelCalibrator, 2);
7882
};
7983

8084
} // namespace o2::mch::calibration

Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibratorParam.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ struct BadChannelCalibratorParam : public o2::conf::ConfigurableParamHelper<BadC
3030
int minRequiredNofEntriesPerChannel = 10000; ///< minimum pedestal digits per channel needed to assess a channel quality
3131
float minRequiredCalibratedFraction = 0.9f; ///< minimum fraction of channels for which we need a quality value to produce a bad channel map.
3232

33+
int nThreads = 1; ///< number of parallel threads for processing the pedestal data
34+
3335
bool onlyAtEndOfStream = {true}; ///< only produce bad channel map at end of stream (EoS). In that case the minRequiredCalibratedFraction and minRequiredNofEntriesPerChannel are irrelevant.
3436

3537
O2ParamDef(BadChannelCalibratorParam, "MCHBadChannelCalibratorParam");

Detectors/MUON/MCH/Calibration/include/MCHCalibration/PedestalData.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class PedestalData
8686

8787
uint32_t size() const;
8888

89+
void setNThreads(int nThreads) { mNThreads = nThreads; }
90+
8991
private:
9092
PedestalData::PedestalMatrix initPedestalMatrix(uint16_t solarId);
9193

@@ -94,8 +96,9 @@ class PedestalData
9496

9597
PedestalsMap mPedestals{}; ///< internal storage of all PedestalChannel values
9698
uint32_t mSize{0}; ///< total number of valid channels in the pedestals map
99+
int mNThreads{1}; ///< number of parallel threads to process the pedestal digits
97100

98-
ClassDefNV(PedestalData, 1)
101+
ClassDefNV(PedestalData, 2)
99102
};
100103

101104
namespace impl

Detectors/MUON/MCH/Calibration/src/BadChannelCalibrationDevice.cxx

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ void BadChannelCalibrationDevice::init(o2::framework::InitContext& ic)
3737
mCalibrator = std::make_unique<o2::mch::calibration::BadChannelCalibrator>();
3838
mCalibrator->setSlotLength(o2::calibration::INFINITE_TF);
3939
mCalibrator->setUpdateAtTheEndOfRunOnly();
40+
mCalibrator->setLoggingInterval(mLoggingInterval);
4041
mTimeStamp = std::numeric_limits<uint64_t>::max();
4142
}
4243

@@ -52,18 +53,26 @@ void BadChannelCalibrationDevice::logStats(size_t dataSize)
5253
static auto loggerEnd = loggerStart;
5354
static size_t nDigits = 0;
5455
static size_t nTF = 0;
56+
static size_t nTFtot = 0;
57+
static size_t nTFtotWithData = 0;
5558

5659
if (mLoggingInterval == 0) {
5760
return;
5861
}
5962

6063
nDigits += dataSize;
6164
nTF += 1;
65+
nTFtot += 1;
66+
if (dataSize > 1000) {
67+
nTFtotWithData += 1;
68+
}
6269

6370
loggerEnd = std::chrono::high_resolution_clock::now();
6471
std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
65-
if (loggerElapsed.count() > 1000) {
66-
LOG(info) << "received " << nDigits << " digits in " << nTF << " time frames";
72+
if (loggerElapsed.count() > mLoggingInterval) {
73+
LOG(warning) << "received " << nDigits << " digits in " << nTF << " time frames";
74+
LOG(warning) << "received " << nTFtotWithData << " time frames with data out of " << nTFtot << " total time frames ("
75+
<< ((nTFtot > 0) ? (nTFtotWithData * 100.0) / nTFtot : 0.0) << "%)";
6776
nDigits = 0;
6877
nTF = 0;
6978
loggerStart = std::chrono::high_resolution_clock::now();
@@ -86,7 +95,7 @@ void BadChannelCalibrationDevice::run(o2::framework::ProcessingContext& pc)
8695
std::string reason;
8796
if (mCalibrator->readyToSend(reason)) {
8897
mHasEnoughStat = true;
89-
LOGP(info, "We're ready to send output to CCDB ({})", reason);
98+
LOGP(warning, "We're ready to send output to CCDB ({})", reason);
9099
sendOutput(pc.outputs(), reason);
91100
mSkipData = true;
92101
}
@@ -139,12 +148,12 @@ void sendCalibrationOutput(o2::framework::DataAllocator& output,
139148
using clbUtils = o2::calibration::Utils;
140149
auto image = o2::ccdb::CcdbApi::createObjectImage(payload, payloadInfo);
141150

142-
LOG(info) << "Sending object " << payloadInfo->getPath()
143-
<< " of type" << payloadInfo->getObjectType()
144-
<< " /" << payloadInfo->getFileName()
145-
<< " of size " << image->size()
146-
<< " bytes, valid for " << payloadInfo->getStartValidityTimestamp()
147-
<< " : " << payloadInfo->getEndValidityTimestamp();
151+
LOG(warning) << "Sending object " << payloadInfo->getPath()
152+
<< " of type" << payloadInfo->getObjectType()
153+
<< " /" << payloadInfo->getFileName()
154+
<< " of size " << image->size()
155+
<< " bytes, valid for " << payloadInfo->getStartValidityTimestamp()
156+
<< " : " << payloadInfo->getEndValidityTimestamp();
148157

149158
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, "MCH_BADCHAN", subSpec}, *image.get());
150159
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, "MCH_BADCHAN", subSpec}, *payloadInfo);
@@ -168,7 +177,7 @@ void BadChannelCalibrationDevice::sendOutput(o2::framework::DataAllocator& outpu
168177
reason_with_entries = fmt::format("{} ; no entries", reason);
169178
}
170179

171-
LOGP(info, "sendOutput: {}", reason_with_entries);
180+
LOGP(warning, "sendOutput: {}", reason_with_entries);
172181
mCalibrator->finalize();
173182

174183
// the bad channels table is only updated if there is enough statistics

Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <iostream>
2121
#include <iterator>
2222
#include <numeric>
23+
#include <limits>
2324
#include <sstream>
2425

2526
namespace o2::mch::calibration
@@ -65,6 +66,8 @@ void BadChannelCalibrator::finalize()
6566

6667
bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const
6768
{
69+
static auto loggerStart = std::chrono::high_resolution_clock::now();
70+
static auto loggerEnd = loggerStart;
6871
const int minNofEntries = BadChannelCalibratorParam::Instance().minRequiredNofEntriesPerChannel;
6972
const o2::mch::calibration::PedestalData* pedData = slot.getContainer();
7073
auto nofChannels = pedData->size();
@@ -75,9 +78,35 @@ bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const
7578

7679
bool hasEnough = nofCalibrated > requiredChannels;
7780

78-
LOGP(info,
79-
"nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}",
80-
minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough);
81+
// logging of calibration statistics
82+
loggerEnd = std::chrono::high_resolution_clock::now();
83+
std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
84+
if (mLoggingInterval > 0 && loggerElapsed.count() > mLoggingInterval) {
85+
int minEntriesPerChannel{std::numeric_limits<int>::max()};
86+
int maxEntriesPerChannel{0};
87+
uint64_t averageEntriesPerChannel = 0;
88+
std::for_each(pedData->cbegin(), pedData->cend(),
89+
[&](const PedestalChannel& c) {
90+
if (c.mEntries == 0) {
91+
return;
92+
}
93+
if (c.mEntries > maxEntriesPerChannel) {
94+
maxEntriesPerChannel = c.mEntries;
95+
}
96+
if (c.mEntries < minEntriesPerChannel) {
97+
minEntriesPerChannel = c.mEntries;
98+
}
99+
averageEntriesPerChannel += c.mEntries;
100+
});
101+
if (nofChannels > 0) {
102+
averageEntriesPerChannel /= nofChannels;
103+
}
104+
LOGP(warning, "channel stats: min={} max={} average={}", minEntriesPerChannel, maxEntriesPerChannel, averageEntriesPerChannel);
105+
LOGP(warning,
106+
"nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}",
107+
minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough);
108+
loggerStart = std::chrono::high_resolution_clock::now();
109+
}
81110

82111
return hasEnough;
83112
}
@@ -92,7 +121,7 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot)
92121
mBadChannelsVector.clear();
93122

94123
o2::mch::calibration::PedestalData* pedestalData = slot.getContainer();
95-
LOG(info) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd();
124+
LOG(warning) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd();
96125

97126
// keep track of first TimeFrame
98127
if (slot.getTFStart() < mTFStart) {
@@ -120,9 +149,11 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot)
120149
BadChannelCalibrator::Slot&
121150
BadChannelCalibrator::emplaceNewSlot(bool front, TFType tstart, TFType tend)
122151
{
152+
const int nThreads = static_cast<int>(BadChannelCalibratorParam::Instance().nThreads);
123153
auto& cont = getSlots();
124154
auto& slot = front ? cont.emplace_front(tstart, tend) : cont.emplace_back(tstart, tend);
125155
slot.setContainer(std::make_unique<PedestalData>());
156+
slot.getContainer()->setNThreads(nThreads);
126157
return slot;
127158
}
128159

Detectors/MUON/MCH/Calibration/src/PedestalData.cxx

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
#include <iostream>
2020
#include <iterator>
2121
#include <limits>
22+
#include <mutex>
23+
#include <thread>
24+
#include <queue>
2225

2326
namespace o2::mch::calibration
2427
{
@@ -69,45 +72,107 @@ PedestalData::PedestalMatrix PedestalData::initPedestalMatrix(uint16_t solarId)
6972
void PedestalData::fill(gsl::span<const PedestalDigit> digits)
7073
{
7174
bool mDebug = false;
75+
static std::mutex pedestalMutex;
76+
static std::set<uint16_t> solarIds = o2::mch::raw::getSolarUIDs<o2::mch::raw::ElectronicMapperGenerated>();
7277

73-
for (auto& d : digits) {
74-
uint16_t solarId = d.getSolarId();
75-
uint8_t dsId = d.getDsId();
76-
uint8_t channel = d.getChannel();
78+
if (digits.empty()) {
79+
return;
80+
}
7781

78-
auto iPedestal = mPedestals.find(solarId);
82+
LOGP(info, "processing {} digits with {} threads", (int)digits.size(), mNThreads);
7983

80-
if (iPedestal == mPedestals.end()) {
81-
auto iPedestalsNew = mPedestals.emplace(std::make_pair(solarId, initPedestalMatrix(solarId)));
82-
iPedestal = iPedestalsNew.first;
83-
}
84+
// fill the queue of SOLAR IDs to be processed
85+
std::queue<uint16_t> solarQueue;
86+
for (auto solarId : solarIds) {
87+
solarQueue.push(solarId);
88+
}
8489

85-
if (iPedestal == mPedestals.end()) {
86-
LOGP(fatal, "failed to insert new element in padestals map");
87-
break;
88-
}
90+
auto processSolarDigits = [&]() {
91+
while (true) {
92+
int targetSolarId = -1;
93+
PedestalsMap::iterator iPedestal;
94+
bool pedestalsAreInitialized;
95+
96+
// non thread-safe access to solarQueue, protected by the pedestalMutex
97+
{
98+
std::lock_guard<std::mutex> lock(pedestalMutex);
99+
100+
// stop when there are no more SOLAR IDs to process
101+
if (solarQueue.empty()) {
102+
break;
103+
}
104+
105+
// get the next SOLAR ID to be processed
106+
targetSolarId = solarQueue.front();
107+
solarQueue.pop();
108+
109+
// update the iterator to the pedestal data for the target SOLAR
110+
iPedestal = mPedestals.find(targetSolarId);
111+
if (iPedestal == mPedestals.end()) {
112+
pedestalsAreInitialized = false;
113+
} else {
114+
pedestalsAreInitialized = true;
115+
}
116+
}
89117

90-
auto& ped = iPedestal->second[dsId][channel];
118+
// loop over digits, selecting only those belonging to the target SOLAR
119+
for (auto& d : digits) {
120+
uint16_t solarId = d.getSolarId();
121+
if (solarId != targetSolarId) {
122+
continue;
123+
}
91124

92-
for (uint16_t i = 0; i < d.nofSamples(); i++) {
93-
auto s = d.getSample(i);
125+
// non thread-safe access to Pedestals structure, protected by the pedestalMutex
126+
if (!pedestalsAreInitialized) {
127+
std::lock_guard<std::mutex> lock(pedestalMutex);
94128

95-
ped.mEntries += 1;
96-
uint64_t N = ped.mEntries;
129+
// create the pedestals structure corresponding to the SOLAR ID to be processed
130+
iPedestal = mPedestals.emplace(std::make_pair(targetSolarId, initPedestalMatrix(targetSolarId))).first;
97131

98-
double p0 = ped.mPedestal;
99-
double p = p0 + (s - p0) / N;
100-
ped.mPedestal = p;
132+
if (iPedestal == mPedestals.end()) {
133+
LOGP(fatal, "failed to insert new element in padestals map");
134+
break;
135+
}
136+
pedestalsAreInitialized = true;
137+
}
101138

102-
double M0 = ped.mVariance;
103-
double M = M0 + (s - p0) * (s - p);
104-
ped.mVariance = M;
105-
}
139+
uint8_t dsId = d.getDsId();
140+
uint8_t channel = d.getChannel();
141+
142+
auto& ped = iPedestal->second[dsId][channel];
143+
144+
for (uint16_t i = 0; i < d.nofSamples(); i++) {
145+
auto s = d.getSample(i);
106146

107-
if (mDebug) {
108-
LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}",
109-
(int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance);
147+
ped.mEntries += 1;
148+
uint64_t N = ped.mEntries;
149+
150+
double p0 = ped.mPedestal;
151+
double p = p0 + (s - p0) / N;
152+
ped.mPedestal = p;
153+
154+
double M0 = ped.mVariance;
155+
double M = M0 + (s - p0) * (s - p);
156+
ped.mVariance = M;
157+
}
158+
159+
if (mDebug) {
160+
LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}",
161+
(int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance);
162+
}
163+
}
110164
}
165+
};
166+
167+
// process the digits in parallel threads
168+
std::vector<std::thread> threads;
169+
for (int ti = 0; ti < mNThreads; ti++) {
170+
threads.emplace_back(processSolarDigits);
171+
}
172+
173+
// wait for all threads to finish processing
174+
for (auto& thread : threads) {
175+
thread.join();
111176
}
112177
}
113178

Detectors/MUON/MCH/Calibration/test/testPedestalData.cxx

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ BOOST_AUTO_TEST_CASE(TestIteratorOnCompletePedestalData)
8484
++n;
8585
}
8686
BOOST_TEST(n == allDigits.size());
87+
88+
// multi-threaded version
89+
PedestalData pdmt;
90+
pdmt.setNThreads(8);
91+
pdmt.fill(allDigits);
92+
93+
int nmt{0};
94+
for (const auto& ped : pdmt) {
95+
++nmt;
96+
}
97+
BOOST_TEST(nmt == allDigits.size());
8798
}
8899

89100
BOOST_AUTO_TEST_CASE(TestIteratorEquality)
@@ -113,6 +124,16 @@ BOOST_AUTO_TEST_CASE(TestIteratorPreIncrementable)
113124
n++;
114125
}
115126
BOOST_TEST(n == 2768);
127+
128+
// multi-threaded version
129+
PedestalData pdmt;
130+
pdmt.setNThreads(8);
131+
pdmt.fill(digits);
132+
int nmt{0};
133+
for (auto rec : pdmt) {
134+
nmt++;
135+
}
136+
BOOST_TEST(nmt == 2768);
116137
// 2768 = 1856 pads in solar 328 + 721 pads in solar 721
117138
// Note that solar 328 has 29 dual sampas
118139
// solar 721 has 15 dual sampas

0 commit comments

Comments
 (0)