Skip to content

Commit b041057

Browse files
committed
Extend run-time-span-file and invert-irframe-selection TF selections to raw-tf-reader
1 parent 38e8f24 commit b041057

File tree

5 files changed

+147
-9
lines changed

5 files changed

+147
-9
lines changed

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class CTFReaderSpec : public o2::framework::Task
112112
int mCTFCounterAcc = 0;
113113
int mNFailedFiles = 0;
114114
int mFilesRead = 0;
115-
int mTFLength = 128;
115+
int mTFLength = 32;
116116
int mNWaits = 0;
117117
int mRunNumberPrev = -1;
118118
long mTotalWaitTime = 0;
@@ -234,7 +234,7 @@ void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
234234
{
235235
std::ifstream inputFile(flname);
236236
if (!inputFile) {
237-
LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans);
237+
LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
238238
}
239239
std::string line;
240240
size_t cntl = 0, cntr = 0;
@@ -286,7 +286,7 @@ void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
286286
logError();
287287
}
288288
}
289-
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans);
289+
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
290290
inputFile.close();
291291
}
292292

Detectors/Raw/TFReaderDD/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ o2_add_library(TFReaderDD
1616
O2::Headers
1717
O2::Framework
1818
O2::DetectorsRaw
19+
O2::DataFormatsParameters
1920
O2::CommonUtils
2021
O2::Algorithm
2122
FairMQ::FairMQ)

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 137 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@
3131
#include "TFReaderSpec.h"
3232
#include "TFReaderDD/SubTimeFrameFileReader.h"
3333
#include "TFReaderDD/SubTimeFrameFile.h"
34+
#include "CommonUtils/StringUtils.h"
3435
#include "CommonUtils/FileFetcher.h"
3536
#include "CommonUtils/FIFO.h"
37+
#include "CommonUtils/IRFrameSelector.h"
38+
#include "DataFormatsParameters/AggregatedRunInfo.h"
39+
#include "CCDB/BasicCCDBManager.h"
40+
#include "CommonConstants/LHCConstants.h"
41+
#include "Algorithm/RangeTokenizer.h"
3642
#include <unistd.h>
3743
#include <algorithm>
3844
#include <unordered_map>
@@ -66,6 +72,8 @@ class TFReaderSpec : public o2f::Task
6672
void endOfStream(o2f::EndOfStreamContext& ec) final;
6773

6874
private:
75+
void loadRunTimeSpans(const std::string& flname);
76+
void runTimeRangesToIRFrameSelector(int runNumber);
6977
void stopProcessing(o2f::ProcessingContext& ctx);
7078
void TFBuilder();
7179

