1- // Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+ // Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33// All rights not expressly granted are reserved.
44//
88// In applying this license CERN does not waive the privileges and immunities
99// granted to it by virtue of its status as an Intergovernmental Organization
1010// or submit itself to any jurisdiction.
11- #ifndef FRAMEWORK_MESSAGESET_H
12- #define FRAMEWORK_MESSAGESET_H
11+ #ifndef O2_FRAMEWORK_MESSAGESET_H_
12+ #define O2_FRAMEWORK_MESSAGESET_H_
1313
1414#include " Framework/PartRef.h"
15+ #include < Message.h>
16+ #include < fairmq/FwdDecls.h>
1517#include < memory>
1618#include < vector>
1719#include < cassert>
20+ #include < concepts>
1821
19- namespace o2
20- {
21- namespace framework
22+ namespace o2 ::framework
2223{
2324
25+ template <typename T>
26+ concept MessageSetFiller = requires (T t, size_t n) {
27+ { t (n) } -> std::same_as<fair::mq::MessagePtr>;
28+ };
29+
30+ template <typename T>
31+ concept MessageSetCounter = requires (fair::mq::MessagePtr && (*t)(fair::mq::MessagePtr&&), fair::mq::MessagePtr&& ref) {
32+ { t (std::forward<fair::mq::MessagePtr>(ref)) } -> std::same_as<fair::mq::MessagePtr&&>;
33+ };
34+
35+ template <typename T>
36+ concept MessageSetDisposer = requires (void (*t)(fair::mq::MessagePtr&&), fair::mq::MessagePtr&& ref) {
37+ { t (std::forward<fair::mq::MessagePtr>(ref)) } -> std::same_as<void >;
38+ };
39+
40+ // Sometimes we fill from a PartRef, e.g. (header, payload pair)
41+ // So we need some special code for it.
42+ template <typename T>
43+ concept PartRefFiller = requires (T t, size_t n) {
44+ { t (n) } -> std::same_as<PartRef>;
45+ };
46+
47+ template <typename T>
48+ concept PartRefCounter = requires (PartRef && (*t)(PartRef&&), PartRef&& ref) {
49+ { t (std::forward<PartRef>(ref)) } -> std::same_as<PartRef&&>;
50+ };
51+
2452// / A set of inflight messages.
2553// / The messages are stored in a linear vector. Originally, an O2 message was
2654// / comprised of a header-payload pair which makes indexing of pairs in the
@@ -31,20 +59,30 @@ namespace framework
3159// / O2 message model. For this purpose, also the pair index is filled and can
3260// / be used to access header and payload associated with a pair
3361struct MessageSet {
62+ static auto passthrough (fair::mq::MessagePtr&& ref) -> fair::mq::MessagePtr&& { return std::forward<fair::mq::MessagePtr>(ref); }
63+ static auto passthrough_partref (o2::framework::PartRef&& ref) -> o2::framework::PartRef&& { return std::forward<o2::framework::PartRef>(ref); }
64+ // Use this when you want to delete the messages on clear.
65+ static auto destroy_message (fair::mq::MessagePtr&& ref) -> void
66+ {
67+ fair::mq::MessagePtr toDelete (nullptr );
68+ ref.swap (toDelete);
69+ }
70+ static auto noop (fair::mq::MessagePtr&& ref) -> void {}
71+ static auto assert_empty (fair::mq::MessagePtr&& ref) -> void { assert (ref.get () == nullptr ); }
72+ static auto enforce_empty (fair::mq::MessagePtr&& ref) -> void;
73+
3474 struct Index {
35- Index (size_t p, size_t s) : position(p), size(s) {}
3675 size_t position = 0 ;
3776 size_t size = 0 ;
3877 };
3978 // linear storage of messages
40- std::vector<fair::mq::MessagePtr > messages;
79+ std::vector<std::unique_ptr< fair::mq::Message> > messages;
4180 // message map describes O2 messages consisting of a header message and
4281 // payload message(s), index describes position in the linear storage
4382 std::vector<Index> messageMap;
4483 // pair map describes all messages in one sequence of header-payload pairs and
4584 // where in the message index the associated header and payload can be found
4685 struct PairMapping {
47- PairMapping (size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {}
4886 // O2 message where the pair is located in
4987 size_t partIndex = 0 ;
5088 // payload index within the O2 message
@@ -57,17 +95,44 @@ struct MessageSet {
5795 {
5896 }
5997
60- template <typename F>
61- MessageSet (F getter, size_t size)
98+ // Allow creating a message set via a getter.
99+ // The counting function will be invoked only
100+ // once per message. If you want to augment a
101+ // MessageSet use the merge method.
102+ MessageSet (MessageSetFiller auto getter, size_t size, MessageSetCounter auto counter)
103+ : messages(), messageMap(), pairMap()
104+ {
105+ messages.reserve (size);
106+ pairMap.reserve (size - 1 );
107+ messageMap.emplace_back (Index{.position = 0 , .size = size - 1 });
108+ for (size_t i = 0 ; i < size; ++i) {
109+ if (i > 0 ) {
110+ pairMap.emplace_back (0 , i - 1 );
111+ }
112+ messages.emplace_back (std::move (counter (getter (i))));
113+ }
114+ }
115+
116+ MessageSet (PartRefFiller auto filler, size_t nPartRef, PartRefCounter auto counter)
62117 : messages(), messageMap(), pairMap()
63118 {
64- add (std::forward<F>(getter), size);
119+ messages.reserve (2 * nPartRef); // Beacause messages contains all the messages
120+ messageMap.reserve (nPartRef); // Because the message map tracks how many (header, payload0, payload1 ...) there are
121+ pairMap.reserve (2 * nPartRef - 1 ); // Because pairMap tracks (header, payload0), (header, payload1), etc.
122+
123+ for (size_t i = 0 ; i < nPartRef; ++i) {
124+ pairMap.emplace_back (PairMapping{.partIndex = messageMap.size (), .payloadIndex = 0 }); // Because this is the first one
125+ messageMap.emplace_back (Index{.position = i, .size = 1 }); // Because a PartRef only has 2 messages (and 1 payload)
126+ o2::framework::PartRef ref = counter (filler (i));
127+ messages.emplace_back (std::move (ref.header ));
128+ messages.emplace_back (std::move (ref.payload ));
129+ }
65130 }
66131
67132 MessageSet (MessageSet&& other)
68133 : messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap))
69134 {
70- other.clear ();
135+ other.clear (MessageSet::noop );
71136 }
72137
73138 MessageSet& operator =(MessageSet&& other)
@@ -78,107 +143,98 @@ struct MessageSet {
78143 messages = std::move (other.messages );
79144 messageMap = std::move (other.messageMap );
80145 pairMap = std::move (other.pairMap );
81- other.clear ();
146+ other.clear (MessageSet::noop );
82147 return *this ;
83148 }
84149
150+ ~MessageSet ();
151+
85152 // / get number of in-flight O2 messages
86- size_t size () const
153+ [[nodiscard]] size_t size () const
87154 {
88155 return messageMap.size ();
89156 }
90157
91158 // / get number of header-payload pairs
92- size_t getNumberOfPairs () const
159+ [[nodiscard]] size_t getNumberOfPairs () const
93160 {
94161 return pairMap.size ();
95162 }
96163
97164 // / get number of payloads for an in-flight message
98- size_t getNumberOfPayloads (size_t mi) const
165+ [[nodiscard]] size_t getNumberOfPayloads (size_t mi) const
99166 {
100167 return messageMap[mi].size ;
101168 }
102169
103170 // / clear the set
104- void clear ()
171+ void clear (MessageSetDisposer auto dispose )
105172 {
173+ for (auto & message : messages) {
174+ dispose (std::move (message));
175+ }
106176 messages.clear ();
107177 messageMap.clear ();
108178 pairMap.clear ();
109179 }
110180
111- // this is more or less legacy
112- // PartRef has been earlier used to store fixed header-payload pairs
113- // reset the set and store content of the part ref
114- void reset (PartRef&& ref)
115- {
116- clear ();
117- add (std::move (ref));
118- }
119-
120- // this is more or less legacy
121- // PartRef has been earlier used to store fixed header-payload pairs
122- // add content of the part ref
123- void add (PartRef&& ref)
124- {
125- pairMap.emplace_back (messageMap.size (), 0 );
126- messageMap.emplace_back (messages.size (), 1 );
127- messages.emplace_back (std::move (ref.header ));
128- messages.emplace_back (std::move (ref.payload ));
129- }
130-
131- // / add an O2 message
132- template <typename F>
133- void add (F getter, size_t size)
181+ // / Add messages in bulk. We are guaranteed that this
182+ // / function is executed only once for each incoming message
183+ // / so it can be used to trigger the early forwarding.
184+ // /
185+ void merge (MessageSet&& other)
134186 {
135187 auto partid = messageMap.size ();
136- messageMap.emplace_back (messages.size (), size - 1 );
137- for (size_t i = 0 ; i < size; ++i) {
188+ messageMap.emplace_back (messages.size (), other. messages . size () - 1 );
189+ for (size_t i = 0 ; i < other. messages . size () ; ++i) {
138190 if (i > 0 ) {
139191 pairMap.emplace_back (partid, i - 1 );
140192 }
141- messages.emplace_back (std::move (getter (i) ));
193+ messages.emplace_back (std::move (other. messages [i] ));
142194 }
195+ // Every message should be removed once the MessageSet is
196+ // merged.
197+ other.clear (MessageSet::assert_empty);
143198 }
144199
145- fair::mq::MessagePtr& header (size_t partIndex)
200+ // This should really be used to give ownership to something else.
201+ [[nodiscard]] std::unique_ptr<fair::mq::Message> extractHeader (size_t partIndex)
146202 {
147- return messages[messageMap[partIndex].position ];
203+ return std::move ( messages[messageMap[partIndex].position ]) ;
148204 }
149205
150- fair::mq::MessagePtr& payload (size_t partIndex, size_t payloadIndex = 0 )
206+ [[nodiscard]] std::unique_ptr< fair::mq::Message> extractPayload (size_t partIndex, size_t payloadIndex = 0 )
151207 {
152208 assert (partIndex < messageMap.size ());
153209 assert (messageMap[partIndex].position + payloadIndex + 1 < messages.size ());
154- return messages[messageMap[partIndex].position + payloadIndex + 1 ];
210+ return std::move ( messages[messageMap[partIndex].position + payloadIndex + 1 ]) ;
155211 }
156212
157- fair::mq::MessagePtr const & header (size_t partIndex) const
213+ [[nodiscard]] std::unique_ptr< fair::mq::Message> const & header (size_t partIndex) const
158214 {
159215 return messages[messageMap[partIndex].position ];
160216 }
161217
162- fair::mq::MessagePtr const & payload (size_t partIndex, size_t payloadIndex = 0 ) const
218+ [[nodiscard]] std::unique_ptr< fair::mq::Message> const & payload (size_t partIndex, size_t payloadIndex = 0 ) const
163219 {
164220 assert (partIndex < messageMap.size ());
165221 assert (messageMap[partIndex].position + payloadIndex + 1 < messages.size ());
166222 return messages[messageMap[partIndex].position + payloadIndex + 1 ];
167223 }
168224
169- fair::mq::MessagePtr const & associatedHeader (size_t pos) const
225+ [[nodiscard]] std::unique_ptr< fair::mq::Message> const & associatedHeader (size_t pos) const
170226 {
171227 return messages[messageMap[pairMap[pos].partIndex ].position ];
172228 }
173229
174- fair::mq::MessagePtr const & associatedPayload (size_t pos) const
230+ [[nodiscard]] std::unique_ptr< fair::mq::Message> const & associatedPayload (size_t pos) const
175231 {
176232 auto partIndex = pairMap[pos].partIndex ;
177233 auto payloadIndex = pairMap[pos].payloadIndex ;
178234 return messages[messageMap[partIndex].position + payloadIndex + 1 ];
179235 }
180236};
181237
182- } // namespace framework
183- } // namespace o2
184- #endif // FRAMEWORK_MESSAGESET_H
238+ } // namespace o2:: framework
239+
240+ #endif // O2_FRAMEWORK_MESSAGESET_H_
0 commit comments