Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,17 @@ class BadChannelCalibrator final : public o2::calibration::TimeSlotCalibration<o
/// Read-only access to the vector of bad/noisy channel IDs held by this calibrator.
const BadChannelsVector& getBadChannelsVector() const { return mBadChannelsVector; }
/// Read-only access to the vector of source pedestal information held by this calibrator.
const PedestalsVector& getPedestalsVector() const { return mPedestalsVector; }

/// Sets the interval between two statistics log messages.
/// The value is compared against an elapsed time measured in milliseconds;
/// a value <= 0 disables the periodic statistics logging.
void setLoggingInterval(int loggingInterval) { mLoggingInterval = loggingInterval; }

private:
TFType mTFStart;

BadChannelsVector mBadChannelsVector; ///< vector containing the unique IDs of the bad/noisy channels
PedestalsVector mPedestalsVector; ///< vector containing the source pedestal information used for bad channel decision

ClassDefOverride(BadChannelCalibrator, 1);
int mLoggingInterval = 0; ///< time interval (compared against elapsed milliseconds) between statistics logging messages; values <= 0 disable the logging

ClassDefOverride(BadChannelCalibrator, 2);
};

} // namespace o2::mch::calibration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ struct BadChannelCalibratorParam : public o2::conf::ConfigurableParamHelper<BadC
int minRequiredNofEntriesPerChannel = 10000; ///< minimum pedestal digits per channel needed to assess a channel quality
float minRequiredCalibratedFraction = 0.9f; ///< minimum fraction of channels for which we need a quality value to produce a bad channel map.

int nThreads = 1; ///< number of parallel threads for processing the pedestal data

bool onlyAtEndOfStream = {true}; ///< only produce bad channel map at end of stream (EoS). In that case the minRequiredCalibratedFraction and minRequiredNofEntriesPerChannel are irrelevant.

O2ParamDef(BadChannelCalibratorParam, "MCHBadChannelCalibratorParam");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class PedestalData

uint32_t size() const;

/// Sets the number of worker threads used to process the pedestal digits.
/// Non-positive requests fall back to a single thread: the digits are handled
/// exclusively by the mNThreads workers, so a count of zero or less would
/// silently leave every digit unprocessed.
void setNThreads(int nThreads) { mNThreads = (nThreads > 0) ? nThreads : 1; }

private:
PedestalData::PedestalMatrix initPedestalMatrix(uint16_t solarId);

Expand All @@ -94,8 +96,9 @@ class PedestalData

PedestalsMap mPedestals{}; ///< internal storage of all PedestalChannel values
uint32_t mSize{0}; ///< total number of valid channels in the pedestals map
int mNThreads{1}; ///< number of parallel threads to process the pedestal digits

ClassDefNV(PedestalData, 1)
ClassDefNV(PedestalData, 2)
};

namespace impl
Expand Down
29 changes: 19 additions & 10 deletions Detectors/MUON/MCH/Calibration/src/BadChannelCalibrationDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void BadChannelCalibrationDevice::init(o2::framework::InitContext& ic)
mCalibrator = std::make_unique<o2::mch::calibration::BadChannelCalibrator>();
mCalibrator->setSlotLength(o2::calibration::INFINITE_TF);
mCalibrator->setUpdateAtTheEndOfRunOnly();
mCalibrator->setLoggingInterval(mLoggingInterval);
mTimeStamp = std::numeric_limits<uint64_t>::max();
}

Expand All @@ -52,18 +53,26 @@ void BadChannelCalibrationDevice::logStats(size_t dataSize)
static auto loggerEnd = loggerStart;
static size_t nDigits = 0;
static size_t nTF = 0;
static size_t nTFtot = 0;
static size_t nTFtotWithData = 0;

if (mLoggingInterval == 0) {
return;
}

nDigits += dataSize;
nTF += 1;
nTFtot += 1;
if (dataSize > 1000) {
nTFtotWithData += 1;
}

loggerEnd = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
if (loggerElapsed.count() > 1000) {
LOG(info) << "received " << nDigits << " digits in " << nTF << " time frames";
if (loggerElapsed.count() > mLoggingInterval) {
LOG(warning) << "received " << nDigits << " digits in " << nTF << " time frames";
LOG(warning) << "received " << nTFtotWithData << " time frames with data out of " << nTFtot << " total time frames ("
<< ((nTFtot > 0) ? (nTFtotWithData * 100.0) / nTFtot : 0.0) << "%)";
nDigits = 0;
nTF = 0;
loggerStart = std::chrono::high_resolution_clock::now();
Expand All @@ -86,7 +95,7 @@ void BadChannelCalibrationDevice::run(o2::framework::ProcessingContext& pc)
std::string reason;
if (mCalibrator->readyToSend(reason)) {
mHasEnoughStat = true;
LOGP(info, "We're ready to send output to CCDB ({})", reason);
LOGP(warning, "We're ready to send output to CCDB ({})", reason);
sendOutput(pc.outputs(), reason);
mSkipData = true;
}
Expand Down Expand Up @@ -139,12 +148,12 @@ void sendCalibrationOutput(o2::framework::DataAllocator& output,
using clbUtils = o2::calibration::Utils;
auto image = o2::ccdb::CcdbApi::createObjectImage(payload, payloadInfo);

LOG(info) << "Sending object " << payloadInfo->getPath()
<< " of type" << payloadInfo->getObjectType()
<< " /" << payloadInfo->getFileName()
<< " of size " << image->size()
<< " bytes, valid for " << payloadInfo->getStartValidityTimestamp()
<< " : " << payloadInfo->getEndValidityTimestamp();
LOG(warning) << "Sending object " << payloadInfo->getPath()
<< " of type" << payloadInfo->getObjectType()
<< " /" << payloadInfo->getFileName()
<< " of size " << image->size()
<< " bytes, valid for " << payloadInfo->getStartValidityTimestamp()
<< " : " << payloadInfo->getEndValidityTimestamp();

output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, "MCH_BADCHAN", subSpec}, *image.get());
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, "MCH_BADCHAN", subSpec}, *payloadInfo);
Expand All @@ -168,7 +177,7 @@ void BadChannelCalibrationDevice::sendOutput(o2::framework::DataAllocator& outpu
reason_with_entries = fmt::format("{} ; no entries", reason);
}