@@ -76,9 +84,13 @@ class TFReaderSpec : public o2f::Task
7684
o2::utils::FIFO<std::unique_ptr<TFMap>> mTFQueue{}; // queued TFs
7785
// std::unordered_map<o2h::DataIdentifier, SubSpecCount, std::hash<o2h::DataIdentifier>> mSeenOutputMap;
7886
std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
87+
std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
88+
o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
89+
int mConvRunTimeRangesToOrbits = -1; // not defined yet
7990
int mTFCounter = 0;
8091
int mTFBuilderCounter = 0;
8192
int mNWaits = 0;
93+
int mTFLength = 32;
8294
long mTotalWaitTime = 0;
8395
size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
8496
bool mRunning = false;
@@ -105,6 +117,9 @@ void TFReaderSpec::init(o2f::InitContext& ic)
105117
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
106118
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
107119
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
120+
if (!mInput.fileRunTimeSpans.empty()) {
121+
loadRunTimeSpans(mInput.fileRunTimeSpans);
122+
}
108123
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
109124
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
110125
mFileFetcher->setMaxLoops(mInput.maxLoops);
@@ -142,10 +157,6 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
142157
if (verbose && mInput.verbosity > 0) {
143158
LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
144159
}
145-
if (dph->startTime != this->mTFCounter) {
146-
LOGP(fatal, "Local tf counter {} != TF timeslice {} for {}", this->mTFCounter, dph->startTime,
147-
o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{hd->dataOrigin, hd->dataDescription, hd->subSpecification}));
148-
}
149160
if (hd->splitPayloadIndex == 0) { // check the 1st one only
150161
auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
151162
if (entry.count != this->mTFCounter) {
@@ -412,8 +423,25 @@ void TFReaderSpec::TFBuilder()
412423
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity);
413424
bool acceptTF = true;
414425
if (tf) {
426+
if (mRunTimeRanges.size()) {
427+
const auto* dataptr = (*tf->begin()->second.get())[0].GetData();
428+
const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
429+
static int runNumberPrev = -1;
430+
if (runNumberPrev != hd0->runNumber) {
431+
runNumberPrev = hd0->runNumber;
432+
runTimeRangesToIRFrameSelector(runNumberPrev);
433+
}
434+
if (mIRFrameSelector.isSet()) {
435+
o2::InteractionRecord ir0(0, hd0->firstTForbit);
436+
o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, hd0->firstTForbit < 0xffffffff - (mTFLength - 1) ? hd0->firstTForbit + (mTFLength - 1) : 0xffffffff);
437+
auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
438+
acceptTF = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
439+
LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
440+
irSpan.size(), ir0.asString(), ir1.asString(), acceptTF ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
441+
}
442+
}
415443
locID++;
416-
if (!mInput.tfIDs.empty()) {
444+
if (!mInput.tfIDs.empty() && acceptTF) {
417445
acceptTF = false;
418446
if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
419447
mWaitSendingLast = false;
@@ -448,6 +476,110 @@ void TFReaderSpec::TFBuilder()
448476
}
449477
}
450478

479+
//_________________________________________________________
480+
void TFReaderSpec::loadRunTimeSpans(const std::string& flname)
481+
{
482+
std::ifstream inputFile(flname);
483+
if (!inputFile) {
484+
LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
485+
}
486+
std::string line;
487+
size_t cntl = 0, cntr = 0;
488+
while (std::getline(inputFile, line)) {
489+
cntl++;
490+
for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
491+
if (ch == ';' || ch == '\t' || ch == ',') {
492+
ch = ' ';
493+
}
494+
}
495+
o2::utils::Str::trim(line);
496+
if (line.size() < 1 || line[0] == '#') {
497+
continue;
498+
}
499+
auto tokens = o2::utils::Str::tokenize(line, ' ');
500+
auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
501+
if (tokens.size() >= 3) {
502+
int run = 0;
503+
long rmin, rmax;
504+
try {
505+
run = std::stoi(tokens[0]);
506+
rmin = std::stol(tokens[1]);
507+
rmax = std::stol(tokens[2]);
508+
} catch (...) {
509+
logError();
510+
continue;
511+
}
512+
513+
constexpr long ISTimeStamp = 1514761200000L;
514+
int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
515+
if (rmin > rmax) {
516+
LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
517+
}
518+
if (mConvRunTimeRangesToOrbits == -1) {
519+
if (convmn != convmx) {
520+
LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
521+
}
522+
mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
523+
LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
524+
} else {
525+
if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
526+
LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
527+
}
528+
}
529+
530+
mRunTimeRanges[run].emplace_back(rmin, rmax);
531+
cntr++;
532+
} else {
533+
logError();
534+
}
535+
}
536+
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
537+
inputFile.close();
538+
}
539+
540+
//_________________________________________________________
541+
void TFReaderSpec::runTimeRangesToIRFrameSelector(int runNumber)
542+
{
543+
// convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
544+
mIRFrameSelector.clear();
545+
auto ent = mRunTimeRanges.find(runNumber);
546+
if (ent == mRunTimeRanges.end()) {
547+
LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
548+
return;
549+
}
550+
o2::parameters::AggregatedRunInfo rinfo;
551+
auto& ccdb = o2::ccdb::BasicCCDBManager::instance();
552+
rinfo = o2::parameters::AggregatedRunInfo::buildAggregatedRunInfo(ccdb, runNumber);
553+
if (rinfo.runNumber != runNumber || rinfo.orbitsPerTF < 1) {
554+
LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", runNumber);
555+
}
556+
mTFLength = rinfo.orbitsPerTF;
557+
std::vector<o2::dataformats::IRFrame> frames;
558+
for (const auto& rng : ent->second) {
559+
long orbMin = 0, orbMax = 0;
560+
if (mConvRunTimeRangesToOrbits > 0) {
561+
orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
562+
orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
563+
} else {
564+
orbMin = rng.first;
565+
orbMax = rng.second;
566+
}
567+
if (orbMin < 0) {
568+
orbMin = 0;
569+
}
570+
if (orbMax < 0) {
571+
orbMax = 0;
572+
}
573+
if (runNumber > 523897) {
574+
orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
575+
orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
576+
}
577+
LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
578+
frames.emplace_back(o2::InteractionRecord{0, uint32_t(orbMin)}, o2::InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
579+
}
580+
mIRFrameSelector.setOwnList(frames, true);
581+
}
582+
451583
//_________________________________________________________
452584
o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
453585
{

Detectors/Raw/TFReaderDD/src/TFReaderSpec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ struct TFReaderInp {
3232
std::string tffileRegex{};
3333
std::string remoteRegex{};
3434
std::string metricChannel{};
35+
std::string fileRunTimeSpans{};
3536
o2::detectors::DetID::mask_t detMask{};
3637
o2::detectors::DetID::mask_t detMaskRawOnly{};
3738
o2::detectors::DetID::mask_t detMaskNonRawOnly{};
@@ -46,6 +47,7 @@ struct TFReaderInp {
4647
int maxTFsPerFile = -1;
4748
bool sendDummyForMissing = true;
4849
bool sup0xccdb = false;
50+
bool invertIRFramesSelection = false;
4951
std::vector<o2::header::DataHeader> hdVec;
5052
std::vector<int> tfIDs{};
5153
};

Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
3939
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
4040
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});
4141
options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
42+
options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
43+
options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
4244
options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
4345

4446
// options for error-check suppression
@@ -80,7 +82,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
8082
if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
8183
rinp.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
8284
}
83-
85+
rinp.fileRunTimeSpans = configcontext.options().get<std::string>("run-time-span-file");
86+
rinp.invertIRFramesSelection = configcontext.options().get<bool>("invert-irframe-selection");
8487
WorkflowSpec specs;
8588
specs.emplace_back(o2::rawdd::getTFReaderSpec(rinp));
8689
return specs;

0 commit comments

Comments
 (0)