Skip to content

Commit 64077ed

Browse files
committed
send messages at every TF
1 parent 3d3ee4d commit 64077ed

File tree

1 file changed

+37
-33
lines changed

1 file changed

+37
-33
lines changed

Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <iostream>
2828
#include <memory>
2929
#include <string>
30+
#include <stdexcept>
3031

3132
using namespace o2::framework;
3233

@@ -63,61 +64,64 @@ class DigitSamplerTask : public io::DigitIOBaseTask
6364

6465
void outputAndClear(DataAllocator& out)
6566
{
66-
printSummary(mDigits, mROFs, "-> to output");
67+
LOGP(info, "Sending {} rofs with {} digits", mROFs.size(), mDigits.size());
6768
out.snapshot(OutputRef{"rofs"}, mROFs);
6869
out.snapshot(OutputRef{"digits"}, mDigits);
6970
mDigits.clear();
7071
mROFs.clear();
7172
}
7273

73-
bool shouldEnd() const
74+
bool shouldEnd()
7475
{
7576
bool maxTFreached = mNofProcessedTFs >= mMaxNofTimeFrames;
7677
bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs;
77-
return !mReadIsOk || maxTFreached || maxROFreached;
78+
bool lastTF = mInput.peek() == EOF;
79+
return !mReadIsOk || lastTF || maxTFreached || maxROFreached;
7880
}
7981

8082
void run(ProcessingContext& pc)
8183
{
8284
if (shouldEnd()) {
83-
// output remaining data if any
84-
if (mROFs.size() > 0) {
85-
--mTFid;
86-
outputAndClear(pc.outputs());
87-
}
88-
pc.services().get<ControlService>().endOfStream();
89-
return;
85+
throw std::invalid_argument("process should have ended already");
9086
}
9187

9288
std::vector<ROFRecord> rofs;
9389
std::vector<Digit> digits;
94-
mReadIsOk = mDigitSampler->read(digits, rofs);
95-
if (!mReadIsOk) {
96-
return;
97-
}
90+
while ((mReadIsOk = mDigitSampler->read(digits, rofs))) {
91+
92+
// process the current input TF if requested
93+
if (shouldProcess()) {
94+
incNofProcessedTFs();
95+
mNofProcessedROFs += rofs.size();
96+
// append rofs to mROFs, but shift the indices by the amount of digits
97+
// we have read so far.
98+
auto offset = mDigits.size();
99+
std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
100+
[offset](ROFRecord r) {
101+
r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
102+
return r;
103+
});
104+
mDigits.insert(mDigits.end(), digits.begin(), digits.end());
105+
printSummary(mDigits, mROFs);
106+
printFull(mDigits, mROFs);
107+
}
98108

99-
if (shouldProcess()) {
100-
incNofProcessedTFs();
101-
mNofProcessedROFs += rofs.size();
102-
// append rofs to mROFs, but shift the indices by the amount of digits
103-
// we have read so far.
104-
auto offset = mDigits.size();
105-
std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
106-
[offset](ROFRecord r) {
107-
r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
108-
return r;
109-
});
110-
mDigits.insert(mDigits.end(), digits.begin(), digits.end());
111-
printSummary(mDigits, mROFs);
112-
printFull(mDigits, mROFs);
113-
}
109+
// increment the input TF id for the next one
110+
incTFid();
114111

115-
// output if we've accumulated enough ROFs
116-
if (mROFs.size() >= mMinNumberOfROFsPerTF) {
117-
outputAndClear(pc.outputs());
112+
// stop here if we've accumulated enough ROFs or TFs
113+
if (mROFs.size() >= mMinNumberOfROFsPerTF || shouldEnd()) {
114+
break;
115+
}
118116
}
119117

120-
incTFid();
118+
// output whatever has been accumulated, even if empty
119+
outputAndClear(pc.outputs());
120+
121+
if (shouldEnd()) {
122+
pc.services().get<ControlService>().endOfStream();
123+
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
124+
}
121125
}
122126
};
123127

0 commit comments

Comments
 (0)