Skip to content

Commit c85788d

Browse files
committed
DPL: migrate away from MessageSet
Use a vector of messages instead. To be replaced by a B-Tree which is able to keep track of all inputs / slots in a less rigid manner.
1 parent 65b055f commit c85788d

File tree

11 files changed

+296
-411
lines changed

11 files changed

+296
-411
lines changed

Framework/Core/include/Framework/DataModelViews.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
#include "DomainInfoHeader.h"
1717
#include "SourceInfoHeader.h"
1818
#include "Headers/DataHeader.h"
19+
#include "Framework/TimesliceSlot.h"
1920
#include <ranges>
21+
#include <span>
2022

2123
namespace o2::framework
2224
{
@@ -213,13 +215,11 @@ struct get_num_payloads {
213215
}
214216
};
215217

216-
struct MessageSet;
217-
218218
struct inputs_for_slot {
219219
TimesliceSlot slot;
220220
template <typename R>
221221
requires requires(R r) { requires std::ranges::random_access_range<decltype(r.sets)>; }
222-
friend std::span<o2::framework::MessageSet> operator|(R&& r, inputs_for_slot self)
222+
friend auto operator|(R&& r, inputs_for_slot self)
223223
{
224224
return std::span(r.sets[self.slot.index * r.inputsPerSlot]);
225225
}
@@ -231,7 +231,7 @@ struct messages_for_input {
231231
requires std::ranges::random_access_range<R>
232232
friend std::span<fair::mq::MessagePtr> operator|(R&& r, messages_for_input self)
233233
{
234-
return r[self.inputIdx].messages;
234+
return std::span(r[self.inputIdx]);
235235
}
236236
};
237237

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "Framework/TimesliceSlot.h"
1616
#include "Framework/TimesliceIndex.h"
1717
#include <fairmq/FwdDecls.h>
18+
#include <fairmq/Message.h>
1819
#include <vector>
1920
#include <span>
2021

@@ -29,7 +30,6 @@ struct OutputChannelState;
2930
struct ProcessingPolicies;
3031
struct DeviceSpec;
3132
struct FairMQDeviceProxy;
32-
struct MessageSet;
3333
struct ChannelIndex;
3434
enum struct StreamingState;
3535
enum struct TransitionHandlingState;
@@ -54,7 +54,7 @@ struct DataProcessingHelpers {
5454
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
5555
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
5656
/// Helper to route messages for forwarding
57-
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<MessageSet>& currentSetOfInputs,
57+
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
5858
bool copy, bool consume);
5959
/// Helper to route messages for forwarding
6060
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#include "Framework/DataDescriptorMatcher.h"
1717
#include "Framework/ForwardRoute.h"
1818
#include "Framework/CompletionPolicy.h"
19-
#include "Framework/MessageSet.h"
19+
#include <fairmq/Message.h>
2020
#include "Framework/TimesliceIndex.h"
2121
#include "Framework/Tracing.h"
2222
#include "Framework/TimesliceSlot.h"
@@ -113,7 +113,7 @@ class DataRelayer
113113
ActivityStats processDanglingInputs(std::vector<ExpirationHandler> const&,
114114
ServiceRegistryRef context, bool createNew);
115115

116-
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo info)>;
116+
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<std::vector<fair::mq::MessagePtr>>&, TimesliceIndex::OldestOutputInfo info)>;
117117

118118
// Callback for when some messages are about to be owned by the the DataRelayer
119119
using OnInsertionCallback = std::function<void(ServiceRegistryRef&, std::span<fair::mq::MessagePtr>&)>;
@@ -156,8 +156,8 @@ class DataRelayer
156156
/// Returns an input registry associated to the given timeslice and gives
157157
/// ownership to the caller. This is because once the inputs are out of the
158158
/// DataRelayer they need to be deleted once the processing is concluded.
159-
std::vector<MessageSet> consumeAllInputsForTimeslice(TimesliceSlot id);
160-
std::vector<MessageSet> consumeExistingInputsForTimeslice(TimesliceSlot id);
159+
std::vector<std::vector<fair::mq::MessagePtr>> consumeAllInputsForTimeslice(TimesliceSlot id);
160+
std::vector<std::vector<fair::mq::MessagePtr>> consumeExistingInputsForTimeslice(TimesliceSlot id);
161161

162162
/// Returns how many timeslices we can handle in parallel
163163
[[nodiscard]] size_t getParallelTimeslices() const;
@@ -203,7 +203,7 @@ class DataRelayer
203203
/// Notice that we store them as a NxM sized vector, where
204204
/// N is the maximum number of inflight timeslices, while
205205
/// M is the number of inputs which are requested.
206-
std::vector<MessageSet> mCache;
206+
std::vector<std::vector<fair::mq::MessagePtr>> mCache;
207207

