Skip to content

Commit 399a65e

Browse files
committed
[MCH] improvements to the pedestal calibrator
The pedestal processing code is modified to run in multi-threaded mode, to improve the processing speed on the calibrator node. The periodic logging of pedestal statistics is also improved, to better monitor and debug the pedestal data taking.
1 parent db916c2 commit 399a65e

File tree

6 files changed

+147
-44
lines changed

6 files changed

+147
-44
lines changed

Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibrator.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,17 @@ class BadChannelCalibrator final : public o2::calibration::TimeSlotCalibration<o
6868
const BadChannelsVector& getBadChannelsVector() const { return mBadChannelsVector; }
6969
const PedestalsVector& getPedestalsVector() const { return mPedestalsVector; }
7070

71+
void setLoggingInterval(int loggingInterval) { mLoggingInterval = loggingInterval; }
72+
7173
private:
7274
TFType mTFStart;
7375

7476
BadChannelsVector mBadChannelsVector; ///< vector containing the unique IDs of the bad/noisy channels
7577
PedestalsVector mPedestalsVector; ///< vector containing the source pedestal information used for bad channel decision
7678

77-
ClassDefOverride(BadChannelCalibrator, 1);
79+
int mLoggingInterval = 0; ///< time interval between statistics logging messages
80+
81+
ClassDefOverride(BadChannelCalibrator, 2);
7882
};
7983

8084
} // namespace o2::mch::calibration

Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibratorParam.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ struct BadChannelCalibratorParam : public o2::conf::ConfigurableParamHelper<BadC
3030
int minRequiredNofEntriesPerChannel = 10000; ///< minimum pedestal digits per channel needed to assess a channel quality
3131
float minRequiredCalibratedFraction = 0.9f; ///< minimum fraction of channels for which we need a quality value to produce a bad channel map.
3232

33+
int nThreads = 1; ///< number of parallel threads for processing the pedestal data
34+
3335
bool onlyAtEndOfStream = {true}; ///< only produce bad channel map at end of stream (EoS). In that case the minRequiredCalibratedFraction and minRequiredNofEntriesPerChannel are irrelevant.
3436

3537
O2ParamDef(BadChannelCalibratorParam, "MCHBadChannelCalibratorParam");

Detectors/MUON/MCH/Calibration/include/MCHCalibration/PedestalData.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class PedestalData
8686

8787
uint32_t size() const;
8888

89+
void setNThreads(int nThreads) { mNThreads = nThreads; }
90+
8991
private:
9092
PedestalData::PedestalMatrix initPedestalMatrix(uint16_t solarId);
9193

@@ -94,8 +96,9 @@ class PedestalData
9496

9597
PedestalsMap mPedestals{}; ///< internal storage of all PedestalChannel values
9698
uint32_t mSize{0}; ///< total number of valid channels in the pedestals map
99+
int mNThreads; ///< number of parallel threads to process the pedestal digits
97100

98-
ClassDefNV(PedestalData, 1)
101+
ClassDefNV(PedestalData, 2)
99102
};
100103

101104
namespace impl

Detectors/MUON/MCH/Calibration/src/BadChannelCalibrationDevice.cxx

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ void BadChannelCalibrationDevice::init(o2::framework::InitContext& ic)
3737
mCalibrator = std::make_unique<o2::mch::calibration::BadChannelCalibrator>();
3838
mCalibrator->setSlotLength(o2::calibration::INFINITE_TF);
3939
mCalibrator->setUpdateAtTheEndOfRunOnly();
40+
mCalibrator->setLoggingInterval(mLoggingInterval);
4041
mTimeStamp = std::numeric_limits<uint64_t>::max();
4142
}
4243

@@ -52,18 +53,26 @@ void BadChannelCalibrationDevice::logStats(size_t dataSize)
5253
static auto loggerEnd = loggerStart;
5354
static size_t nDigits = 0;
5455
static size_t nTF = 0;
56+
static size_t nTFtot = 0;
57+
static size_t nTFtotWithData = 0;
5558

