Skip to content

Commit 65879e8

Browse files
committed
Repair raw tf part counters / headers
Can be disabled by --ignore-repair-headers.
1 parent ebf4195 commit 65879e8

6 files changed

Lines changed: 53 additions & 7 deletions

File tree

Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <vector>
2222

2323
#include <Headers/DataHeader.h>
24+
#include "Framework/DataSpecUtils.h"
25+
#include "Framework/OutputSpec.h"
2426
#include "Framework/Logger.h"
2527

2628
namespace o2
@@ -151,13 +153,13 @@ struct SubTimeFrameFileMeta {
151153
///
152154
std::uint64_t mWriteTimeMs;
153155

154-
auto getTimePoint()
156+
auto getTimePoint() const
155157
{
156158
using namespace std::chrono;
157159
return time_point<system_clock, milliseconds>{milliseconds{mWriteTimeMs}};
158160
}
159161

160-
std::string getTimeString()
162+
std::string getTimeString() const
161163
{
162164
using namespace std::chrono;
163165
std::time_t lTime = system_clock::to_time_t(getTimePoint());
@@ -167,6 +169,11 @@ struct SubTimeFrameFileMeta {
167169
return lTimeStream.str();
168170
}
169171

172+
const std::string info() const
173+
{
174+
return fmt::format("Size in file: {} Time: {} Version: {}", mStfSizeInFile, getTimeString(), mStfFileVersion);
175+
}
176+
170177
SubTimeFrameFileMeta(const std::uint64_t pStfSize)
171178
: SubTimeFrameFileMeta()
172179
{
@@ -220,6 +227,11 @@ struct SubTimeFrameFileDataIndex {
220227
static_assert(sizeof(DataIndexElem) == 48,
221228
"DataIndexElem changed -> Binary compatibility is lost!");
222229
}
230+
231+
const std::string info() const
232+
{
233+
return fmt::format("DH: {} Cnt:{} Size:{} Offset:{}", o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{mDataOrigin, mDataDescription, mSubSpecification}), mDataBlockCnt, mSize, mOffset);
234+
}
223235
};
224236

225237
SubTimeFrameFileDataIndex() = default;
@@ -240,6 +252,8 @@ struct SubTimeFrameFileDataIndex {
240252
return sizeof(o2::header::DataHeader) + (sizeof(DataIndexElem) * mDataIndex.size());
241253
}
242254

255+
const std::vector<DataIndexElem>& getDataIndex() const { return mDataIndex; }
256+
243257
friend std::ostream& operator<<(std::ostream& pStream, const SubTimeFrameFileDataIndex& pIndex);
244258

245259
private:

Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class SubTimeFrameFileReader
5050
~SubTimeFrameFileReader();
5151

5252
/// Read a single TF from the file
53-
std::unique_ptr<MessagesPerRoute> read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity);
53+
std::unique_ptr<MessagesPerRoute> read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repairHeaders, int verbosity);
5454

5555
/// Tell the current position of the file
5656
inline std::uint64_t position() const { return mFileMapOffset; }

Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ std::uint64_t sCreationTime = 0;
184184
std::mutex stfMtx;
185185

186186
std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes,
187-
const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity)
187+
const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repaireHeaders, int verbosity)
188188
{
189189
std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
190190
auto& msgMap = *messagesPerRoute.get();
@@ -252,10 +252,15 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
252252
return nullptr;
253253
}
254254
lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first());
255-
LOGP(debug, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta));
255+
if (verbosity > 0) {
256+
LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta));
257+
}
256258
if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) {
257259
return nullptr;
258260
}
261+
if (verbosity > 0) {
262+
LOGP(info, "TFMeta : {}", lStfFileMeta.info());
263+
}
259264
if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) {
260265
if (!creation0Notified) {
261266
creation0Notified = true;
@@ -319,6 +324,7 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
319324

320325
std::int64_t lLeftToRead = lStfDataSize;
321326
STFHeader stfHeader{tfID, -1u, -1u};
327+
DataHeader prevHeader;
322328
// read <hdrStack + data> pairs
323329
while (lLeftToRead > 0) {
324330
// allocate and read the Headers
@@ -335,6 +341,28 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
335341
return nullptr;
336342
}
337343
DataHeader locDataHeader(*lDataHeader);
344+
345+
if (repaireHeaders) {
346+
auto descHeader = [&locDataHeader]() {
347+
return o2f::DataSpecUtils::describe(o2f::OutputSpec{locDataHeader.dataOrigin, locDataHeader.dataDescription, locDataHeader.subSpecification});
348+
};
349+
if (locDataHeader == prevHeader) {
350+
if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) {
351+
if (verbosity > 3) {
352+
LOGP(warn, "Repairing wrong index {}/{} to {} for {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts, descHeader());
353+
}
354+
locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts;
355+
}
356+
} else { // new header
357+
if (locDataHeader.splitPayloadIndex != 0) {
358+
if (verbosity > 2) {
359+
LOGP(warn, "Repairing wrong index {}/{} to 0 for new {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, descHeader());
360+
}
361+
locDataHeader.splitPayloadIndex = 0;
362+
}
363+
}
364+
prevHeader = locDataHeader;
365+
}
338366
// sanity check
339367
if (int(locDataHeader.firstTForbit) == -1) {
340368
if (!negativeOrbitNotified) {

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ void TFReaderSpec::init(o2f::InitContext& ic)
118118
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
119119
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
120120
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
121+
mInput.repairHeaders = !ic.options().get<bool>("ignore-repair-headers");
122+
121123
if (!mInput.fileRunTimeSpans.empty()) {
122124
loadRunTimeSpans(mInput.fileRunTimeSpans);
123125
}
@@ -421,7 +423,7 @@ void TFReaderSpec::TFBuilder()
421423
std::this_thread::sleep_for(sleepTime);
422424
continue;
423425
}
424-
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity);
426+
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.repairHeaders, mInput.verbosity);
425427
bool acceptTF = true;
426428
if (tf) {
427429
if (mRunTimeRanges.size()) {
@@ -675,6 +677,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
675677
}
676678
spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
677679
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
680+
spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}});
678681
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
679682
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
680683
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});

Detectors/Raw/TFReaderDD/src/TFReaderSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ struct TFReaderInp {
4949
bool sendDummyForMissing = true;
5050
bool sup0xccdb = false;
5151
bool invertIRFramesSelection = false;
52+
bool repairHeaders = true;
5253
std::vector<o2::header::DataHeader> hdVec;
5354
std::vector<int> tfIDs{};
5455
};

Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
3434
options.push_back(ConfigParamSpec{"copy-dir", VariantType::String, "/tmp/", {"copy base directory for remote files"}});
3535
options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}});
3636
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
37-
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}});
37+
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH), report repairs"}});
3838
options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});
3939
options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
4040
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});

0 commit comments

Comments
 (0)