Skip to content

Commit 8c44308

Browse files
committed
Drop add, reset
1 parent 3b23b76 commit 8c44308

File tree

4 files changed

+50
-53
lines changed

4 files changed

+50
-53
lines changed

Framework/Core/include/Framework/MessageSet.h

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ struct MessageSet {
4343
MessageSet(F getter, size_t size)
4444
: messages()
4545
{
46-
add(std::forward<F>(getter), size);
46+
for (size_t i = 0; i < size; ++i) {
47+
messages.emplace_back(std::move(getter(i)));
48+
}
4749
}
4850

4951
MessageSet(MessageSet&& other)
@@ -67,34 +69,6 @@ struct MessageSet {
6769
{
6870
messages.clear();
6971
}
70-
71-
// this is more or less legacy
72-
// PartRef has been earlier used to store fixed header-payload pairs
73-
// reset the set and store content of the part ref
74-
void reset(PartRef&& ref)
75-
{
76-
clear();
77-
add(std::move(ref));
78-
}
79-
80-
// this is more or less legacy
81-
// PartRef has been earlier used to store fixed header-payload pairs
82-
// add content of the part ref
83-
void add(PartRef&& ref)
84-
{
85-
messages.emplace_back(std::move(ref.header));
86-
messages.emplace_back(std::move(ref.payload));
87-
}
88-
89-
/// add an O2 message
90-
template <typename F>
91-
void add(F getter, size_t size)
92-
{
93-
for (size_t i = 0; i < size; ++i) {
94-
messages.emplace_back(std::move(getter(i)));
95-
}
96-
}
97-
9872
};
9973

10074
} // namespace o2::framework

Framework/Core/src/DataRelayer.cxx

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,9 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
242242
assert(expirator.handler);
243243
PartRef newRef;
244244
expirator.handler(services, newRef, variables);
245-
part.reset(std::move(newRef));
245+
part.messages.clear();
246+
part.messages.emplace_back(std::move(newRef.header));
247+
part.messages.emplace_back(std::move(newRef.payload));
246248
activity.expiredSlots++;
247249

248250
mTimesliceIndex.markAsDirty(slot, true);
@@ -536,7 +538,9 @@ DataRelayer::RelayChoice
536538
auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
537539
// Notice this will split [(header, payload), (header, payload)] multiparts
538540
// in N different subParts for the message spec.
539-
target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
541+
for (size_t i = 0; i < nPayloads + 1; ++i) {
542+
target.messages.emplace_back(std::move(span[i]));
543+
}
540544
mi += nPayloads;
541545
saved += nPayloads;
542546
}
@@ -955,7 +959,8 @@ std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTime
955959
auto& header = cache[cacheId].messages | get_header{pi};
956960
auto&& newHeader = header->GetTransport()->CreateMessage();
957961
newHeader->Copy(*header);
958-
messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].messages | get_payload{pi, 0})});
962+
messages[arg].messages.emplace_back(std::move(newHeader));
963+
messages[arg].messages.emplace_back(std::move(cache[cacheId].messages | get_payload{pi, 0}));
959964
}
960965
};
961966

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute")
9191
fair::mq::MessagePtr payload(transport->CreateMessage());
9292
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
9393
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
94-
messageSet.add(PartRef{std::move(header), std::move(payload)});
94+
messageSet.messages.emplace_back(std::move(header));
95+
messageSet.messages.emplace_back(std::move(payload));
9596
REQUIRE((messageSet.messages | count_parts{}) == 1);
9697
currentSetOfInputs.emplace_back(std::move(messageSet));
9798

@@ -142,7 +143,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
142143
REQUIRE(payload.get() == nullptr);
143144
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
144145
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
145-
messageSet.add(PartRef{std::move(header), std::move(payload)});
146+
messageSet.messages.emplace_back(std::move(header));
147+
messageSet.messages.emplace_back(std::move(payload));
146148
REQUIRE((messageSet.messages | count_parts{}) == 1);
147149
currentSetOfInputs.emplace_back(std::move(messageSet));
148150

