Skip to content

Commit ab426cf

Browse files
committed
Optionally select CTFs in timestamps or orbits range
New option --ir-frames-files <root_file_with_IRFrames_to_select> allows to push to DPL only those TFs which overlap with the <runnumber> <range-min> <range-max> (separators can be any whitespace, comma or semicolon) records provided via text file (assuming that there are some entries for a given run, otherwise the option is ignored). Multiple ranges per run and multiple runs can be mentioned in a single input file. The range limits can be indicated either as a UNIX timestamp in ms or as an orbit number (in the fill the run belongs to). In case an option --invert-irframe-selection is provided, the selections above are inverted: TFs matching some of the provided ranges will be discarded, while the rest will be pushed to the DPL At the end of the processing the ctf-writer will create a local file ctf_read_ntf.txt containing only the number of TFs pushed to the DPL. In case no TF passed the selections above, this file will contain 0.
1 parent feea3ad commit ab426cf

File tree

6 files changed

+208
-23
lines changed

6 files changed

+208
-23
lines changed

Common/Utils/include/CommonUtils/IRFrameSelector.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class IRFrameSelector
4646
auto getIRFrames() const { return mFrames; }
4747
bool isSet() const { return mIsSet; }
4848

49+
void setOwnList(const std::vector<o2::dataformats::IRFrame>& lst, bool toBeSorted);
50+
4951
private:
5052
gsl::span<const o2::dataformats::IRFrame> mFrames{}; // externally provided span of IRFrames, must be sorted in IRFrame.getMin()
5153
o2::dataformats::IRFrame mLastIRFrameChecked{}; // last frame which was checked

Common/Utils/src/IRFrameSelector.cxx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,16 @@ size_t IRFrameSelector::loadIRFrames(const std::string& fname)
167167
return mOwnList.size();
168168
}
169169

