Skip to content

Commit c191058

Browse files
authored
Ctpdev: counters to QCDB (#13350)
* dev:scalers to qcdb * clang
1 parent bf35985 commit c191058

File tree

6 files changed

+87
-24
lines changed

6 files changed

+87
-24
lines changed

DataFormats/Detectors/CTP/src/Configuration.cxx

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,8 @@ int CTPConfiguration::processConfigurationLineRun3(std::string& line, int& level
510510
}
511511
int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& level, std::map<int, std::vector<int>>& descInputsIndex)
512512
{
513-
LOG(info) << "Processing line";
514-
LOG(info) << "line:" << line << " lev:" << level;
513+
LOG(debug) << "Processing line";
514+
LOG(debug) << "line:" << line << " lev:" << level;
515515
//
516516
std::vector<std::string> tokens = o2::utils::Str::tokenize(line, ' ');
517517
size_t ntokens = tokens.size();
@@ -557,7 +557,7 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
557557
level = UNKNOWN;
558558
}
559559
}
560-
LOG(info) << "Level:" << level;
560+
LOG(debug) << "Level:" << level;
561561
switch (level) {
562562
case VERSION: {
563563
break;
@@ -585,7 +585,7 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
585585
uint32_t index = std::stoul(tokens[2]);
586586
ctpinp.inputMask = (1ull << (index - 1));
587587
mInputs.push_back(ctpinp);
588-
LOG(info) << "Input:" << ctpinp.name << " index:" << index;
588+
LOG(debug) << "Input:" << ctpinp.name << " index:" << index;
589589
break;
590590
}
591591
case MASKS: {
@@ -596,15 +596,15 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
596596
}
597597
bcmask.setBCmask(tokens);
598598
mBCMasks.push_back(bcmask);
599-
LOG(info) << "BC mask added:" << bcmask.name;
599+
LOG(debug) << "BC mask added:" << bcmask.name;
600600
break;
601601
}
602602
case GENS: {
603603
CTPGenerator gen;
604604
gen.name = tokens[0];
605605
gen.frequency = tokens[1];
606606
mGenerators.push_back(gen);
607-
LOG(info) << "Gen added:" << line;
607+
LOG(debug) << "Gen added:" << line;
608608
break;
609609
}
610610
case DESCRIPTORS: {
@@ -630,9 +630,9 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
630630
o2::detectors::DetID det(detname.c_str());
631631
if (isDetector(det)) {
632632
ctpdet.detID = det.getID();
633-
LOG(info) << "Detector found:" << det.getID() << " " << detname;
633+
LOG(debug) << "Detector found:" << det.getID() << " " << detname;
634634
} else {
635-
LOG(info) << "Unknown detectors:" << line;
635+
LOG(error) << "Unknown detectors:" << line;
636636
}
637637
mDetectors.push_back(ctpdet);
638638
level = LTGitems;
@@ -642,18 +642,18 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
642642
if (ntokens == 1) {
643643
mDetectors.back().mode = tokens[0];
644644
}
645-
LOG(info) << "LTGitem:" << line;
645+
LOG(debug) << "LTGitem:" << line;
646646
break;
647647
}
648648
case CLUSTER: {
649649
CTPCluster cluster;
650650
try {
651651
cluster.hwMask = std::stoull(tokens[0]);
652652
} catch (...) {
653-
LOG(info) << "Cluster syntax error:" << line;
653+
LOG(error) << "Cluster syntax error:" << line;
654654
return level;
655655
}
656-
LOG(info) << "Cluster:" << line;
656+
LOG(debug) << "Cluster:" << line;
657657
cluster.name = tokens[2];
658658
o2::detectors::DetID::mask_t mask;
659659
for (uint32_t item = 3; item < ntokens; item++) {
@@ -680,10 +680,10 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
680680
try {
681681
index = std::stoull(tokens[1]);
682682
} catch (...) {
683-
LOG(info) << "Class syntax error:" << line;
683+
LOG(error) << "Class syntax error:" << line;
684684
return level;
685685
}
686-
LOG(info) << "Class:" << line;
686+
LOG(debug) << "Class:" << line;
687687
CTPClass cls;
688688
cls.classMask = 1ull << index;
689689
cls.name = tokens[0];
@@ -716,7 +716,7 @@ int CTPConfiguration::processConfigurationLineRun3v2(std::string& line, int& lev
716716
break;
717717
}
718718
default: {
719-
LOG(info) << "unknown line:" << line;
719+
LOG(warning) << "unknown line:" << line;
720720
}
721721
}
722722
return 0;

Detectors/CTP/workflowScalers/include/CTPWorkflowScalers/RunManager.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ struct CTPActiveRun {
3838
counters_t cntslast0; // last minus one read counters needed for overflow correction
3939
counters_t cntslast; // last read counters
4040
counters64_t overflows;
41+
// QC
42+
int qcwpcount = 0;
4143
};
4244
class CTPRunManager : public ctpCCDBManager
4345
{
@@ -53,6 +55,7 @@ class CTPRunManager : public ctpCCDBManager
5355
int loadScalerNames();
5456
int getNRuns();
5557
void setBKHost(std::string host) { mBKHost = host; };
58+
void setQCWritePeriod(int period) { mQCWritePeriod = period; };
5659
uint64_t checkOverflow(uint32_t lcnt0, uint32_t lcnt1, uint64_t lcntcor);
5760
void printCounters();
5861

@@ -67,8 +70,8 @@ class CTPRunManager : public ctpCCDBManager
6770
std::unique_ptr<BkpClient> mBKClient;
6871
int mEOX = 0; // redundancy check
6972
int mNew = 1; // 1 - no CCDB: used for QC
70-
71-
ClassDefNV(CTPRunManager, 6);
73+
int mQCWritePeriod = 3; // Time in 10secs between two writes to QCCD
74+
ClassDefNV(CTPRunManager, 7);
7275
};
7376
} // namespace ctp
7477
} // namespace o2