@@ -197,7 +199,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
197199
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
198200
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih});
199201
REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
200-
messageSet.add(PartRef{std::move(header), std::move(payload)});
202+
messageSet.messages.emplace_back(std::move(header));
203+
messageSet.messages.emplace_back(std::move(payload));
201204
REQUIRE((messageSet.messages | count_parts{}) == 1);
202205
currentSetOfInputs.emplace_back(std::move(messageSet));
203206

@@ -255,7 +258,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
255258
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
256259
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih});
257260
REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
258-
messageSet.add(PartRef{std::move(header), std::move(payload)});
261+
messageSet.messages.emplace_back(std::move(header));
262+
messageSet.messages.emplace_back(std::move(payload));
259263
REQUIRE((messageSet.messages | count_parts{}) == 1);
260264
currentSetOfInputs.emplace_back(std::move(messageSet));
261265

@@ -320,7 +324,8 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
320324
fair::mq::MessagePtr payload(transport->CreateMessage());
321325
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
322326
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
323-
messageSet.add(PartRef{std::move(header), std::move(payload)});
327+
messageSet.messages.emplace_back(std::move(header));
328+
messageSet.messages.emplace_back(std::move(payload));
324329
REQUIRE((messageSet.messages | count_parts{}) == 1);
325330
currentSetOfInputs.emplace_back(std::move(messageSet));
326331

@@ -383,7 +388,8 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals")
383388
fair::mq::MessagePtr payload(transport->CreateMessage());
384389
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
385390
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
386-
messageSet.add(PartRef{std::move(header), std::move(payload)});
391+
messageSet.messages.emplace_back(std::move(header));
392+
messageSet.messages.emplace_back(std::move(payload));
387393
REQUIRE((messageSet.messages | count_parts{}) == 1);
388394
currentSetOfInputs.emplace_back(std::move(messageSet));
389395

@@ -454,12 +460,14 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes")
454460
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
455461
auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph});
456462
MessageSet messageSet1;
457-
messageSet1.add(PartRef{std::move(header1), std::move(payload1)});
463+
messageSet1.messages.emplace_back(std::move(header1));
464+
messageSet1.messages.emplace_back(std::move(payload1));
458465
REQUIRE((messageSet1.messages | count_parts{}) == 1);
459466

460467
auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
461468
MessageSet messageSet2;
462-
messageSet2.add(PartRef{std::move(header2), std::move(payload2)});
469+
messageSet2.messages.emplace_back(std::move(header2));
470+
messageSet2.messages.emplace_back(std::move(payload2));
463471
REQUIRE((messageSet2.messages | count_parts{}) == 1);
464472
currentSetOfInputs.emplace_back(std::move(messageSet1));
465473
currentSetOfInputs.emplace_back(std::move(messageSet2));
@@ -524,7 +532,8 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
524532
fair::mq::MessagePtr payload(transport->CreateMessage());
525533
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
526534
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
527-
messageSet.add(PartRef{std::move(header), std::move(payload)});
535+
messageSet.messages.emplace_back(std::move(header));
536+
messageSet.messages.emplace_back(std::move(payload));
528537
REQUIRE((messageSet.messages | count_parts{}) == 1);
529538
currentSetOfInputs.emplace_back(std::move(messageSet));
530539

@@ -602,10 +611,13 @@ TEST_CASE("ForwardInputsSplitPayload")
602611
auto fillMessages = [&messages](size_t t) -> fair::mq::MessagePtr {
603612
return std::move(messages[t]);
604613
};
605-
messageSet.add(fillMessages, 3);
614+
for (size_t i = 0; i < 3; ++i) {
615+
messageSet.messages.emplace_back(fillMessages(i));
616+
}
606617
auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
607618
PartRef part{std::move(header2), transport->CreateMessage()};
608-
messageSet.add(std::move(part));
619+
messageSet.messages.emplace_back(std::move(part.header));
620+
messageSet.messages.emplace_back(std::move(part.payload));
609621

