Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct CTFReaderInp {
std::string fileIRFrames{};
std::string fileRunTimeSpans{};
std::vector<int> ctfIDs{};
bool reverseCTFIDs{false};
bool skipSkimmedOutTF = false;
bool invertIRFramesSelection = false;
bool allowMissingDetectors = false;
Expand All @@ -47,6 +48,7 @@ struct CTFReaderInp {
unsigned int decSSpecEMC = 0;
int tfRateLimit = -999;
size_t minSHM = 0;
bool shuffle{false};
};

/// create a processor spec
Expand Down
47 changes: 42 additions & 5 deletions Detectors/CTF/workflow/src/CTFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@

/// @file CTFReaderSpec.cxx

#include <random>
#include <vector>
#include <algorithm>
#include <numeric>

#include <TFile.h>
#include <TTree.h>
#include <TStopwatch.h>

#include "Framework/Logger.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -49,7 +54,6 @@
#include "CCDB/BasicCCDBManager.h"
#include "CommonConstants/LHCConstants.h"
#include "Algorithm/RangeTokenizer.h"
#include <TStopwatch.h>
#include <fairmq/Device.h>

using namespace o2::framework;
Expand Down Expand Up @@ -155,6 +159,9 @@ void CTFReaderSpec::stopReader()
void CTFReaderSpec::init(InitContext& ic)
{
mInput.ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-ctf-ids"));
if ((mInput.reverseCTFIDs = ic.options().get<bool>("reverse-select-ctf-ids"))) {
std::reverse(mInput.ctfIDs.begin(), mInput.ctfIDs.end());
}
mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
Expand Down Expand Up @@ -299,6 +306,27 @@ void CTFReaderSpec::openCTFFile(const std::string& flname)
if (mCTFTree->GetEntries() < 1) {
throw std::runtime_error(fmt::format("CTF tree in {} has 0 entries, skipping", flname));
}
if (mInput.shuffle) {
if (mInput.ctfIDs.empty()) {
int entries = mCTFTree->GetEntries();
if (mInput.maxTFs > 0) {
entries = std::min(entries, mInput.maxTFs);
}
if (mInput.maxTFsPerFile > 0) {
entries = std::min(entries, mInput.maxTFsPerFile);
}
mInput.ctfIDs.clear();
mInput.ctfIDs.resize(entries);
std::iota(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), 0);
}
std::random_device dev;
std::mt19937 gen{dev()};
std::shuffle(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), gen);
LOGP(info, "will shuffle reading of CTF entries in this order:");
for (int i{0}; i < (int)mInput.ctfIDs.size(); ++i) {
LOGP(info, "\tTF {:02} -> {:02}", i, mInput.ctfIDs[i]);
}
}
} catch (const std::exception& e) {
LOG(error) << "Cannot process " << flname << ", reason: " << e.what();
mCTFTree.reset();
Expand All @@ -322,9 +350,12 @@ void CTFReaderSpec::run(ProcessingContext& pc)
long startWait = 0;

while (mRunning) {
if (mCTFTree) { // there is a tree open with multiple CTF
if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter) { // no selection requested or matching CTF ID is found
if (mCTFTree) { // there is a tree open with multiple CTF
if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter || mInput.shuffle || mInput.reverseCTFIDs) { // no selection requested or matching CTF ID is found
LOG(debug) << "TF " << mCTFCounter << " of " << mInput.maxTFs << " loop " << mFileFetcher->getNLoops();
if (mInput.shuffle || mInput.reverseCTFIDs) {
mCurrTreeEntry = mInput.ctfIDs[mSelIDEntry];
}
mSelIDEntry++;
if (processTF(pc)) {
break;
Expand Down Expand Up @@ -500,8 +531,13 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
///_______________________________________
void CTFReaderSpec::checkTreeEntries()
{
// check if the tree has entries left, if needed, close current tree/file
if (++mCurrTreeEntry >= mCTFTree->GetEntries() || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
bool reachedEnd{false};
if (mInput.shuffle || mInput.reverseCTFIDs) { // last entry is last id
reachedEnd = (mCurrTreeEntry == mInput.ctfIDs.back());
} else { // check if the tree has entries left, if needed, close current tree/file
reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries());
}
if (reachedEnd || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
mCTFTree.reset();
mCTFFile->Close();
mCTFFile.reset();
Expand Down Expand Up @@ -611,6 +647,7 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
}

options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
options.emplace_back(ConfigParamSpec{"reverse-select-ctf-ids", VariantType::Bool, false, {"reverse order of to inject CTF IDs"}});
options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}});
options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
Expand Down
2 changes: 2 additions & 0 deletions Detectors/CTF/workflow/src/ctf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
options.push_back(ConfigParamSpec{"shuffle", VariantType::Bool, false, {"shuffle TF sending order (for debug)"}});
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}});
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
Expand Down Expand Up @@ -120,6 +121,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)

ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));

ctfInput.shuffle = configcontext.options().get<bool>("shuffle");
ctfInput.copyCmd = configcontext.options().get<std::string>("copy-cmd");
ctfInput.tffileRegex = configcontext.options().get<std::string>("ctf-file-regex");
ctfInput.remoteRegex = configcontext.options().get<std::string>("remote-regex");
Expand Down