Detectors/CTP/workflowScalers/include/CTPWorkflowScalers/ctpCCDBManager.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,27 @@ class ctpCCDBManager
2525
public:
2626
ctpCCDBManager() = default;
2727
int saveRunScalersToCCDB(CTPRunScalers& scalers, long timeStart, long timeStop);
28+
int saveRunScalersToQCDB(CTPRunScalers& scalers, long timeStart, long timeStop);
2829
int saveRunConfigToCCDB(CTPConfiguration* cfg, long timeStart);
2930
static CTPConfiguration getConfigFromCCDB(long timestamp, std::string run, bool& ok);
3031
static CTPConfiguration getConfigFromCCDB(long timestamp, std::string run);
3132
CTPRunScalers getScalersFromCCDB(long timestamp, std::string, bool& ok);
3233
void setCCDBPathConfig(std::string path) { mCCDBPathCTPConfig = path; };
3334
void setCCDBPathScalers(std::string path) { mCCDBPathCTPScalers = path; };
35+
void setQCDBPathScalers(std::string path) { mQCDBPathCTPScalers = path; };
3436
static void setCCDBHost(std::string host) { mCCDBHost = host; };
37+
static void setQCDBHost(std::string host) { mQCDBHost = host; };
3538

3639
protected:
3740
/// Database constants
3841
// std::string mCCDBHost = "http://ccdb-test.cern.ch:8080";
42+
// std::string mQCDBHost = "http://ali-qcdb.cern.ch:8083";
3943
static std::string mCCDBHost;
44+
static std::string mQCDBHost;
4045
std::string mCCDBPathCTPScalers = "CTP/Calib/Scalers";
4146
std::string mCCDBPathCTPConfig = "CTP/Config/Config";
42-
43-
ClassDefNV(ctpCCDBManager, 0);
47+
std::string mQCDBPathCTPScalers = "qc/CTP/Scalers";
48+
ClassDefNV(ctpCCDBManager, 1);
4449
};
4550
} // namespace ctp
4651
} // namespace o2

