Skip to content

Commit 056b5f4

Browse files
authored
DPL: get rid of the size method of the MessageSet (#15206)
One more step in getting rid of the artificial container "MessageSet". By removing the size method, we imply that any sequence of messages can have their number of parts computed, regardless of how we store them and how the ownership of such parts works.
1 parent 35fc90d commit 056b5f4

File tree

5 files changed

+34
-40
lines changed

5 files changed

+34
-40
lines changed

Framework/Core/include/Framework/MessageSet.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,6 @@ struct MessageSet {
8282
return *this;
8383
}
8484

85-
/// get number of in-flight O2 messages
86-
[[nodiscard]] size_t size() const
87-
{
88-
return messages | count_parts{};
89-
}
90-
9185
/// get number of header-payload pairs
9286
[[nodiscard]] size_t getNumberOfPairs() const
9387
{

Framework/Core/src/DataRelayer.cxx

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
184184
// We check that no data is already there for the given cell
185185
// it is enough to check the first element
186186
auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
187-
if (part.size() > 0 && part.header(0) != nullptr) {
187+
if (!part.messages.empty() && part.header(0) != nullptr) {
188188
headerPresent++;
189189
continue;
190190
}
191-
if (part.size() > 0 && part.payload(0) != nullptr) {
191+
if (!part.messages.empty() && part.payload(0) != nullptr) {
192192
payloadPresent++;
193193
continue;
194194
}
@@ -213,7 +213,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
213213
auto partial = getPartialRecord(ti);
214214
// TODO: get the data ref from message model
215215
auto getter = [&partial](size_t idx, size_t part) {
216-
if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
216+
if (!partial[idx].messages.empty() && partial[idx].header(part).get()) {
217217
auto header = partial[idx].header(part).get();
218218
auto payload = partial[idx].payload(part).get();
219219
return DataRef{nullptr,
@@ -224,7 +224,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
224224
return DataRef{};
225225
};
226226
auto nPartsGetter = [&partial](size_t idx) {
227-
return partial[idx].size();
227+
return partial[idx].messages | count_parts{};
228228
};
229229
auto refCountGetter = [&partial](size_t idx) -> int {
230230
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
@@ -327,7 +327,7 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan
327327
for (size_t mi = 0; mi < mInputs.size(); ++mi) {
328328
auto& input = mInputs[mi];
329329
auto& element = mCache[si * mInputs.size() + mi];
330-
if (element.size() != 0) {
330+
if (!element.messages.empty()) {
331331
if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
332332
didDrop = true;
333333
auto& state = mContext.get<DeviceState>();
@@ -353,7 +353,7 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan
353353
continue;
354354
}
355355
auto& element = mCache[si * mInputs.size() + mi];
356-
if (element.size() == 0) {
356+
if (element.messages.empty()) {
357357
auto& state = mContext.get<DeviceState>();
358358
if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
359359
if (state.allowedProcessing == DeviceState::CalibrationOnly) {
@@ -411,11 +411,11 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop)
411411
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
412412
// TODO: in the original implementation of the cache, there have been only two messages per entry,
413413
// check if the 2 above corresponds to the number of messages.
414-
if (cache[cacheId].size() > 0) {
414+
if (!cache[cacheId].messages.empty()) {
415415
dropped[ai] = std::move(cache[cacheId]);
416416
}
417417
}
418-
bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
418+
bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.messages.empty(); });
419419
if (anyDropped) {
420420
O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
421421
O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
@@ -786,7 +786,7 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
786786
auto partial = getPartialRecord(li);
787787
// TODO: get the data ref from message model
788788
auto getter = [&partial](size_t idx, size_t part) {
789-
if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
789+
if (!partial[idx].messages.empty() && partial[idx].header(part).get()) {
790790
auto header = partial[idx].header(part).get();
791791
auto payload = partial[idx].payload(part).get();
792792
return DataRef{nullptr,
@@ -797,7 +797,7 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
797797
return DataRef{};
798798
};
799799
auto nPartsGetter = [&partial](size_t idx) {
800-
return partial[idx].size();
800+
return partial[idx].messages | count_parts{};
801801
};
802802
auto refCountGetter = [&partial](size_t idx) -> int {
803803
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
@@ -897,7 +897,7 @@ std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice
897897
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
898898
// TODO: in the original implementation of the cache, there have been only two messages per entry,
899899
// check if the 2 above corresponds to the number of messages.
900-
if (cache[cacheId].size() > 0) {
900+
if (!cache[cacheId].messages.empty()) {
901901
messages[arg] = std::move(cache[cacheId]);
902902
}
903903
index.markAsInvalid(s);
@@ -951,7 +951,7 @@ std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTime
951951
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
952952
// TODO: in the original implementation of the cache, there have been only two messages per entry,
953953
// check if the 2 above corresponds to the number of messages.
954-
for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
954+
for (size_t pi = 0; pi < (cache[cacheId].messages | count_parts{}); pi++) {
955955
auto& header = cache[cacheId].header(pi);
956956
auto&& newHeader = header->GetTransport()->CreateMessage();
957957
newHeader->Copy(*header);

Framework/Core/test/benchmark_DataRelayer.cxx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
9696
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
9797
auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
9898
assert(result.size() == 1);
99-
assert(result.at(0).size() == 1);
99+
assert((result.at(0).messages | count_parts{}) == 1);
100100
inflightMessages = std::move(result[0].messages);
101101
}
102102
}
@@ -153,7 +153,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
153153
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
154154
auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
155155
assert(result.size() == 1);
156-
assert(result.at(0).size() == 1);
156+
assert((result.at(0).messages | count_parts{}) == 1);
157157
inflightMessages = std::move(result[0].messages);
158158
}
159159
}
@@ -228,8 +228,8 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
228228
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
229229
auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
230230
assert(result.size() == 2);
231-
assert(result.at(0).size() == 1);
232-
assert(result.at(1).size() == 1);
231+
assert((result.at(0).messages | count_parts{}) == 1);
232+
assert((result.at(1).messages | count_parts{}) == 1);
233233
inflightMessages = std::move(result[0].messages);
234234
inflightMessages.emplace_back(std::move(result[1].messages[0]));
235235
inflightMessages.emplace_back(std::move(result[1].messages[1]));

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ TEST_CASE("DataRelayer")
115115
auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
116116
// one MessageSet with one PartRef with header and payload
117117
REQUIRE(result.size() == 1);
118-
REQUIRE(result.at(0).size() == 1);
118+
REQUIRE((result.at(0).messages | count_parts{}) == 1);
119119
}
120120

121121
//
@@ -165,7 +165,7 @@ TEST_CASE("DataRelayer")
165165
auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
166166
// one MessageSet with one PartRef with header and payload
167167
REQUIRE(result.size() == 1);
168-
REQUIRE(result.at(0).size() == 1);
168+
REQUIRE((result.at(0).messages | count_parts{}) == 1);
169169
}
170170

171171
// This test a more complicated set of inputs, and verifies that data is
@@ -245,8 +245,8 @@ TEST_CASE("DataRelayer")
245245
auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
246246
// two MessageSets, each with one PartRef
247247
REQUIRE(result.size() == 2);
248-
REQUIRE(result.at(0).size() == 1);
249-
REQUIRE(result.at(1).size() == 1);
248+
REQUIRE((result.at(0).messages | count_parts{}) == 1);
249+
REQUIRE((result.at(1).messages | count_parts{}) == 1);
250250
}
251251

252252
// This test a more complicated set of inputs, and verifies that data is
@@ -733,7 +733,7 @@ TEST_CASE("DataRelayer")
733733
// we have one input route and thus one message set containing pairs for all
734734
// payloads
735735
REQUIRE(messageSet.size() == 1);
736-
REQUIRE(messageSet[0].size() == nSplitParts);
736+
REQUIRE((messageSet[0].messages | count_parts{}) == nSplitParts);
737737
REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1);
738738
}
739739

@@ -796,7 +796,7 @@ TEST_CASE("DataRelayer")
796796
// we have one input route
797797
REQUIRE(messageSet.size() == 1);
798798
// one message set containing number of added sequences of messages
799-
REQUIRE(messageSet[0].size() == sequenceSize.size());
799+
REQUIRE((messageSet[0].messages | count_parts{}) == sequenceSize.size());
800800
size_t counter = 0;
801801
for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
802802
REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute")
9292
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
9393
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
9494
messageSet.add(PartRef{std::move(header), std::move(payload)});
95-
REQUIRE(messageSet.size() == 1);
95+
REQUIRE((messageSet.messages | count_parts{}) == 1);
9696
currentSetOfInputs.emplace_back(std::move(messageSet));
9797

9898
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -143,7 +143,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
143143
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
144144
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
145145
messageSet.add(PartRef{std::move(header), std::move(payload)});
146-
REQUIRE(messageSet.size() == 1);
146+
REQUIRE((messageSet.messages | count_parts{}) == 1);
147147
currentSetOfInputs.emplace_back(std::move(messageSet));
148148

149149
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true);
@@ -198,7 +198,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
198198
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih});
199199
REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
200200
messageSet.add(PartRef{std::move(header), std::move(payload)});
201-
REQUIRE(messageSet.size() == 1);
201+
REQUIRE((messageSet.messages | count_parts{}) == 1);
202202
currentSetOfInputs.emplace_back(std::move(messageSet));
203203

204204
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -256,7 +256,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
256256
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih});
257257
REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
258258
messageSet.add(PartRef{std::move(header), std::move(payload)});
259-
REQUIRE(messageSet.size() == 1);
259+
REQUIRE((messageSet.messages | count_parts{}) == 1);
260260
currentSetOfInputs.emplace_back(std::move(messageSet));
261261

262262
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -321,7 +321,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
321321
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
322322
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
323323
messageSet.add(PartRef{std::move(header), std::move(payload)});
324-
REQUIRE(messageSet.size() == 1);
324+
REQUIRE((messageSet.messages | count_parts{}) == 1);
325325
currentSetOfInputs.emplace_back(std::move(messageSet));
326326

327327
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -384,7 +384,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals")
384384
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
385385
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
386386
messageSet.add(PartRef{std::move(header), std::move(payload)});
387-
REQUIRE(messageSet.size() == 1);
387+
REQUIRE((messageSet.messages | count_parts{}) == 1);
388388
currentSetOfInputs.emplace_back(std::move(messageSet));
389389

390390
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -455,12 +455,12 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes")
455455
auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph});
456456
MessageSet messageSet1;
457457
messageSet1.add(PartRef{std::move(header1), std::move(payload1)});
458-
REQUIRE(messageSet1.size() == 1);
458+
REQUIRE((messageSet1.messages | count_parts{}) == 1);
459459

