Skip to content

Commit 75f3f9e

Browse files
committed
DPL: Ensure lifecycle of MessageSet
By exploiting the fact that messages go through the "add" method only once, we make sure that they get forwarded as soon as possible.
1 parent 72b9f51 commit 75f3f9e

File tree

11 files changed

+405
-144
lines changed

11 files changed

+405
-144
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ o2_add_library(Framework
105105
src/StringContext.cxx
106106
src/LogParsingHelpers.cxx
107107
src/MessageContext.cxx
108+
src/MessageSet.cxx
108109
src/Metric2DViewIndex.cxx
109110
src/SimpleOptionsRetriever.cxx
110111
src/O2ControlHelpers.cxx

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ class DataRelayer
104104
TimesliceIndex&,
105105
ServiceRegistryRef);
106106

107+
~DataRelayer();
108+
107109
/// This invokes the appropriate `InputRoute::danglingChecker` on every
108110
/// entry in the cache and if it returns true, it creates a new
109111
/// cache entry by invoking the associated `InputRoute::expirationHandler`.
Lines changed: 111 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
// All rights not expressly granted are reserved.
44
//
@@ -8,19 +8,47 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#ifndef FRAMEWORK_MESSAGESET_H
12-
#define FRAMEWORK_MESSAGESET_H
11+
#ifndef O2_FRAMEWORK_MESSAGESET_H_
12+
#define O2_FRAMEWORK_MESSAGESET_H_
1313

1414
#include "Framework/PartRef.h"
15+
#include <Message.h>
16+
#include <fairmq/FwdDecls.h>
1517
#include <memory>
1618
#include <vector>
1719
#include <cassert>
20+
#include <concepts>
1821