Detectors/CTP/workflowScalers/src/RunManager.cxx

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@ void CTPRunManager::init()
8181
} else {
8282
LOG(info) << "BK not sent";
8383
}
84+
setQCDBHost(mQCDBHost);
85+
LOG(info) << "QC host:" << mQCDBHost;
86+
LOG(info) << "QCDB writing every:" << mQCWritePeriod << " 10 secs";
8487
LOG(info) << "CCDB host:" << mCCDBHost;
85-
LOG(info) << "CTP vNew:" << mNew;
88+
LOG(info) << "CTP vNew cfg:" << mNew;
8689
LOG(info) << "CTPRunManager initialised.";
8790
}
8891
int CTPRunManager::loadRun(const std::string& cfg)
@@ -136,6 +139,7 @@ int CTPRunManager::stopRun(uint32_t irun, long timeStamp)
136139
// const long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
137140
mActiveRuns[irun]->timeStop = timeStamp * 1000.;
138141
saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
142+
saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
139143
delete mActiveRuns[irun];
140144
mActiveRuns[irun] = nullptr;
141145
return 0;
@@ -160,7 +164,7 @@ int CTPRunManager::addScalers(uint32_t irun, std::time_t time, bool start)
160164
std::string c1a = "cla1a" + std::to_string(cls + 1);
161165
CTPScalerRaw scalraw;
162166
scalraw.classIndex = (uint32_t)cls;
163-
std::cout << "cls:" << cls << " " << scalraw.classIndex << std::endl;
167+
// std::cout << "cls:" << cls << " " << scalraw.classIndex << std::endl;
164168
scalraw.lmBefore = mCounters[mScalerName2Position[cmb]];
165169
scalraw.lmAfter = mCounters[mScalerName2Position[cma]];
166170
scalraw.l0Before = mCounters[mScalerName2Position[c0b]];
@@ -276,6 +280,12 @@ int CTPRunManager::processMessage(std::string& topic, const std::string& message
276280
// active , do scalers
277281
LOG(info) << "Run continue:" << mCounters[i];
278282
addScalers(i, tt);
283+
// LOG(info) << " QC period:" << mActiveRunNumbers[i] << " " << mActiveRuns[i]->qcwpcount << " " << mQCWritePeriod;
284+
if (mActiveRuns[i]->qcwpcount > mQCWritePeriod) {
285+
saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
286+
mActiveRuns[i]->qcwpcount = 0;
287+
}
288+
mActiveRuns[i]->qcwpcount++;
279289
} else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == 0)) {
280290
LOG(info) << "Run started:" << mCounters[i];
281291
auto run = mRunsLoaded.find(mCounters[i]);
@@ -284,6 +294,7 @@ int CTPRunManager::processMessage(std::string& topic, const std::string& message
284294
mActiveRuns[i] = run->second;
285295
mRunsLoaded.erase(run);
286296
addScalers(i, tt, 1);
297+
saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
287298
} else {
288299
LOG(error) << "Trying to start run which is not loaded:" << mCounters[i];
289300
}

Detectors/CTP/workflowScalers/src/ctp-proxy.cxx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@
4646
#include "BookkeepingApi/BkpClient.h"
4747
using namespace o2::framework;
4848
using DetID = o2::detectors::DetID;
49-
InjectorFunction dcs2dpl(std::string& ccdbhost, std::string& bkhost)
49+
InjectorFunction dcs2dpl(std::string& ccdbhost, std::string& bkhost, std::string& qchost, int qcwriteperiod)
5050
{
5151
auto runMgr = std::make_shared<o2::ctp::CTPRunManager>();
5252
runMgr->setCCDBHost(ccdbhost);
5353
runMgr->setBKHost(bkhost);
54+
runMgr->setQCDBHost(qchost);
55+
runMgr->setQCWritePeriod(qcwriteperiod);
5456
runMgr->init();
5557
// runMgr->setClient(client);
5658
return [runMgr](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) -> bool {
@@ -75,6 +77,8 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
7577
workflowOptions.push_back(ConfigParamSpec{"subscribe-to", VariantType::String, "type=sub,method=connect,address=tcp://188.184.30.57:5556,rateLogging=10,transport=zeromq", {"channel subscribe to"}});
7678
workflowOptions.push_back(ConfigParamSpec{"ccdb-host", VariantType::String, "http://o2-ccdb.internal:8080", {"ccdb host"}});
7779
workflowOptions.push_back(ConfigParamSpec{"bk-host", VariantType::String, "none", {"bk host"}});
80+
workflowOptions.push_back(ConfigParamSpec{"qc-host", VariantType::String, "none", {"qc host"}});
81+
workflowOptions.push_back(ConfigParamSpec{"qc-writeperiod", VariantType::Int, 30, {"Period of writing to QCDB in units of 10secs, default = 30 (5 mins)"}});
7882
}
7983

8084
#include "Framework/runDataProcessing.h"
@@ -98,6 +102,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
98102
auto chan = config.options().get<std::string>("subscribe-to");
99103
std::string ccdbhost = config.options().get<std::string>("ccdb-host");
100104
std::string bkhost = config.options().get<std::string>("bk-host");
105+
std::string qchost = config.options().get<std::string>("qc-host");
106+
int qcwriteperiod = config.options().get<int>("qc-writeperiod");
101107
if (chan.empty()) {
102108
throw std::runtime_error("input channel is not provided");
103109
}
@@ -112,7 +118,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
112118
std::move(ctpCountersOutputs),
113119
// this is just default, can be overriden by --ctp-config-proxy '--channel-config..'
114120
chan.c_str(),
115-
dcs2dpl(ccdbhost, bkhost));
121+
dcs2dpl(ccdbhost, bkhost, qchost, qcwriteperiod));
116122
ctpProxy.labels.emplace_back(DataProcessorLabel{"input-proxy"});
117123
LOG(info) << "===> Proxy done";
118124
WorkflowSpec workflow;