5659
if (mLoggingInterval == 0) {
5760
return;
5861
}
5962

6063
nDigits += dataSize;
6164
nTF += 1;
65+
nTFtot += 1;
66+
if (dataSize > 1000) {
67+
nTFtotWithData += 1;
68+
}
6269

6370
loggerEnd = std::chrono::high_resolution_clock::now();
6471
std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
65-
if (loggerElapsed.count() > 1000) {
66-
LOG(info) << "received " << nDigits << " digits in " << nTF << " time frames";
72+
if (loggerElapsed.count() > mLoggingInterval) {
73+
LOG(warning) << "received " << nDigits << " digits in " << nTF << " time frames";
74+
LOG(warning) << "received " << nTFtotWithData << " time frames with data out of " << nTFtot << " total time frames ("
75+
<< ((nTFtot > 0) ? (nTFtotWithData * 100.0) / nTFtot : 0.0) << "%)";
6776
nDigits = 0;
6877
nTF = 0;
6978
loggerStart = std::chrono::high_resolution_clock::now();
@@ -86,7 +95,7 @@ void BadChannelCalibrationDevice::run(o2::framework::ProcessingContext& pc)
8695
std::string reason;
8796
if (mCalibrator->readyToSend(reason)) {
8897
mHasEnoughStat = true;
89-
LOGP(info, "We're ready to send output to CCDB ({})", reason);
98+
LOGP(warning, "We're ready to send output to CCDB ({})", reason);
9099
sendOutput(pc.outputs(), reason);
91100
mSkipData = true;
92101
}
@@ -139,12 +148,12 @@ void sendCalibrationOutput(o2::framework::DataAllocator& output,
139148
using clbUtils = o2::calibration::Utils;
140149
auto image = o2::ccdb::CcdbApi::createObjectImage(payload, payloadInfo);
141150

142-
LOG(info) << "Sending object " << payloadInfo->getPath()
143-
<< " of type" << payloadInfo->getObjectType()
144-
<< " /" << payloadInfo->getFileName()
145-
<< " of size " << image->size()
146-
<< " bytes, valid for " << payloadInfo->getStartValidityTimestamp()
147-
<< " : " << payloadInfo->getEndValidityTimestamp();
151+
LOG(warning) << "Sending object " << payloadInfo->getPath()
152+
<< " of type" << payloadInfo->getObjectType()
153+
<< " /" << payloadInfo->getFileName()
154+
<< " of size " << image->size()
155+
<< " bytes, valid for " << payloadInfo->getStartValidityTimestamp()
156+
<< " : " << payloadInfo->getEndValidityTimestamp();
148157

149158
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, "MCH_BADCHAN", subSpec}, *image.get());
150159
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, "MCH_BADCHAN", subSpec}, *payloadInfo);
@@ -168,7 +177,7 @@ void BadChannelCalibrationDevice::sendOutput(o2::framework::DataAllocator& outpu
168177
reason_with_entries = fmt::format("{} ; no entries", reason);
169178
}
170179

171-
LOGP(info, "sendOutput: {}", reason_with_entries);
180+
LOGP(warning, "sendOutput: {}", reason_with_entries);
172181
mCalibrator->finalize();
173182

174183
// the bad channels table is only updated if there is enough statistics

Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <iostream>
2121
#include <iterator>
2222
#include <numeric>
23+
#include <limits>
2324
#include <sstream>
2425

2526
namespace o2::mch::calibration
@@ -65,6 +66,8 @@ void BadChannelCalibrator::finalize()
6566

