Skip to content

Commit dcc0b55

Browse files
committed
Ignore stored DistSTF by default
1 parent 63c42d3 commit dcc0b55

4 files changed

Lines changed: 60 additions & 25 deletions

File tree

Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ class SubTimeFrameFileReader
4646
public:
4747

4848
SubTimeFrameFileReader() = delete;
49-
SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask);
49+
SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF);
5050
~SubTimeFrameFileReader();
5151

5252
/// Read a single TF from the file
53-
std::unique_ptr<MessagesPerRoute> read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repairHeaders, int verbosity);
53+
std::unique_ptr<MessagesPerRoute> read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice);
5454

5555
/// Tell the current position of the file
5656
inline std::uint64_t position() const { return mFileMapOffset; }
@@ -76,6 +76,13 @@ class SubTimeFrameFileReader
7676
std::uint64_t mFileMapOffset = 0;
7777
std::uint64_t mFileSize = 0;
7878

79+
int mVerbosity = 0;
80+
bool mSup0xccdb = true;
81+
bool mRepaireHeaders = true;
82+
bool mRejectDistSTF = true;
83+
84+
const std::string describeHeader(const o2::header::DataHeader& hd, bool full = false) const;
85+
7986
// helper to make sure written chunks are buffered, only allow pointers
8087
template <typename pointer,
8188
typename = std::enable_if_t<std::is_pointer<pointer>::value>>

Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ namespace o2f = o2::framework;
4545
/// SubTimeFrameFileReader
4646
////////////////////////////////////////////////////////////////////////////////
4747

48-
SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask)
49-
: mFileName(pFileName)
48+
SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF)
49+
: mFileName(pFileName), mVerbosity(verb), mSup0xccdb(sup0xccdb), mRepaireHeaders(repaireHeaders), mRejectDistSTF(rejectDistSTF)
5050
{
5151
mFileMap.open(mFileName);
5252
if (!mFileMap.is_open()) {
@@ -178,13 +178,21 @@ Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize)
178178
return Stack(lStackMem);
179179
}
180180

181+
const std::string SubTimeFrameFileReader::describeHeader(const o2::header::DataHeader& hd, bool full) const
182+
{
183+
std::string res = fmt::format("{}", o2f::DataSpecUtils::describe(o2::framework::OutputSpec{hd.dataOrigin, hd.dataDescription, hd.subSpecification}));
184+
if (full) {
185+
res += fmt::format(" part:{}/{} sz:{} TF:{} Orb:{} Run:{}", hd.splitPayloadIndex, hd.splitPayloadParts, hd.payloadSize, hd.tfCounter, hd.firstTForbit, hd.runNumber);
186+
}
187+
return res;
188+
}
189+
181190
std::uint32_t sRunNumber = 0; // TODO: add id to files metadata
182191
std::uint32_t sFirstTForbit = 0; // TODO: add id to files metadata
183192
std::uint64_t sCreationTime = 0;
184193
std::mutex stfMtx;
185194

186-
std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes,
187-
const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repaireHeaders, int verbosity)
195+
std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice)
188196
{
189197
std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
190198
auto& msgMap = *messagesPerRoute.get();
@@ -252,13 +260,13 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
252260
return nullptr;
253261
}
254262
lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first());
255-
if (verbosity > 0) {
263+
if (mVerbosity > 0) {
256264
LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta));
257265
}
258266
if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) {
259267
return nullptr;
260268
}
261-
if (verbosity > 0) {
269+
if (mVerbosity > 0) {
262270
LOGP(info, "TFMeta : {}", lStfFileMeta.info());
263271
}
264272
if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) {
@@ -342,21 +350,18 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
342350
}
343351
DataHeader locDataHeader(*lDataHeader);
344352