Detectors/CTP/workflowScalers/src/ctpCCDBManager.cxx

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <fairlogger/Logger.h>
2222
using namespace o2::ctp;
2323
std::string ctpCCDBManager::mCCDBHost = "http://o2-ccdb.internal";
24+
std::string ctpCCDBManager::mQCDBHost = "http://ali-qcdb.cern.ch:8083";
25+
// std::string ctpCCDBManager::mQCDBHost = "none";
2426
//
2527
int ctpCCDBManager::saveRunScalersToCCDB(CTPRunScalers& scalers, long timeStart, long timeStop)
2628
{
@@ -43,7 +45,39 @@ int ctpCCDBManager::saveRunScalersToCCDB(CTPRunScalers& scalers, long timeStart,
4345
api.init(mCCDBHost.c_str()); // or http://localhost:8080 for a local installation
4446
// store abitrary user object in strongly typed manner
4547
int ret = api.storeAsTFileAny(&(scalers), mCCDBPathCTPScalers, metadata, tmin, tmax);
46-
LOG(info) << "CTP scalers saved in ccdb:" << mCCDBHost << " run:" << scalers.getRunNumber() << " tmin:" << tmin << " tmax:" << tmax;
48+
if (ret == 0) {
49+
LOG(info) << "CTP scalers saved in ccdb:" << mCCDBHost << " run:" << scalers.getRunNumber() << " tmin:" << tmin << " tmax:" << tmax;
50+
} else {
51+
LOG(FATAL) << "Problem writing to database ret:" << ret;
52+
}
53+
return ret;
54+
}
55+
int ctpCCDBManager::saveRunScalersToQCDB(CTPRunScalers& scalers, long timeStart, long timeStop)
56+
{
57+
// data base
58+
if (mQCDBHost == "none") {
59+
LOG(info) << "Scalers not written to QCDB none";
60+
return 0;
61+
}
62+
// CTPActiveRun* run = mActiveRuns[i];q
63+
using namespace std::chrono_literals;
64+
std::chrono::seconds days3 = 259200s;
65+
std::chrono::seconds min10 = 600s;
66+
long time3days = std::chrono::duration_cast<std::chrono::milliseconds>(days3).count();
67+
long time10min = std::chrono::duration_cast<std::chrono::milliseconds>(min10).count();
68+
long tmin = timeStart - time10min;
69+
long tmax = timeStop + time3days;
70+
o2::ccdb::CcdbApi api;
71+
map<string, string> metadata; // can be empty
72+
metadata["runNumber"] = std::to_string(scalers.getRunNumber());
73+
api.init(mQCDBHost.c_str()); // or http://localhost:8080 for a local installation
74+
// store abitrary user object in strongly typed manner
75+
int ret = api.storeAsTFileAny(&(scalers), mQCDBPathCTPScalers, metadata, tmin, tmax);
76+
if (ret == 0) {
77+
LOG(info) << "CTP scalers saved in qcdb:" << mQCDBHost << " run:" << scalers.getRunNumber() << " tmin:" << tmin << " tmax:" << tmax;
78+
} else {
79+
LOG(FATAL) << "CTP scalers Problem writing to database qcdb ret:" << ret;
80+
}
4781
return ret;
4882
}
4983
int ctpCCDBManager::saveRunConfigToCCDB(CTPConfiguration* cfg, long timeStart)
@@ -66,7 +100,11 @@ int ctpCCDBManager::saveRunConfigToCCDB(CTPConfiguration* cfg, long timeStart)
66100
api.init(mCCDBHost.c_str()); // or http://localhost:8080 for a local installation
67101
// store abitrary user object in strongly typed manner
68102
int ret = api.storeAsTFileAny(cfg, CCDBPathCTPConfig, metadata, tmin, tmax);
69-
LOG(info) << "CTP config saved in ccdb:" << mCCDBHost << " run:" << cfg->getRunNumber() << " tmin:" << tmin << " tmax:" << tmax;
103+
if (ret == 0) {
104+
LOG(info) << "CTP config saved in ccdb:" << mCCDBHost << " run:" << cfg->getRunNumber() << " tmin:" << tmin << " tmax:" << tmax;
105+
} else {
106+
LOG(FATAL) << "CTPConfig: Problem writing to database ret:" << ret;
107+
}
70108
return ret;
71109
}
72110
CTPConfiguration ctpCCDBManager::getConfigFromCCDB(long timestamp, std::string run, bool& ok)

0 commit comments

Comments
 (0)