6667
bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const
6768
{
69+
static auto loggerStart = std::chrono::high_resolution_clock::now();
70+
static auto loggerEnd = loggerStart;
6871
const int minNofEntries = BadChannelCalibratorParam::Instance().minRequiredNofEntriesPerChannel;
6972
const o2::mch::calibration::PedestalData* pedData = slot.getContainer();
7073
auto nofChannels = pedData->size();
@@ -75,9 +78,35 @@ bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const
7578

7679
bool hasEnough = nofCalibrated > requiredChannels;
7780

78-
LOGP(info,
79-
"nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}",
80-
minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough);
81+
// logging of calibration statistics
82+
loggerEnd = std::chrono::high_resolution_clock::now();
83+
std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
84+
if (mLoggingInterval > 0 && loggerElapsed.count() > mLoggingInterval) {
85+
int minEntriesPerChannel{std::numeric_limits<int>::max()};
86+
int maxEntriesPerChannel{0};
87+
uint64_t averageEntriesPerChannel = 0;
88+
std::for_each(pedData->cbegin(), pedData->cend(),
89+
[&](const PedestalChannel& c) {
90+
if (c.mEntries == 0) {
91+
return;
92+
}
93+
if (c.mEntries > maxEntriesPerChannel) {
94+
maxEntriesPerChannel = c.mEntries;
95+
}
96+
if (c.mEntries < minEntriesPerChannel) {
97+
minEntriesPerChannel = c.mEntries;
98+
}
99+
averageEntriesPerChannel += c.mEntries;
100+
});
101+
if (nofChannels > 0) {
102+
averageEntriesPerChannel /= nofChannels;
103+
}
104+
LOGP(warning, "channel stats: min={} max={} average={}", minEntriesPerChannel, maxEntriesPerChannel, averageEntriesPerChannel);
105+
LOGP(warning,
106+
"nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}",
107+
minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough);
108+
loggerStart = std::chrono::high_resolution_clock::now();
109+
}
81110

82111
return hasEnough;
83112
}
@@ -92,7 +121,7 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot)
92121
mBadChannelsVector.clear();
93122

94123
o2::mch::calibration::PedestalData* pedestalData = slot.getContainer();
95-
LOG(info) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd();
124+
LOG(warning) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd();
96125

97126
// keep track of first TimeFrame
98127
if (slot.getTFStart() < mTFStart) {
@@ -120,9 +149,11 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot)
120149
/// Create a new calibration slot covering [tstart, tend] and attach a fresh
/// PedestalData container to it.
///
/// The slot is inserted at the front or at the back of the slots container
/// depending on `front`. The new container is configured with the number of
/// processing threads read from BadChannelCalibratorParam.
BadChannelCalibrator::Slot&
  BadChannelCalibrator::emplaceNewSlot(bool front, TFType tstart, TFType tend)
{
  // thread count comes from the configurable parameters
  const int threadCount = BadChannelCalibratorParam::Instance().nThreads;

  auto& slots = getSlots();
  Slot* created = nullptr;
  if (front) {
    created = &slots.emplace_front(tstart, tend);
  } else {
    created = &slots.emplace_back(tstart, tend);
  }

  // give the slot its own pedestal accumulator, then configure it
  created->setContainer(std::make_unique<PedestalData>());
  created->getContainer()->setNThreads(threadCount);
  return *created;
}
128159

Detectors/MUON/MCH/Calibration/src/PedestalData.cxx

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
#include <iostream>
2020
#include <iterator>
2121
#include <limits>
22+
#include <mutex>
23+
#include <thread>
24+
#include <queue>
2225