LOGP(info, "sendOutput: {}", reason_with_entries);
LOGP(warning, "sendOutput: {}", reason_with_entries);
mCalibrator->finalize();

// the bad channels table is only updated if there is enough statistics
Expand Down
39 changes: 35 additions & 4 deletions Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <iostream>
#include <iterator>
#include <numeric>
#include <limits>
#include <sstream>

namespace o2::mch::calibration
Expand Down Expand Up @@ -65,6 +66,8 @@ void BadChannelCalibrator::finalize()

bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const
{
static auto loggerStart = std::chrono::high_resolution_clock::now();
static auto loggerEnd = loggerStart;
const int minNofEntries = BadChannelCalibratorParam::Instance().minRequiredNofEntriesPerChannel;
const o2::mch::calibration::PedestalData* pedData = slot.getContainer();
auto nofChannels = pedData->size();
Expand All @@ -75,9 +78,35 @@ bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const

bool hasEnough = nofCalibrated > requiredChannels;

LOGP(info,
"nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}",
minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough);
// logging of calibration statistics
loggerEnd = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
if (mLoggingInterval > 0 && loggerElapsed.count() > mLoggingInterval) {
int minEntriesPerChannel{std::numeric_limits<int>::max()};
int maxEntriesPerChannel{0};
uint64_t averageEntriesPerChannel = 0;
std::for_each(pedData->cbegin(), pedData->cend(),
[&](const PedestalChannel& c) {
if (c.mEntries == 0) {
return;
}
if (c.mEntries > maxEntriesPerChannel) {
maxEntriesPerChannel = c.mEntries;
}
if (c.mEntries < minEntriesPerChannel) {
minEntriesPerChannel = c.mEntries;
}
averageEntriesPerChannel += c.mEntries;
});
if (nofChannels > 0) {
averageEntriesPerChannel /= nofChannels;
}
LOGP(warning, "channel stats: min={} max={} average={}", minEntriesPerChannel, maxEntriesPerChannel, averageEntriesPerChannel);
LOGP(warning,
"nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}",
minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough);
loggerStart = std::chrono::high_resolution_clock::now();
}

return hasEnough;
}
Expand All @@ -92,7 +121,7 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot)
mBadChannelsVector.clear();

o2::mch::calibration::PedestalData* pedestalData = slot.getContainer();
LOG(info) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd();
LOG(warning) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd();

// keep track of first TimeFrame
if (slot.getTFStart() < mTFStart) {
Expand Down Expand Up @@ -120,9 +149,11 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot)
/// Creates a new time slot covering [tstart, tend], inserted at the front or at
/// the back of the slot container depending on `front`, and attaches to it a
/// fresh PedestalData container configured with the thread count taken from
/// BadChannelCalibratorParam.
/// Returns a reference to the newly created slot.
BadChannelCalibrator::Slot&
BadChannelCalibrator::emplaceNewSlot(bool front, TFType tstart, TFType tend)
{
  // thread count comes from the configurable parameters
  const auto& params = BadChannelCalibratorParam::Instance();
  const int workerCount = static_cast<int>(params.nThreads);

  auto& slots = getSlots();
  auto& newSlot = front ? slots.emplace_front(tstart, tend) : slots.emplace_back(tstart, tend);

  // each slot owns its own pedestal accumulator
  newSlot.setContainer(std::make_unique<PedestalData>());
  newSlot.getContainer()->setNThreads(workerCount);
  return newSlot;
}

Expand Down
121 changes: 93 additions & 28 deletions Detectors/MUON/MCH/Calibration/src/PedestalData.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include <iostream>
#include <iterator>
#include <limits>
#include <mutex>
#include <thread>
#include <queue>

