Skip to content

Commit 60ed0d2

Browse files
committed
CTF: add shuffle TF option for debug
1 parent 13b695d commit 60ed0d2

File tree

3 files changed

+41
-5
lines changed

3 files changed

+41
-5
lines changed

Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct CTFReaderInp {
4747
unsigned int decSSpecEMC = 0;
4848
int tfRateLimit = -999;
4949
size_t minSHM = 0;
50+
bool shuffle{false};
5051
};
5152

5253
/// create a processor spec

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@
1111

1212
/// @file CTFReaderSpec.cxx
1313

14+
#include <random>
1415
#include <vector>
16+
#include <algorithm>
17+
#include <numeric>
18+
1519
#include <TFile.h>
1620
#include <TTree.h>
21+
#include <TStopwatch.h>
1722

1823
#include "Framework/Logger.h"
1924
#include "Framework/ControlService.h"
@@ -49,7 +54,6 @@
4954
#include "CCDB/BasicCCDBManager.h"
5055
#include "CommonConstants/LHCConstants.h"
5156
#include "Algorithm/RangeTokenizer.h"
52-
#include <TStopwatch.h>
5357
#include <fairmq/Device.h>
5458

5559
using namespace o2::framework;
@@ -299,6 +303,27 @@ void CTFReaderSpec::openCTFFile(const std::string& flname)
299303
if (mCTFTree->GetEntries() < 1) {
300304
throw std::runtime_error(fmt::format("CTF tree in {} has 0 entries, skipping", flname));
301305
}
306+
if (mInput.shuffle) {
307+
if (mInput.ctfIDs.empty()) {
308+
int entries = mCTFTree->GetEntries();
309+
if (mInput.maxTFs > 0) {
310+
entries = std::min(entries, mInput.maxTFs);
311+
}
312+
if (mInput.maxTFsPerFile > 0) {
313+
entries = std::min(entries, mInput.maxTFsPerFile);
314+
}
315+
mInput.ctfIDs.clear();
316+
mInput.ctfIDs.resize(entries);
317+
std::iota(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), 0);
318+
}
319+
std::random_device dev;
320+
std::mt19937 gen{dev()};
321+
std::shuffle(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), gen);
322+
LOGP(info, "will shuffle reading of CTF entries in this order:");
323+
for (int i{0}; i < (int)mInput.ctfIDs.size(); ++i) {
324+
LOGP(info, "\tTF {:02} -> {:02}", i, mInput.ctfIDs[i]);
325+
}
326+
}
302327
} catch (const std::exception& e) {
303328
LOG(error) << "Cannot process " << flname << ", reason: " << e.what();
304329
mCTFTree.reset();
@@ -322,9 +347,12 @@ void CTFReaderSpec::run(ProcessingContext& pc)
322347
long startWait = 0;
323348

324349
while (mRunning) {
325-
if (mCTFTree) { // there is a tree open with multiple CTF
326-
if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter) { // no selection requested or matching CTF ID is found
350+
if (mCTFTree) { // there is a tree open with multiple CTF
351+
if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter || mInput.shuffle) { // no selection requested or matching CTF ID is found
327352
LOG(debug) << "TF " << mCTFCounter << " of " << mInput.maxTFs << " loop " << mFileFetcher->getNLoops();
353+
if (mInput.shuffle) {
354+
mCurrTreeEntry = mInput.ctfIDs[mSelIDEntry];
355+
}
328356
mSelIDEntry++;
329357
if (processTF(pc)) {
330358
break;
@@ -500,8 +528,13 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
500528
///_______________________________________
501529
void CTFReaderSpec::checkTreeEntries()
502530
{
503-
// check if the tree has entries left, if needed, close current tree/file
504-
if (++mCurrTreeEntry >= mCTFTree->GetEntries() || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
531+
bool reachedEnd{false};
532+
if (mInput.shuffle) { // last entry is last id
533+
reachedEnd = (mCurrTreeEntry == mInput.ctfIDs.back());
534+
} else { // check if the tree has entries left, if needed, close current tree/file
535+
reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries());
536+
}
537+
if (reachedEnd || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
505538
mCTFTree.reset();
506539
mCTFFile->Close();
507540
mCTFFile.reset();

Detectors/CTF/workflow/src/ctf-reader-workflow.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
5656
options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
5757
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
5858
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
59+
options.push_back(ConfigParamSpec{"shuffle", VariantType::Bool, false, {"shuffle TF sending order (for debug)"}});
5960
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
6061
options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}});
6162
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
@@ -120,6 +121,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
120121

121122
ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
122123

124+
ctfInput.shuffle = configcontext.options().get<bool>("shuffle");
123125
ctfInput.copyCmd = configcontext.options().get<std::string>("copy-cmd");
124126
ctfInput.tffileRegex = configcontext.options().get<std::string>("ctf-file-regex");
125127
ctfInput.remoteRegex = configcontext.options().get<std::string>("remote-regex");

0 commit comments

Comments
 (0)