2326
namespace o2::mch::calibration
2427
{
@@ -69,45 +72,96 @@ PedestalData::PedestalMatrix PedestalData::initPedestalMatrix(uint16_t solarId)
6972
/// Accumulate a batch of pedestal digits into the per-channel running statistics.
///
/// The work is distributed by SOLAR board: the IDs of all boards known to the
/// electronic mapping are placed in a queue, and each worker repeatedly takes
/// one ID from the queue and processes the digits that belong to that board.
/// Since a given board is handled by exactly one worker, the per-channel
/// updates themselves need no locking; only the queue and the insertion of new
/// entries in mPedestals are serialized.
///
/// Digits whose SOLAR ID is not part of the electronic mapping are ignored.
/// A pedestal matrix is created for every mapped SOLAR the first time a
/// non-empty batch is processed.
///
/// NOTE(review): each worker scans the full `digits` span once per SOLAR it
/// processes, so the total number of comparisons grows with
/// (number of SOLARs) x (number of digits).
///
/// @param digits pedestal digits to be accumulated; an empty span is a no-op
void PedestalData::fill(gsl::span<const PedestalDigit> digits)
{
  constexpr bool debug = false;
  // SOLAR IDs from the electronic mapping, computed once and shared by all calls
  static const std::set<uint16_t> solarIds = o2::mch::raw::getSolarUIDs<o2::mch::raw::ElectronicMapperGenerated>();

  if (digits.empty()) {
    return;
  }

  // fill the queue of SOLAR IDs to be processed
  std::queue<uint16_t> solarQueue;
  for (auto solarId : solarIds) {
    solarQueue.push(solarId);
  }

  // Use at least one worker (a zero or negative setting would otherwise drop
  // all the digits silently) and never more workers than there are SOLARs.
  size_t nWorkers = (mNThreads > 0) ? static_cast<size_t>(mNThreads) : 1;
  if (nWorkers > solarQueue.size()) {
    nWorkers = solarQueue.size();
  }

  LOGP(info, "processing {} digits with {} threads", static_cast<int>(digits.size()), nWorkers);

  // Protects solarQueue and the structure of mPedestals. All workers are
  // joined before this function returns, so a local mutex is sufficient.
  std::mutex queueMutex;

  auto processSolarDigits = [&]() {
    while (true) {
      int targetSolarId = -1;
      PedestalMatrix* matrix = nullptr;
      // non thread-safe access to solarQueue and to the pedestals map,
      // protected by queueMutex
      {
        std::lock_guard<std::mutex> lock(queueMutex);

        // stop when there are no more SOLAR IDs to process
        if (solarQueue.empty()) {
          break;
        }

        // get the next SOLAR ID to be processed
        targetSolarId = solarQueue.front();
        solarQueue.pop();

        // get the pedestals structure corresponding to the SOLAR ID to be processed, create one if not existing yet
        auto iPedestal = mPedestals.find(targetSolarId);
        if (iPedestal == mPedestals.end()) {
          iPedestal = mPedestals.emplace(std::make_pair(targetSolarId, initPedestalMatrix(targetSolarId))).first;
        }

        // Keep a pointer to the element rather than the iterator: other
        // workers may insert into mPedestals while this one is processing,
        // and element addresses of the standard associative containers stay
        // valid across insertions whereas iterators may not (hash-map rehash).
        matrix = &iPedestal->second;
      }

      // loop over digits, selecting only those belonging to the target SOLAR
      for (const auto& d : digits) {
        const uint16_t solarId = d.getSolarId();
        if (solarId != targetSolarId) {
          continue;
        }

        const uint8_t dsId = d.getDsId();
        const uint8_t channel = d.getChannel();

        auto& ped = (*matrix)[dsId][channel];

        // incremental update of the mean and of the sum of squared deviations
        for (uint16_t i = 0; i < d.nofSamples(); i++) {
          const auto s = d.getSample(i);

          ped.mEntries += 1;
          const uint64_t N = ped.mEntries;

          const double p0 = ped.mPedestal;
          const double p = p0 + (s - p0) / N;
          ped.mPedestal = p;

          const double M0 = ped.mVariance;
          const double M = M0 + (s - p0) * (s - p);
          ped.mVariance = M;
        }

        if (debug) {
          LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}",
               (int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance);
        }
      }
    }
  };

  // with a single worker there is nothing to gain from spawning a thread
  if (nWorkers <= 1) {
    processSolarDigits();
    return;
  }

  // process the digits in parallel threads
  std::vector<std::thread> threads;
  threads.reserve(nWorkers);
  for (size_t ti = 0; ti < nWorkers; ti++) {
    threads.emplace_back(processSolarDigits);
  }

  // wait for all threads to finish processing
  for (auto& thread : threads) {
    thread.join();
  }
}
113167

0 commit comments

Comments
 (0)