Skip to content

Commit 54a490d

Browse files
committed
DPL: move away from MessageSet::header / payload
Abstract header / payload retrieval, with the idea that get_header / get_payload will work on any range of fair::mq::MessagePtrs.
1 parent 056b5f4 commit 54a490d

File tree

5 files changed

+21
-45
lines changed

5 files changed

+21
-45
lines changed

Framework/Core/include/Framework/DataModelViews.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ struct get_header {
153153
// ends the pipeline, returns the number of parts
154154
template <typename R>
155155
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
156-
friend fair::mq::MessagePtr& operator|(R&& r, get_header self)
156+
friend auto& operator|(R&& r, get_header self)
157157
{
158158
return r[(r | get_dataref_indices{self.id, 0}).headerIdx];
159159
}
@@ -165,7 +165,7 @@ struct get_payload {
165165
// ends the pipeline, returns the number of parts
166166
template <typename R>
167167
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
168-
friend fair::mq::MessagePtr& operator|(R&& r, get_payload self)
168+
friend auto& operator|(R&& r, get_payload self)
169169
{
170170
return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx];
171171
}

Framework/Core/include/Framework/MessageSet.h

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -136,30 +136,6 @@ struct MessageSet {
136136
}
137137
}
138138

139-
fair::mq::MessagePtr& header(size_t partIndex)
140-
{
141-
return messages[messageMap[partIndex].position];
142-
}
143-
144-
fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
145-
{
146-
assert(partIndex < messageMap.size());
147-
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
148-
return messages[messageMap[partIndex].position + payloadIndex + 1];
149-
}
150-
151-
fair::mq::MessagePtr const& header(size_t partIndex) const
152-
{
153-
return messages[messageMap[partIndex].position];
154-
}
155-
156-
fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
157-
{
158-
assert(partIndex < messageMap.size());
159-
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
160-
return messages[messageMap[partIndex].position + payloadIndex + 1];
161-
}
162-
163139
fair::mq::MessagePtr const& associatedHeader(size_t pos) const
164140
{
165141
return messages[messageMap[pairMap[pos].partIndex].position];

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2153,7 +2153,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21532153
return currentSetOfInputs[i].getNumberOfPairs();
21542154
};
21552155
auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2156-
auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2156+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx].messages | get_header{0}));
21572157
return header.GetRefCount();
21582158
};
21592159
return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};

Framework/Core/src/DataRelayer.cxx

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
184184
// We check that no data is already there for the given cell
185185
// it is enough to check the first element
186186
auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
187-
if (!part.messages.empty() && part.header(0) != nullptr) {
187+
if (!part.messages.empty() && (part.messages | get_header{0}) != nullptr) {
188188
headerPresent++;
189189
continue;
190190
}
191-
if (!part.messages.empty() && part.payload(0) != nullptr) {
191+
if (!part.messages.empty() && (part.messages | get_payload{0, 0}) != nullptr) {
192192
payloadPresent++;
193193
continue;
194194
}
@@ -213,9 +213,9 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
213213
auto partial = getPartialRecord(ti);
214214
// TODO: get the data ref from message model
215215
auto getter = [&partial](size_t idx, size_t part) {
216-
if (!partial[idx].messages.empty() && partial[idx].header(part).get()) {
217-
auto header = partial[idx].header(part).get();
218-
auto payload = partial[idx].payload(part).get();
216+
if (!partial[idx].messages.empty() && (partial[idx].messages | get_header{part}).get()) {
217+
auto header = (partial[idx].messages | get_header{part}).get();
218+
auto payload = (partial[idx].messages | get_payload{part, 0}).get();
219219
return DataRef{nullptr,
220220
reinterpret_cast<const char*>(header->GetData()),
221221
reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
@@ -227,7 +227,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
227227
return partial[idx].messages | count_parts{};
228228
};
229229
auto refCountGetter = [&partial](size_t idx) -> int {
230-
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
230+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(partial[idx].messages | get_header{0}));
231231
return header.GetRefCount();
232232
};
233233
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
@@ -246,8 +246,8 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
246246
activity.expiredSlots++;
247247

248248
mTimesliceIndex.markAsDirty(slot, true);
249-
assert(part.header(0) != nullptr);
250-
assert(part.payload(0) != nullptr);
249+
assert((part.messages | get_header{0}) != nullptr);
250+
assert((part.messages | get_payload{0, 0}) != nullptr);
251251
}
252252
}
253253
LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
@@ -786,9 +786,9 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
786786
auto partial = getPartialRecord(li);
787787
// TODO: get the data ref from message model
788788
auto getter = [&partial](size_t idx, size_t part) {
789-
if (!partial[idx].messages.empty() && partial[idx].header(part).get()) {
790-
auto header = partial[idx].header(part).get();
791-
auto payload = partial[idx].payload(part).get();
789+
if (!partial[idx].messages.empty() && (partial[idx].messages | get_header{part}).get()) {
790+
auto header = (partial[idx].messages | get_header{part}).get();
791+
auto payload = (partial[idx].messages | get_payload{part, 0}).get();
792792
return DataRef{nullptr,
793793
reinterpret_cast<const char*>(header->GetData()),
794794
reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
@@ -800,7 +800,7 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
800800
return partial[idx].messages | count_parts{};
801801
};
802802
auto refCountGetter = [&partial](size_t idx) -> int {
803-
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
803+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(partial[idx].messages | get_header{0}));
804804
return header.GetRefCount();
805805
};
806806
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
@@ -952,10 +952,10 @@ std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTime
952952
// TODO: in the original implementation of the cache, there have been only two messages per entry,
953953
// check if the 2 above corresponds to the number of messages.
954954
for (size_t pi = 0; pi < (cache[cacheId].messages | count_parts{}); pi++) {
955-
auto& header = cache[cacheId].header(pi);
955+
auto& header = cache[cacheId].messages | get_header{pi};
956956
auto&& newHeader = header->GetTransport()->CreateMessage();
957957
newHeader->Copy(*header);
958-
messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
958+
messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].messages | get_payload{pi, 0})});
959959
}
960960
};
961961

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -798,11 +798,11 @@ TEST_CASE("DataRelayer")
798798
// one message set containing number of added sequences of messages
799799
REQUIRE((messageSet[0].messages | count_parts{}) == sequenceSize.size());
800800
size_t counter = 0;
801-
for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
801+
for (size_t seqid = 0; seqid < sequenceSize.size(); ++seqid) {
802802
REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);
803-
for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
804-
REQUIRE(messageSet[0].payload(seqid, pi));
805-
auto const* data = messageSet[0].payload(seqid, pi)->GetData();
803+
for (size_t pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
804+
REQUIRE((messageSet[0].messages | get_payload{seqid, pi}));
805+
auto const* data = (messageSet[0].messages | get_payload{seqid, pi})->GetData();
806806
REQUIRE(*(reinterpret_cast<size_t const*>(data)) == counter);
807807
++counter;
808808
}

0 commit comments

Comments
 (0)