460460
auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
461461
MessageSet messageSet2;
462462
messageSet2.add(PartRef{std::move(header2), std::move(payload2)});
463-
REQUIRE(messageSet2.size() == 1);
463+
REQUIRE((messageSet2.messages | count_parts{}) == 1);
464464
currentSetOfInputs.emplace_back(std::move(messageSet1));
465465
currentSetOfInputs.emplace_back(std::move(messageSet2));
466466
REQUIRE(currentSetOfInputs.size() == 2);
@@ -525,7 +525,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
525525
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
526526
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
527527
messageSet.add(PartRef{std::move(header), std::move(payload)});
528-
REQUIRE(messageSet.size() == 1);
528+
REQUIRE((messageSet.messages | count_parts{}) == 1);
529529
currentSetOfInputs.emplace_back(std::move(messageSet));
530530

531531
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -607,7 +607,7 @@ TEST_CASE("ForwardInputsSplitPayload")
607607
PartRef part{std::move(header2), transport->CreateMessage()};
608608
messageSet.add(std::move(part));
609609

610-
REQUIRE(messageSet.size() == 2);
610+
REQUIRE((messageSet.messages | count_parts{}) == 2);
611611
currentSetOfInputs.emplace_back(std::move(messageSet));
612612

613613
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -727,7 +727,7 @@ TEST_CASE("ForwardInputEOSSingleRoute")
727727
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
728728
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
729729
messageSet.add(PartRef{std::move(header), std::move(payload)});
730-
REQUIRE(messageSet.size() == 1);
730+
REQUIRE((messageSet.messages | count_parts{}) == 1);
731731
currentSetOfInputs.emplace_back(std::move(messageSet));
732732

733733
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
@@ -772,7 +772,7 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute")
772772
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
773773
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
774774
messageSet.add(PartRef{std::move(header), std::move(payload)});
775-
REQUIRE(messageSet.size() == 1);
775+
REQUIRE((messageSet.messages | count_parts{}) == 1);
776776
currentSetOfInputs.emplace_back(std::move(messageSet));
777777

778778
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);

0 commit comments

Comments
 (0)