610622
REQUIRE((messageSet.messages | count_parts{}) == 2);
611623
currentSetOfInputs.emplace_back(std::move(messageSet));
@@ -726,7 +738,8 @@ TEST_CASE("ForwardInputEOSSingleRoute")
726738
fair::mq::MessagePtr payload(transport->CreateMessage());
727739
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
728740
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
729-
messageSet.add(PartRef{std::move(header), std::move(payload)});
741+
messageSet.messages.emplace_back(std::move(header));
742+
messageSet.messages.emplace_back(std::move(payload));
730743
REQUIRE((messageSet.messages | count_parts{}) == 1);
731744
currentSetOfInputs.emplace_back(std::move(messageSet));
732745

@@ -771,7 +784,8 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute")
771784
fair::mq::MessagePtr payload(transport->CreateMessage());
772785
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
773786
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
774-
messageSet.add(PartRef{std::move(header), std::move(payload)});
787+
messageSet.messages.emplace_back(std::move(header));
788+
messageSet.messages.emplace_back(std::move(payload));
775789
REQUIRE((messageSet.messages | count_parts{}) == 1);
776790
currentSetOfInputs.emplace_back(std::move(messageSet));
777791

Framework/Core/test/test_MessageSet.cxx

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ TEST_CASE("MessageSet")
3636
std::vector<fair::mq::MessagePtr> ptrs;
3737
ptrs.emplace_back(std::move(header));
3838
ptrs.emplace_back(std::move(msg2));
39-
msgSet.add([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 2);
39+
for (size_t i = 0; i < 2; ++i) {
40+
msgSet.messages.emplace_back(std::move(ptrs[i]));
41+
}
4042

4143
REQUIRE(msgSet.messages.size() == 2);
4244
REQUIRE((msgSet.messages | count_payloads{}) == 1);
@@ -126,7 +128,8 @@ TEST_CASE("MessageSetAddPartRef")
126128
ptrs.emplace_back(std::move(msg2));
127129
PartRef ref{std::move(msg), std::move(msg2)};
128130
o2::framework::MessageSet msgSet;
129-
msgSet.add(std::move(ref));
131+
msgSet.messages.emplace_back(std::move(ref.header));
132+
msgSet.messages.emplace_back(std::move(ref.payload));
130133

131134
REQUIRE(msgSet.messages.size() == 2);
132135
}
@@ -155,17 +158,18 @@ TEST_CASE("MessageSetAddMultiple")
155158
std::unique_ptr<fair::mq::Message> msg3(nullptr);
156159
PartRef ref{std::move(header1), std::move(msg2)};
157160
o2::framework::MessageSet msgSet;
158-
msgSet.add(std::move(ref));
161+
msgSet.messages.emplace_back(std::move(ref.header));
162+
msgSet.messages.emplace_back(std::move(ref.payload));
159163
PartRef ref2{std::move(header2), std::move(msg2)};
160-
msgSet.add(std::move(ref2));
164+
msgSet.messages.emplace_back(std::move(ref2.header));
165+
msgSet.messages.emplace_back(std::move(ref2.payload));
161166
std::vector<fair::mq::MessagePtr> msgs;
162167
msgs.push_back(std::move(header3));
163168
msgs.push_back(std::unique_ptr<fair::mq::Message>(nullptr));
164169
msgs.push_back(std::unique_ptr<fair::mq::Message>(nullptr));
165-
msgSet.add([&msgs](size_t i) {
166-
return std::move(msgs[i]);
167-
},
168-
3);
170+
for (size_t i = 0; i < 3; ++i) {
171+
msgSet.messages.emplace_back(std::move(msgs[i]));
172+
}
169173

170174
REQUIRE(msgSet.messages.size() == 7);
171175

0 commit comments

Comments
 (0)