namespace o2::mch::calibration
{
Expand Down Expand Up @@ -69,45 +72,107 @@ PedestalData::PedestalMatrix PedestalData::initPedestalMatrix(uint16_t solarId)
void PedestalData::fill(gsl::span<const PedestalDigit> digits)
{
bool mDebug = false;
static std::mutex pedestalMutex;
static std::set<uint16_t> solarIds = o2::mch::raw::getSolarUIDs<o2::mch::raw::ElectronicMapperGenerated>();

for (auto& d : digits) {
uint16_t solarId = d.getSolarId();
uint8_t dsId = d.getDsId();
uint8_t channel = d.getChannel();
if (digits.empty()) {
return;
}

auto iPedestal = mPedestals.find(solarId);
LOGP(info, "processing {} digits with {} threads", (int)digits.size(), mNThreads);

if (iPedestal == mPedestals.end()) {
auto iPedestalsNew = mPedestals.emplace(std::make_pair(solarId, initPedestalMatrix(solarId)));
iPedestal = iPedestalsNew.first;
}
// fill the queue of SOLAR IDs to be processed
std::queue<uint16_t> solarQueue;
for (auto solarId : solarIds) {
solarQueue.push(solarId);
}

if (iPedestal == mPedestals.end()) {
LOGP(fatal, "failed to insert new element in padestals map");
break;
}
auto processSolarDigits = [&]() {
while (true) {
int targetSolarId = -1;
PedestalsMap::iterator iPedestal;
bool pedestalsAreInitialized;

// non thread-safe access to solarQueue, protected by the pedestalMutex
{
std::lock_guard<std::mutex> lock(pedestalMutex);

// stop when there are no more SOLAR IDs to process
if (solarQueue.empty()) {
break;
}

// get the next SOLAR ID to be processed
targetSolarId = solarQueue.front();
solarQueue.pop();

// update the iterator to the pedestal data for the target SOLAR
iPedestal = mPedestals.find(targetSolarId);
if (iPedestal == mPedestals.end()) {
pedestalsAreInitialized = false;
} else {
pedestalsAreInitialized = true;
}
}

auto& ped = iPedestal->second[dsId][channel];
// loop over digits, selecting only those belonging to the target SOLAR
for (auto& d : digits) {
uint16_t solarId = d.getSolarId();
if (solarId != targetSolarId) {
continue;
}

for (uint16_t i = 0; i < d.nofSamples(); i++) {
auto s = d.getSample(i);
// non thread-safe access to Pedestals structure, protected by the pedestalMutex
if (!pedestalsAreInitialized) {
std::lock_guard<std::mutex> lock(pedestalMutex);

ped.mEntries += 1;
uint64_t N = ped.mEntries;
// create the pedestals structure corresponding to the SOLAR ID to be processed
iPedestal = mPedestals.emplace(std::make_pair(targetSolarId, initPedestalMatrix(targetSolarId))).first;

double p0 = ped.mPedestal;
double p = p0 + (s - p0) / N;
ped.mPedestal = p;
if (iPedestal == mPedestals.end()) {
LOGP(fatal, "failed to insert new element in padestals map");
break;
}
pedestalsAreInitialized = true;
}

double M0 = ped.mVariance;
double M = M0 + (s - p0) * (s - p);
ped.mVariance = M;
}
uint8_t dsId = d.getDsId();
uint8_t channel = d.getChannel();

auto& ped = iPedestal->second[dsId][channel];

for (uint16_t i = 0; i < d.nofSamples(); i++) {
auto s = d.getSample(i);

if (mDebug) {
LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}",
(int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance);
ped.mEntries += 1;
uint64_t N = ped.mEntries;

double p0 = ped.mPedestal;
double p = p0 + (s - p0) / N;
ped.mPedestal = p;

double M0 = ped.mVariance;
double M = M0 + (s - p0) * (s - p);
ped.mVariance = M;
}

if (mDebug) {
LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}",
(int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance);
}
}
}
};

// process the digits in parallel threads
std::vector<std::thread> threads;
for (int ti = 0; ti < mNThreads; ti++) {
threads.emplace_back(processSolarDigits);
}

// wait for all threads to finish processing
for (auto& thread : threads) {
thread.join();
}
}

Expand Down
21 changes: 21 additions & 0 deletions Detectors/MUON/MCH/Calibration/test/testPedestalData.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ BOOST_AUTO_TEST_CASE(TestIteratorOnCompletePedestalData)
++n;
}
BOOST_TEST(n == allDigits.size());

// multi-threaded version
PedestalData pdmt;
pdmt.setNThreads(8);
pdmt.fill(allDigits);

int nmt{0};
for (const auto& ped : pdmt) {
++nmt;
}
BOOST_TEST(nmt == allDigits.size());
}

BOOST_AUTO_TEST_CASE(TestIteratorEquality)
Expand Down Expand Up @@ -113,6 +124,16 @@ BOOST_AUTO_TEST_CASE(TestIteratorPreIncrementable)
n++;
}
BOOST_TEST(n == 2768);

// multi-threaded version
PedestalData pdmt;
pdmt.setNThreads(8);
pdmt.fill(digits);
int nmt{0};
for (auto rec : pdmt) {
nmt++;
}
BOOST_TEST(nmt == 2768);
// 2768 = 1856 pads in solar 328 + 912 pads in solar 721
// Note that solar 328 has 29 dual sampas
// solar 721 has 15 dual sampas
Expand Down