208208
/// This is the index which maps a given timestamp to the associated
209209
/// cacheline.

Framework/Core/include/Framework/MessageSet.h

Lines changed: 0 additions & 71 deletions
This file was deleted.

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
#include "DecongestionService.h"
5252
#include "Framework/DataProcessingHelpers.h"
53+
#include "Framework/DataModelViews.h"
5354
#include "DataRelayerHelpers.h"
5455
#include "Headers/DataHeader.h"
5556
#include "Headers/DataHeaderHelpers.h"
@@ -585,7 +586,7 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
585586
// the inputs which are shared between this device and others
586587
// to the next one in the daisy chain.
587588
// FIXME: do it in a smarter way than O(N^2)
588-
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
589+
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
589590
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
590591
auto& proxy = registry.get<FairMQDeviceProxy>();
591592

@@ -617,7 +618,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
617618
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
618619
};
619620

620-
static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
621+
static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
621622
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
622623
auto& proxy = registry.get<FairMQDeviceProxy>();
623624

@@ -627,7 +628,7 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
627628
// Always copy them, because we do not want to actually send them.
628629
// We merely need the side effect of the consume, if applicable.
629630
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
630-
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
631+
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
631632
DataProcessingHelpers::cleanForwardedMessages(span, consume);
632633
}
633634

@@ -1278,7 +1279,7 @@ void DataProcessingDevice::Run()
12781279
// - we can trigger further events from the queue
12791280
// - we can guarantee this is the last thing we do in the loop (
12801281
// assuming no one else is adding to the queue before this point).
1281-
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1282+
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
12821283
O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
12831284
ServiceRegistryRef ref{registry};
12841285
ref.get<AsyncQueue>();
@@ -1944,7 +1945,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19441945
nPayloadsPerHeader = 1;
19451946
ii += (nMessages / 2) - 1;
19461947
}
1947-
auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1948+
auto onDrop = [ref](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
19481949
O2_SIGNPOST_ID_GENERATE(cid, async_queue);
19491950
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
19501951
slot.index, oldestOutputInfo.timeslice.value);
@@ -2122,7 +2123,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21222123
// want to support multithreaded dispatching of operations, I can simply
21232124
// move these to some thread local store and the rest of the lambdas
21242125
// should work just fine.
2125-
std::vector<MessageSet> currentSetOfInputs;
2126+
std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
21262127

21272128
//
21282129
auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
@@ -2133,7 +2134,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21332134
currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
21342135
}
21352136
auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2136-
if ((currentSetOfInputs[i].messages | count_payloads{}) > partindex) {
2137+
if ((currentSetOfInputs[i] | count_payloads{}) > partindex) {
21372138
const char* headerptr = nullptr;
21382139
const char* payloadptr = nullptr;
21392140
size_t payloadSize = 0;
@@ -2142,9 +2143,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21422143
// sequence is the header message
21432144
// - each part has one or more payload messages
21442145
// - InputRecord provides all payloads as header-payload pair
2145-
auto const indices = currentSetOfInputs[i].messages | get_pair{partindex};
2146-
auto const& headerMsg = currentSetOfInputs[i].messages[indices.headerIdx];
2147-
auto const& payloadMsg = currentSetOfInputs[i].messages[indices.payloadIdx];
2146+
auto const indices = currentSetOfInputs[i] | get_pair{partindex};
2147+
auto const& headerMsg = currentSetOfInputs[i][indices.headerIdx];
2148+
auto const& payloadMsg = currentSetOfInputs[i][indices.payloadIdx];
21482149
headerptr = static_cast<char const*>(headerMsg->GetData());
21492150
payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
21502151
payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
@@ -2153,10 +2154,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21532154
return DataRef{};
21542155
};
21552156
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2156-
return (currentSetOfInputs[i].messages | count_payloads{});
2157+
return (currentSetOfInputs[i] | count_payloads{});
21572158
};
21582159
auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2159-
auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx].messages | get_header{0}));
2160+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx] | get_header{0}));
21602161
return header.GetRefCount();
21612162
};
21622163
return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
@@ -2215,7 +2216,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22152216
continue;
22162217
}
22172218
// This will hopefully delete the message.
2218-
currentSetOfInputs[ii].messages.clear();
2219+
currentSetOfInputs[ii].clear();
22192220
}
22202221
};
22212222

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,14 +393,14 @@ void DataProcessingHelpers::cleanForwardedMessages(std::span<fair::mq::MessagePt
393393
}
394394

395395
auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
396-
std::vector<MessageSet>& currentSetOfInputs,
396+
std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
397397
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
398398
{
399399
// we collect all messages per forward in a map and send them together
400400
std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());
401401

402402
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
403-
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
403+
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
404404
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
405405
}
406406
return forwardedParts;

0 commit comments

Comments
 (0)