Skip to content

Commit 6ff8405

Browse files
committed
Fix TimingInfo.timeslice vs DPH.startTime mismatch
1 parent 80c4d14 commit 6ff8405

File tree

1 file changed

+18
-15
lines changed

1 file changed

+18
-15
lines changed

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ class TFReaderSpec : public o2f::Task
8787
std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
8888
o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
8989
int mConvRunTimeRangesToOrbits = -1; // not defined yet
90-
int mTFCounter = 0;
90+
int mSentTFCounter = 0;
91+
int mAccTFCounter = 0;
9192
int mTFBuilderCounter = 0;
9293
int mNWaits = 0;
9394
int mTFLength = 32;
@@ -159,15 +160,15 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
159160
}
160161
if (hd->splitPayloadIndex == 0) { // check the 1st one only
161162
auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
162-
if (entry.count != this->mTFCounter) {
163+
if (entry.count != this->mSentTFCounter) {
163164
if (verbose && hdPrev) { // report previous partition size
164165
LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
165166
}
166167
dsizeTot += dsize;
167168
dsize = 0;
168-
entry.count = this->mTFCounter; // acknowledge identifier seen in the data
169+
entry.count = this->mSentTFCounter; // acknowledge identifier seen in the data
169170
LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as<std::string>() << "/" << hd->dataDescription.as<std::string>()
170-
<< "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mTFCounter;
171+
<< "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mSentTFCounter;
171172
nblocks++;
172173
}
173174
}
@@ -219,11 +220,11 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
219220
const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
220221
const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
221222
for (auto& out : this->mSeenOutputMap) {
222-
if (out.second.count == this->mTFCounter) { // was seen in the data
223+
if (out.second.count == this->mSentTFCounter) { // was seen in the data
223224
continue;
224225
}
225226
LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as<std::string>() << "/" << out.first.dataDescription.as<std::string>()
226-
<< "/" << out.second.defSubSpec << " for TF " << this->mTFCounter;
227+
<< "/" << out.second.defSubSpec << " for TF " << this->mSentTFCounter;
227228
o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
228229
outHeader.payloadSerializationMethod = o2h::gSerializationMethodNone;
229230
outHeader.firstTForbit = hd0->firstTForbit;
@@ -270,7 +271,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
270271

271272
auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
272273
auto tDiff = tNow - tLastTF;
273-
if (mTFCounter && tDiff < mInput.delay_us) {
274+
if (mSentTFCounter && tDiff < mInput.delay_us) {
274275
std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
275276
}
276277
for (auto& msgIt : *tfPtr.get()) {
@@ -285,9 +286,9 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
285286
// however this is a small enough hack for now.
286287
ctx.services().get<o2f::MessageContext>().fakeDispatch();
287288
tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
288-
LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter, dataSize, nparts, mTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
289+
LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter, dataSize, nparts, mSentTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
289290
tLastTF = tNow;
290-
++mTFCounter;
291+
++mSentTFCounter;
291292

292293
while (mTFQueue.size() == 0 && mWaitSendingLast) {
293294
usleep(10000);
@@ -300,7 +301,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
300301
}
301302
// usleep(5000); // wait 5ms for new TF to be built
302303
}
303-
if (mTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
304+
if (mSentTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
304305
stopProcessing(ctx);
305306
}
306307
}
@@ -325,7 +326,7 @@ void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
325326
return;
326327
}
327328
stopDone = true;
328-
LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
329+
LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
329330
mRunning = false;
330331
if (mFileFetcher) {
331332
mFileFetcher->stop();
@@ -420,7 +421,7 @@ void TFReaderSpec::TFBuilder()
420421
std::this_thread::sleep_for(sleepTime);
421422
continue;
422423
}
423-
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity);
424+
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity);
424425
bool acceptTF = true;
425426
if (tf) {
426427
if (mRunTimeRanges.size()) {
@@ -443,21 +444,23 @@ void TFReaderSpec::TFBuilder()
443444
locID++;
444445
if (!mInput.tfIDs.empty() && acceptTF) {
445446
acceptTF = false;
447+
while ((mInput.tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.tfIDs.size()) {
448+
mSelIDEntry++;
449+
}
450+
LOGP(info, "chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.tfIDs[mSelIDEntry], mTFBuilderCounter);
446451
if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
447452
mWaitSendingLast = false;
448453
acceptTF = true;
449454
LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
450-
mSelIDEntry++;
451455
} else {
452456
LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
453457
}
454-
} else {
455-
mSelIDEntry++;
456458
}
457459
mTFBuilderCounter++;
458460
}
459461
if (mRunning && tf) {
460462
if (acceptTF) {
463+
mAccTFCounter++;
461464
mWaitSendingLast = true;
462465
mTFQueue.push(std::move(tf));
463466
}

0 commit comments

Comments
 (0)