345-
if (repaireHeaders) {
346-
auto descHeader = [&locDataHeader]() {
347-
return o2f::DataSpecUtils::describe(o2f::OutputSpec{locDataHeader.dataOrigin, locDataHeader.dataDescription, locDataHeader.subSpecification});
348-
};
353+
if (mRepaireHeaders) {
349354
if (locDataHeader == prevHeader) {
350355
if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) {
351-
if (verbosity > 3) {
352-
LOGP(warn, "Repairing wrong index {}/{} to {} for {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts, descHeader());
356+
if (mVerbosity > 3) {
357+
LOGP(warn, "Repairing wrong part index for {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts);
353358
}
354359
locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts;
355360
}
356361
} else { // new header
357362
if (locDataHeader.splitPayloadIndex != 0) {
358-
if (verbosity > 2) {
359-
LOGP(warn, "Repairing wrong index {}/{} to 0 for new {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, descHeader());
363+
if (mVerbosity > 2) {
364+
LOGP(warn, "Repairing wrong part index for new {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts);
360365
}
361366
locDataHeader.splitPayloadIndex = 0;
362367
}
@@ -378,6 +383,18 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
378383
}
379384
locDataHeader.runNumber = runNumberFallBack;
380385
}
386+
const std::uint64_t lDataSize = locDataHeader.payloadSize;
387+
388+
if (locDataHeader.dataOrigin == o2::header::gDataOriginFLP && locDataHeader.dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) {
389+
if (mVerbosity > 0) {
390+
LOGP(warn, "Ignoring stored {}", describeHeader(locDataHeader));
391+
}
392+
if (!ignore_nbytes(lDataSize)) {
393+
return nullptr;
394+
}
395+
lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
396+
continue;
397+
}
381398
o2::header::Stack headerStack{locDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
382399
if (stfHeader.runNumber == -1) {
383400
stfHeader.id = locDataHeader.tfCounter;
@@ -387,8 +404,6 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
387404
sRunNumber = stfHeader.runNumber;
388405
sFirstTForbit = stfHeader.firstOrbit;
389406
}
390-
391-
const std::uint64_t lDataSize = locDataHeader.payloadSize;
392407
// do we accept these data?
393408
auto detOrigStatus = mDetOrigMap.find(locDataHeader.dataOrigin);
394409
if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) { // this is a detector data and we don't want to read it
@@ -431,17 +446,20 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
431446
if (!read_advance(lDataMsg->GetData(), lDataSize)) {
432447
return nullptr;
433448
}
434-
if (verbosity > 0) {
435-
if (verbosity > 1 || locDataHeader.splitPayloadIndex == 0) {
449+
if (mVerbosity > 0) {
450+
if (mVerbosity > 1 || locDataHeader.splitPayloadIndex == 0) {
436451
printStack(headerStack);
437-
if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && verbosity > 2) {
452+
if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && mVerbosity > 2) {
438453
o2::raw::RDHUtils::printRDH(lDataMsg->GetData());
439454
}
440455
}
441456
}
442457
#ifdef _RUN_TIMING_MEASUREMENT_
443458
addPartSW.Start(false);
444459
#endif
460+
if (mVerbosity > 2) {
461+
LOGP(info, "addPart {} to {} | HdrSize:{} DataSize:{}", describeHeader(locDataHeader, true), fmqChannel, lHdrStackMsg->GetSize(), lDataMsg->GetSize());
462+
}
445463
addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
446464
#ifdef _RUN_TIMING_MEASUREMENT_
447465
addPartSW.Stop();
@@ -463,7 +481,7 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
463481
}
464482

465483
unsigned stfSS[2] = {0, 0xccdb};
466-
for (int iss = 0; iss < (sup0xccdb ? 1 : 2); iss++) {
484+
for (int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
467485
o2::header::DataHeader stfDistDataHeader(o2::header::gDataDescriptionDISTSTF, o2::header::gDataOriginFLP, stfSS[iss], sizeof(STFHeader), 0, 1);
468486
stfDistDataHeader.payloadSerializationMethod = o2::header::gSerializationMethodNone;
469487
stfDistDataHeader.firstTForbit = stfHeader.firstOrbit;
@@ -473,7 +491,7 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
473491
if (!fmqChannel.empty()) { // no output channel
474492
auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
475493
o2::header::Stack headerStackSTF{stfDistDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
476-
if (verbosity > 0) {
494+
if (mVerbosity > 0) {
477495
printStack(headerStackSTF);
478496
}
479497
auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
@@ -483,6 +501,9 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
483501
#ifdef _RUN_TIMING_MEASUREMENT_
484502
addPartSW.Start(false);
485503
#endif
504+
if (mVerbosity > 2) {
505+
LOGP(info, "addPart forced {} to {} | HdrSize:{} DataSize:{}", describeHeader(stfDistDataHeader, true), fmqChannel, hdMessageSTF->GetSize(), plMessageSTF->GetSize());
506+
}
486507
addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
487508
#ifdef _RUN_TIMING_MEASUREMENT_
488509
addPartSW.Stop();

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ void TFReaderSpec::init(o2f::InitContext& ic)
119119
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
120120
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
121121
mInput.repairHeaders = !ic.options().get<bool>("ignore-repair-headers");
122+
mInput.rejectDistSTF = !ic.options().get<bool>("read-dist-stf");
122123

123124
if (!mInput.fileRunTimeSpans.empty()) {
124125
loadRunTimeSpans(mInput.fileRunTimeSpans);
@@ -265,7 +266,11 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
265266
setTimingInfo(*tfPtr.get());
266267
size_t nparts = 0, dataSize = 0;
267268
if (mInput.sendDummyForMissing) {
269+
int cntAck = 0;
268270
for (auto& msgIt : *tfPtr.get()) { // complete with empty output for the specs which were requested but were not seen in the data
271+
if (mInput.verbosity > 0) {
272+
LOGP(info, "acknowledgeOutput {}", cntAck++);
273+
}
269274
acknowledgeOutput(*msgIt.second.get(), true);
270275
}
271276
addMissingParts(*tfPtr.get());
@@ -411,7 +416,7 @@ void TFReaderSpec::TFBuilder()
411416
}
412417

413418
LOG(info) << "Processing file " << tfFileName;
414-
SubTimeFrameFileReader reader(tfFileName, mInput.detMask);
419+
SubTimeFrameFileReader reader(tfFileName, mInput.detMask, mInput.verbosity, mInput.sup0xccdb, mInput.repairHeaders, mInput.rejectDistSTF);
415420
size_t locID = 0;
416421
// try
417422
{
@@ -423,7 +428,7 @@ void TFReaderSpec::TFBuilder()
423428
std::this_thread::sleep_for(sleepTime);
424429
continue;
425430
}
426-
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.repairHeaders, mInput.verbosity);
431+
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter);
427432
bool acceptTF = true;
428433
if (tf) {
429434
if (mRunTimeRanges.size()) {
@@ -678,6 +683,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
678683
spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
679684
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
680685
spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}});
686+
spec.options.emplace_back(o2f::ConfigParamSpec{"read-dist-stf", o2f::VariantType::Bool, false, {"do not ignore stored FLP/DISTSUBTIMEFRAME (will clash with injected one)"}});
681687
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
682688
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
683689
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});

Detectors/Raw/TFReaderDD/src/TFReaderSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct TFReaderInp {
5050
bool sup0xccdb = false;
5151
bool invertIRFramesSelection = false;
5252
bool repairHeaders = true;
53+
bool rejectDistSTF = true;
5354
std::vector<o2::header::DataHeader> hdVec;
5455
std::vector<int> tfIDs{};
5556
};

0 commit comments

Comments
 (0)