170+
void IRFrameSelector::setOwnList(const std::vector<o2::dataformats::IRFrame>& lst, bool toBeSorted)
171+
{
172+
clear();
173+
mOwnList.insert(mOwnList.end(), lst.begin(), lst.end());
174+
if (toBeSorted) {
175+
std::sort(mOwnList.begin(), mOwnList.end(), [](const auto& a, const auto& b) { return a.getMin() < b.getMin(); });
176+
}
177+
setSelectedIRFrames(mOwnList, 0, 0, 0, false);
178+
}
179+
170180
void IRFrameSelector::print(bool lst) const
171181
{
172182
LOGP(info, "Last query stopped at entry {} for IRFrame {}:{}", mLastBoundID,
@@ -183,6 +193,8 @@ void IRFrameSelector::clear()
183193
{
184194
mIsSet = false;
185195
mOwnList.clear();
196+
mLastIRFrameChecked.getMin().clear(); // invalidate
197+
mLastBoundID = -1;
186198
mFrames = {};
187199
}
188200

Detectors/CTF/README.md

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@ comma-separated list of detectors to read, Overrides skipDet
9595
```
9696
comma-separated list of detectors to skip
9797
98+
99+
By default an exception will be thrown if detector is requested but missing in the CTF. To enable injection of the empty output in such case one should use option `--allow-missing-detectors`.
100+
101+
```
102+
--ctf-data-subspec arg (=0)
103+
```
104+
allows to alter the `subSpecification` used to send the CTFDATA from the reader to decoders. Non-0 value must be used in case the data extracted by the CTF-reader should be processed and stored in new CTFs (in order to avoid clash of CTFDATA messages of the reader and writer).
105+
98106
```
99107
--max-tf arg (=-1)
100108
```
@@ -141,31 +149,34 @@ There is a possibility to read remote root files directly, w/o caching them loca
141149
2) provide proper regex to define remote files, e.g. for the example above: `--remote-regex "^root://.+/eos/aliceo2/.+"`.
142150
3) pass an option `--copy-cmd no-copy`.
143151
152+
## Selective TF reading
153+
144154
```
145155
--select-ctf-ids <id's of CTFs to select>
146156
```
147157
This is a `ctf-reader` device local option allowing selective reading of particular CTFs. It is useful when dealing with CTF files containing multiple TFs. The comma-separated list of increasing CTFs indices must be provided in the format parsed by the `RangeTokenizer<int>`, e.g. `1,4-6,...`.
148158
Note that the index corresponds not to the entry of the TF in the CTF tree but to the reader own counter incremented throught all input files (e.g. if the 10 CTF files with 20 TFs each are provided for the input and the selection of TFs
149159
`0,2,22,66` is provided, the reader will inject to the DPL the TFs at entries 0 and 2 from the 1st CTF file, entry 5 of the second file, entry 6 of the 3d and will finish the job.
150160
151-
For the ITS and MFT entropy decoding one can request either to decompose clusters to digits and send them instead of clusters (via `o2-ctf-reader-workflow` global options `--its-digits` and `--mft-digits` respectively)
152-
or to apply the noise mask to decoded clusters (or decoded digits). If the masking (e.g. via option `--its-entropy-decoder " --mask-noise "`) is requested, user should provide to the entropy decoder the noise mask file (eventually will be loaded from CCDB) and cluster patterns decoding dictionary (if the clusters were encoded with patterns IDs).
153-
For example,
154161
```
155-
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --its-entropy-decoder ' --mask-noise' | ...
162+
--ir-frames-files <root_file_with_IRFrames_to_select> --skip-skimmed-out-tf
156163
```
157-
will decode ITS and MFT data, decompose on the fly ITS clusters to digits, mask the noisy pixels with the provided masks, recluster remaining ITS digits and send the new clusters out, together with unchanged MFT clusters.
164+
This option (used for skimming) allow to push to DPL only those TFs which overlap with selected BC-ranges provided via input root file (for various formats see `o2::utils::IRFrameSelector::loadIRFrames` method).
165+
158166
```
159-
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --mft-digits --mft-entropy-decoder ' --mask-noise' | ...
167+
--ir-frames-files <root_file_with_IRFrames_to_select>
160168
```
161-
will send decompose clusters to digits and send ben out after masking the noise for the MFT, while ITS clusters will be sent as decoded.
162-
163-
By default an exception will be thrown if detector is requested but missing in the CTF. To enable injection of the empty output in such case one should use option `--allow-missing-detectors`.
169+
This option allows to push to DPL only those TFs which overlap with the `<runnumber> <range-min> <range-max>` (separators can be any whitespace, comma or semicolon) records provided via text file (assuming that there are some entries for a given run, otherwise the option is ignored).
170+
Multiple ranges per run and multiple runs can be mentioned in a single input file. The range limits can be indicated either as a UNIX timestamp in `ms` or as an orbit number (in the fill the run belongs to).
164171
172+
In case an option
165173
```
166-
--ctf-data-subspec arg (=0)
174+
--invert-irframe-selection
167175
```
168-
allows to alter the `subSpecification` used to send the CTFDATA from the reader to decoders. Non-0 value must be used in case the data extracted by the CTF-reader should be processed and stored in new CTFs (in order to avoid clash of CTFDATA messages of the reader and writer).
176+
is provided, the selections above are inverted: TFs matching some of the provided ranges will be discarded, while the rest will be pushed to the DPL
177+
178+
At the end of the processing the `ctf-writer` will create a local file `ctf_read_ntf.txt` containing only the number of TFs pushed to the DPL.
179+
In case no TF passed the selections above, this file will contain 0.
169180
170181
## Support for externally provided encoding dictionaries
171182
@@ -201,3 +212,18 @@ Additionally, one may throttle on the free SHM by providing an option to the rea
201212
202213
Note that by default the reader reads into the memory the CTF data and prepares all output messages but injects them only once the rate-limiter allows that.
203214
With the option `--limit-tf-before-reading` set also the preparation of the data to inject will be conditioned by the green light from the rate-limiter.
215+
216+
217+
## Modifying ITS/MFT CTF output
218+
219+
For the ITS and MFT entropy decoding one can request either to decompose clusters to digits and send them instead of clusters (via `o2-ctf-reader-workflow` global options `--its-digits` and `--mft-digits` respectively)
220+
or to apply the noise mask to decoded clusters (or decoded digits). If the masking (e.g. via option `--its-entropy-decoder " --mask-noise "`) is requested, user should provide to the entropy decoder the noise mask file (eventually will be loaded from CCDB) and cluster patterns decoding dictionary (if the clusters were encoded with patterns IDs).
221+
For example,
222+
```
223+
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --its-entropy-decoder ' --mask-noise' | ...
224+
```
225+
will decode ITS and MFT data, decompose on the fly ITS clusters to digits, mask the noisy pixels with the provided masks, recluster remaining ITS digits and send the new clusters out, together with unchanged MFT clusters.
226+
```
227+
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --mft-digits --mft-entropy-decoder ' --mask-noise' | ...
228+
```
229+
will send decompose clusters to digits and send ben out after masking the noise for the MFT, while ITS clusters will be sent as decoded.

Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ struct CTFReaderInp {
3131
std::string remoteRegex{};
3232
std::string metricChannel{};
3333
std::string fileIRFrames{};
34+
std::string fileRunTimeSpans{};
3435
std::vector<int> ctfIDs{};
3536
bool skipSkimmedOutTF = false;
37+
bool invertIRFramesSelection = false;
3638
bool allowMissingDetectors = false;
3739
bool checkTFLimitBeforeReading = false;
3840
bool sup0xccdb = false;

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 145 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
#include "DataFormatsZDC/CTF.h"
4646
#include "DataFormatsHMP/CTF.h"
4747
#include "DataFormatsCTP/CTF.h"
48+
#include "DataFormatsParameters/AggregatedRunInfo.h"
49+
#include "CCDB/BasicCCDBManager.h"
50+
#include "CommonConstants/LHCConstants.h"
4851
#include "Algorithm/RangeTokenizer.h"
4952
#include <TStopwatch.h>
5053
#include <fairmq/Device.h>
@@ -81,6 +84,8 @@ class CTFReaderSpec : public o2::framework::Task
8184
void run(o2::framework::ProcessingContext& pc) final;
8285

8386
private:
87+
void runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo);
88+
void loadRunTimeSpans(const std::string& flname);
8489
void openCTFFile(const std::string& flname);
8590
bool processTF(ProcessingContext& pc);
8691
void checkTreeEntries();
@@ -91,16 +96,20 @@ class CTFReaderSpec : public o2::framework::Task
9196
void tryToFixCTFHeader(CTFHeader& ctfHeader) const;
9297
CTFReaderInp mInput{};
9398
o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
99+
std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
94100
std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
95101
std::unique_ptr<TFile> mCTFFile;
96102
std::unique_ptr<TTree> mCTFTree;
97103
bool mRunning = false;
98104
bool mUseLocalTFCounter = false;
105+
int mConvRunTimeRangesToOrbits = -1; // not defined yet
99106
int mCTFCounter = 0;
107+
int mCTFCounterAcc = 0;
100108
int mNFailedFiles = 0;
101109
int mFilesRead = 0;
102110
int mTFLength = 128;
103111
int mNWaits = 0;
112+
int mRunNumberPrev = -1;
104113
long mTotalWaitTime = 0;
105114
long mLastSendTime = 0L;
106115
long mCurrTreeEntry = 0L;
@@ -129,8 +138,8 @@ void CTFReaderSpec::stopReader()
129138
return;
130139
}
131140
LOGP(info, "CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
132-
LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs in {} loops, spent {:.2} s in {} data waiting states",
133-
mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
141+
LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
142+
mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
134143
mRunning = false;
135144
mFileFetcher->stop();
136145
mFileFetcher.reset();
@@ -164,6 +173,111 @@ void CTFReaderSpec::init(InitContext& ic)
164173
mTFLength = hbfu.nHBFPerTF;
165174
LOGP(info, "IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.fileIRFrames, mTFLength);
166175
}
176+
if (!mInput.fileRunTimeSpans.empty()) {
177+
loadRunTimeSpans(mInput.fileRunTimeSpans);
178+
}
179+
}
180+
181+
void CTFReaderSpec::runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo)
182+
{
183+
// convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
184+
mIRFrameSelector.clear();
185+
auto ent = mRunTimeRanges.find(timingInfo.runNumber);
186+
if (ent == mRunTimeRanges.end()) {
187+
LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.runNumber);
188+
return;
189+
}
190+
o2::parameters::AggregatedRunInfo rinfo;
191+
auto& ccdb = o2::ccdb::BasicCCDBManager::instance();
192+
rinfo = o2::parameters::AggregatedRunInfo::buildAggregatedRunInfo(ccdb, timingInfo.runNumber);
193+
if (rinfo.runNumber != timingInfo.runNumber || rinfo.orbitsPerTF < 1) {
194+
LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", timingInfo.runNumber);
195+
}
196+
mTFLength = rinfo.orbitsPerTF;
197+
std::vector<o2::dataformats::IRFrame> frames;
198+
for (const auto& rng : ent->second) {
199+
long orbMin = 0, orbMax = 0;
200+
if (mConvRunTimeRangesToOrbits > 0) {
201+
orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
202+
orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
203+
} else {
204+
orbMin = rng.first;
205+
orbMax = rng.second;
206+
}
207+
if (orbMin < 0) {
208+
orbMin = 0;
209+
}
210+
if (orbMax < 0) {
211+
orbMax = 0;
212+
}
213+
if (timingInfo.runNumber > 523897) {
214+
orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
215+
orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
216+
}
217+
LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
218+
frames.emplace_back(InteractionRecord{0, uint32_t(orbMin)}, InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
219+
}
220+
mIRFrameSelector.setOwnList(frames, true);
221+
}
222+
223+
void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
224+
{
225+
std::ifstream inputFile(flname);
226+
if (!inputFile) {
227+
LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans);
228+
}
229+
std::string line;
230+
size_t cntl = 0, cntr = 0;
231+
while (std::getline(inputFile, line)) {
232+
cntl++;
233+
for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
234+
if (ch == ';' || ch == '\t' || ch == ',') {
235+
ch = ' ';
236+
}
237+
}
238+
o2::utils::Str::trim(line);
239+
if (line.size() < 1 || line[0] == '#') {
240+
continue;
241+
}
242+
auto tokens = o2::utils::Str::tokenize(line, ' ');
243+
auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
244+
if (tokens.size() >= 3) {
245+
int run = 0;
246+
long rmin, rmax;
247+
try {
248+
run = std::stoi(tokens[0]);
249+
rmin = std::stol(tokens[1]);
250+
rmax = std::stol(tokens[2]);
251+
} catch (...) {
252+
logError();
253+
continue;
254+
}
255+
256+
constexpr long ISTimeStamp = 1514761200000L;
257+
int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
258+
if (rmin > rmax) {
259+
LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
260+
}
261+
if (mConvRunTimeRangesToOrbits == -1) {
262+
if (convmn != convmx) {
263+
LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
264+
}
265+
mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
266+
LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
267+
} else {
268+
if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
269+
LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
270+
}
271+
}
272+
273+
mRunTimeRanges[run].emplace_back(rmin, rmax);
274+
cntr++;
275+
} else {
276+
logError();
277+
}
278+
}
279+
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans);
280+
inputFile.close();
167281
}
168282

169283
///_______________________________________
@@ -256,6 +370,17 @@ void CTFReaderSpec::run(ProcessingContext& pc)
256370
pc.services().get<ControlService>().endOfStream();
257371
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
258372
stopReader();
373+
const std::string dummy{"ctf_read_ntf.txt"};
374+
if (mCTFCounterAcc == 0) {
375+
LOGP(warn, "No TF passed selection, writing a 0 to file {}", dummy);
376+
}
377+
try {
378+
std::ofstream outfile;
379+
outfile.open(dummy, std::ios::out | std::ios::trunc);
380+
outfile << mCTFCounterAcc << std::endl;
381+
} catch (...) {
382+
LOGP(error, "Failed to write {}", dummy);
383+
}
259384
}
260385
}
261386

@@ -278,7 +403,7 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
278403
}
279404

280405
if (mUseLocalTFCounter) {
281-
ctfHeader.tfCounter = mCTFCounter;
406+
ctfHeader.tfCounter = mCTFCounterAcc;
282407
}
283408

284409
LOG(info) << ctfHeader;
@@ -289,19 +414,26 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
289414
timingInfo.tfCounter = ctfHeader.tfCounter;
290415
timingInfo.runNumber = ctfHeader.run;
291416

417+
if (mRunTimeRanges.size() && timingInfo.runNumber != mRunNumberPrev) {
418+
runTimeRangesToIRFrameSelector(timingInfo);
419+
}
420+
mRunNumberPrev = timingInfo.runNumber;
421+
292422
if (mIRFrameSelector.isSet()) {
293423
o2::InteractionRecord ir0(0, timingInfo.firstTForbit);
294-
// we cannot have GRPECS via DPL CCDB fetcher in the CTFReader, so we use mTFLength extracted from the HBFUtils
295424
o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, timingInfo.firstTForbit < 0xffffffff - (mTFLength - 1) ? timingInfo.firstTForbit + (mTFLength - 1) : 0xffffffff);
296425
auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
297-
if (irSpan.size() == 0 && mInput.skipSkimmedOutTF) {
298-
LOGP(info, "Skimming did not define any selection for TF [{}] : [{}]", ir0.asString(), ir1.asString());
426+
bool acc = true;
427+
if (mInput.skipSkimmedOutTF) {
428+
acc = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
429+
LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
430+
irSpan.size(), ir0.asString(), ir1.asString(), acc ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
431+
}
432+
if (!acc) {
299433
return false;
300-
} else {
301-
if (mInput.checkTFLimitBeforeReading) {
302-
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
303-
}
304-
LOGP(info, "{} IR-Frames are selected for TF [{}] : [{}]", irSpan.size(), ir0.asString(), ir1.asString());
434+
}
435+
if (mInput.checkTFLimitBeforeReading) {
436+
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
305437
}
306438
auto outVec = pc.outputs().make<std::vector<o2::dataformats::IRFrame>>(OutputRef{"selIRFrames"}, irSpan.begin(), irSpan.end());
307439
} else {
@@ -329,6 +461,7 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
329461
processDetector<o2::cpv::CTF>(DetID::CPV, ctfHeader, pc);
330462
processDetector<o2::zdc::CTF>(DetID::ZDC, ctfHeader, pc);
331463
processDetector<o2::ctp::CTF>(DetID::CTP, ctfHeader, pc);
464+
mCTFCounterAcc++;
332465

333466
// send sTF acknowledge message
334467
if (!mInput.sup0xccdb) {
@@ -466,7 +599,7 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
466599
outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec, Lifetime::Timeframe);
467600
}
468601
}
469-
if (!inp.fileIRFrames.empty()) {
602+
if (!inp.fileIRFrames.empty() || !inp.fileRunTimeSpans.empty()) {
470603
outputs.emplace_back(OutputLabel{"selIRFrames"}, "CTF", "SELIRFRAMES", 0, Lifetime::Timeframe);
471604
}
472605
if (!inp.sup0xccdb) {

0 commit comments

Comments
 (0)