19-
namespace o2
20-
{
21-
namespace framework
22+
namespace o2::framework
2223
{
2324

25+
template <typename T>
26+
concept MessageSetFiller = requires(T t, size_t n) {
27+
{ t(n) } -> std::same_as<fair::mq::MessagePtr>;
28+
};
29+
30+
template <typename T>
31+
concept MessageSetCounter = requires(fair::mq::MessagePtr && (*t)(fair::mq::MessagePtr&&), fair::mq::MessagePtr&& ref) {
32+
{ t(std::forward<fair::mq::MessagePtr>(ref)) } -> std::same_as<fair::mq::MessagePtr&&>;
33+
};
34+
35+
template <typename T>
36+
concept MessageSetDisposer = requires(void (*t)(fair::mq::MessagePtr&&), fair::mq::MessagePtr&& ref) {
37+
{ t(std::forward<fair::mq::MessagePtr>(ref)) } -> std::same_as<void>;
38+
};
39+
40+
// Sometimes we fill from a PartRef, e.g. (header, payload pair)
41+
// So we need some special code for it.
42+
template <typename T>
43+
concept PartRefFiller = requires(T t, size_t n) {
44+
{ t(n) } -> std::same_as<PartRef>;
45+
};
46+
47+
template <typename T>
48+
concept PartRefCounter = requires(PartRef && (*t)(PartRef&&), PartRef&& ref) {
49+
{ t(std::forward<PartRef>(ref)) } -> std::same_as<PartRef&&>;
50+
};
51+
2452
/// A set of inflight messages.
2553
/// The messages are stored in a linear vector. Originally, an O2 message was
2654
/// comprised of a header-payload pair which makes indexing of pairs in the
@@ -31,20 +59,30 @@ namespace framework
3159
/// O2 message model. For this purpose, also the pair index is filled and can
3260
/// be used to access header and payload associated with a pair
3361
struct MessageSet {
62+
static auto passthrough(fair::mq::MessagePtr&& ref) -> fair::mq::MessagePtr&& { return std::forward<fair::mq::MessagePtr>(ref); }
63+
static auto passthrough_partref(o2::framework::PartRef&& ref) -> o2::framework::PartRef&& { return std::forward<o2::framework::PartRef>(ref); }
64+
// Use this when you want to delete the messages on clear.
65+
static auto destroy_message(fair::mq::MessagePtr&& ref) -> void
66+
{
67+
fair::mq::MessagePtr toDelete(nullptr);
68+
ref.swap(toDelete);
69+
}
70+
static auto noop(fair::mq::MessagePtr&& ref) -> void {}
71+
static auto assert_empty(fair::mq::MessagePtr&& ref) -> void { assert(ref.get() == nullptr); }
72+
static auto enforce_empty(fair::mq::MessagePtr&& ref) -> void;
73+
3474
struct Index {
35-
Index(size_t p, size_t s) : position(p), size(s) {}
3675
size_t position = 0;
3776
size_t size = 0;
3877
};
3978
// linear storage of messages
40-
std::vector<fair::mq::MessagePtr> messages;
79+
std::vector<std::unique_ptr<fair::mq::Message>> messages;
4180
// message map describes O2 messages consisting of a header message and
4281
// payload message(s), index describes position in the linear storage
4382
std::vector<Index> messageMap;
4483
// pair map describes all messages in one sequence of header-payload pairs and
4584
// where in the message index the associated header and payload can be found
4685
struct PairMapping {
47-
PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {}
4886
// O2 message where the pair is located in
4987
size_t partIndex = 0;
5088
// payload index within the O2 message
@@ -57,17 +95,44 @@ struct MessageSet {
5795
{
5896
}
5997

60-
template <typename F>
61-
MessageSet(F getter, size_t size)
98+
// Allow creating a message set via a getter.
99+
// The counting function will be invoked only
100+
// once per message. If you want to augment a
101+
// MessageSet use the merge method.
102+
MessageSet(MessageSetFiller auto getter, size_t size, MessageSetCounter auto counter)
103+
: messages(), messageMap(), pairMap()
104+
{
105+
messages.reserve(size);
106+
pairMap.reserve(size - 1);
107+
messageMap.emplace_back(Index{.position = 0, .size = size - 1});
108+
for (size_t i = 0; i < size; ++i) {
109+
if (i > 0) {
110+
pairMap.emplace_back(0, i - 1);
111+
}
112+
messages.emplace_back(std::move(counter(getter(i))));
113+
}
114+
}
115+
116+
MessageSet(PartRefFiller auto filler, size_t nPartRef, PartRefCounter auto counter)
62117
: messages(), messageMap(), pairMap()
63118
{
64-
add(std::forward<F>(getter), size);
119+
messages.reserve(2 * nPartRef); // Beacause messages contains all the messages
120+
messageMap.reserve(nPartRef); // Because the message map tracks how many (header, payload0, payload1 ...) there are
121+
pairMap.reserve(2 * nPartRef - 1); // Because pairMap tracks (header, payload0), (header, payload1), etc.
122+
123+
for (size_t i = 0; i < nPartRef; ++i) {
124+
pairMap.emplace_back(PairMapping{.partIndex = messageMap.size(), .payloadIndex = 0}); // Because this is the first one
125+
messageMap.emplace_back(Index{.position = i, .size = 1}); // Because a PartRef only has 2 messages (and 1 payload)
126+
o2::framework::PartRef ref = counter(filler(i));
127+
messages.emplace_back(std::move(ref.header));
128+
messages.emplace_back(std::move(ref.payload));
129+
}
65130
}
66131

67132
MessageSet(MessageSet&& other)
68133
: messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap))
69134
{
70-
other.clear();
135+
other.clear(MessageSet::noop);
71136
}
72137

73138
MessageSet& operator=(MessageSet&& other)
@@ -78,107 +143,98 @@ struct MessageSet {
78143
messages = std::move(other.messages);
79144
messageMap = std::move(other.messageMap);
80145
pairMap = std::move(other.pairMap);
81-
other.clear();
146+
other.clear(MessageSet::noop);
82147
return *this;
83148
}
84149

150+
~MessageSet();
151+
85152
/// get number of in-flight O2 messages
86-
size_t size() const
153+
[[nodiscard]] size_t size() const
87154
{
88155
return messageMap.size();
89156
}
90157

91158
/// get number of header-payload pairs
92-
size_t getNumberOfPairs() const
159+
[[nodiscard]] size_t getNumberOfPairs() const
93160
{
94161
return pairMap.size();
95162
}
96163

97164
/// get number of payloads for an in-flight message
98-
size_t getNumberOfPayloads(size_t mi) const
165+
[[nodiscard]] size_t getNumberOfPayloads(size_t mi) const
99166
{
100167
return messageMap[mi].size;
101168
}
102169

103170
/// clear the set
104-
void clear()
171+
void clear(MessageSetDisposer auto dispose)
105172
{
173+
for (auto& message : messages) {
174+
dispose(std::move(message));
175+
}
106176
messages.clear();
107177
messageMap.clear();
108178
pairMap.clear();
109179
}
110180

111-
// this is more or less legacy
112-
// PartRef has been earlier used to store fixed header-payload pairs
113-
// reset the set and store content of the part ref
114-
void reset(PartRef&& ref)
115-
{
116-
clear();
117-
add(std::move(ref));
118-
}
119-
120-
// this is more or less legacy
121-
// PartRef has been earlier used to store fixed header-payload pairs
122-
// add content of the part ref
123-
void add(PartRef&& ref)
124-
{
125-
pairMap.emplace_back(messageMap.size(), 0);
126-
messageMap.emplace_back(messages.size(), 1);
127-
messages.emplace_back(std::move(ref.header));
128-
messages.emplace_back(std::move(ref.payload));
129-
}
130-
131-
/// add an O2 message
132-
template <typename F>
133-
void add(F getter, size_t size)
181+
/// Add messages in bulk. We are guaranteed that this
182+
/// function is executed only once for each incoming message
183+
/// so it can be used to trigger the early forwarding.
184+
///
185+
void merge(MessageSet&& other)
134186
{
135187
auto partid = messageMap.size();
136-
messageMap.emplace_back(messages.size(), size - 1);
137-
for (size_t i = 0; i < size; ++i) {
188+
messageMap.emplace_back(messages.size(), other.messages.size() - 1);
189+
for (size_t i = 0; i < other.messages.size(); ++i) {
138190
if (i > 0) {
139191
pairMap.emplace_back(partid, i - 1);
140192
}
141-
messages.emplace_back(std::move(getter(i)));
193+
messages.emplace_back(std::move(other.messages[i]));
142194
}
195+
// Every message should be removed once the MessageSet is
196+
// merged.
197+
other.clear(MessageSet::assert_empty);
143198
}
144199

145-
fair::mq::MessagePtr& header(size_t partIndex)
200+
// This should really be used to give ownership to something else.
201+
[[nodiscard]] std::unique_ptr<fair::mq::Message> extractHeader(size_t partIndex)
146202
{
147-
return messages[messageMap[partIndex].position];
203+
return std::move(messages[messageMap[partIndex].position]);
148204
}
149205

150-
fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
206+
[[nodiscard]] std::unique_ptr<fair::mq::Message> extractPayload(size_t partIndex, size_t payloadIndex = 0)
151207
{
152208
assert(partIndex < messageMap.size());
153209
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
154-
return messages[messageMap[partIndex].position + payloadIndex + 1];
210+
return std::move(messages[messageMap[partIndex].position + payloadIndex + 1]);
155211
}
156212

157-
fair::mq::MessagePtr const& header(size_t partIndex) const
213+
[[nodiscard]] std::unique_ptr<fair::mq::Message> const& header(size_t partIndex) const
158214
{
159215
return messages[messageMap[partIndex].position];
160216
}
161217

162-
fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
218+
[[nodiscard]] std::unique_ptr<fair::mq::Message> const& payload(size_t partIndex, size_t payloadIndex = 0) const
163219
{
164220
assert(partIndex < messageMap.size());
165221
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
166222
return messages[messageMap[partIndex].position + payloadIndex + 1];
167223
}
168224

169-
fair::mq::MessagePtr const& associatedHeader(size_t pos) const
225+
[[nodiscard]] std::unique_ptr<fair::mq::Message> const& associatedHeader(size_t pos) const
170226
{
171227
return messages[messageMap[pairMap[pos].partIndex].position];
172228
}
173229

174-
fair::mq::MessagePtr const& associatedPayload(size_t pos) const
230+
[[nodiscard]] std::unique_ptr<fair::mq::Message> const& associatedPayload(size_t pos) const
175231
{
176232
auto partIndex = pairMap[pos].partIndex;
177233
auto payloadIndex = pairMap[pos].payloadIndex;
178234
return messages[messageMap[partIndex].position + payloadIndex + 1];
179235
}
180236
};
181237

182-
} // namespace framework
183-
} // namespace o2
184-
#endif // FRAMEWORK_MESSAGESET_H
238+
} // namespace o2::framework
239+
240+
#endif // O2_FRAMEWORK_MESSAGESET_H_

Framework/Core/include/Framework/PartRef.h

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#ifndef FRAMEWORK_PARTREF_H
12-
#define FRAMEWORK_PARTREF_H
11+
#ifndef O2_FRAMEWORK_PARTREF_H_
12+
#define O2_FRAMEWORK_PARTREF_H_
1313

1414
#include <memory>
15-
1615
#include <fairmq/FwdDecls.h>
1716

18-
namespace o2
19-
{
20-
namespace framework
17+
namespace o2::framework
2118
{
2219

2320
/// Reference to an inflight part.
@@ -26,6 +23,6 @@ struct PartRef {
2623
std::unique_ptr<fair::mq::Message> payload;
2724
};
2825

29-
} // namespace framework
30-
} // namespace o2
26+
} // namespace o2::framework
27+
3128
#endif // FRAMEWORK_PARTREF_H

0 commit comments

Comments
 (0)