5050
5151#include " DecongestionService.h"
5252#include " Framework/DataProcessingHelpers.h"
53+ #include " Framework/DataModelViews.h"
5354#include " DataRelayerHelpers.h"
5455#include " Headers/DataHeader.h"
5556#include " Headers/DataHeaderHelpers.h"
@@ -585,7 +586,7 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
585586// the inputs which are shared between this device and others
586587// to the next one in the daisy chain.
587588// FIXME: do it in a smarter way than O(N^2)
588- static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet >& currentSetOfInputs,
589+ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr> >& currentSetOfInputs,
589590 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true ) {
590591 auto & proxy = registry.get <FairMQDeviceProxy>();
591592
@@ -617,7 +618,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
617618 O2_SIGNPOST_END (forwarding, sid, " forwardInputs" , " Forwarding done" );
618619};
619620
620- static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet >& currentSetOfInputs,
621+ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr> >& currentSetOfInputs,
621622 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true ) {
622623 auto & proxy = registry.get <FairMQDeviceProxy>();
623624
@@ -627,7 +628,7 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
627628 // Always copy them, because we do not want to actually send them.
628629 // We merely need the side effect of the consume, if applicable.
629630 for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
630- auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]. messages );
631+ auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
631632 DataProcessingHelpers::cleanForwardedMessages (span, consume);
632633 }
633634
@@ -1278,7 +1279,7 @@ void DataProcessingDevice::Run()
12781279 // - we can trigger further events from the queue
12791280 // - we can guarantee this is the last thing we do in the loop (
12801281 // assuming no one else is adding to the queue before this point).
1281- auto onDrop = [®istry = mServiceRegistry , lid](TimesliceSlot slot, std::vector<MessageSet >& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1282+ auto onDrop = [®istry = mServiceRegistry , lid](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr> >& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
12821283 O2_SIGNPOST_START (device, lid, " run_loop" , " Dropping message from slot %" PRIu64 " . Forwarding as needed." , (uint64_t )slot.index );
12831284 ServiceRegistryRef ref{registry};
12841285 ref.get <AsyncQueue>();
@@ -1942,7 +1943,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19421943 nPayloadsPerHeader = 1 ;
19431944 ii += (nMessages / 2 ) - 1 ;
19441945 }
1945- auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet >& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1946+ auto onDrop = [ref](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr> >& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
19461947 O2_SIGNPOST_ID_GENERATE (cid, async_queue);
19471948 O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " onDrop" , " Dropping message from slot %zu. Forwarding as needed. Timeslice %zu" ,
19481949 slot.index , oldestOutputInfo.timeslice .value );
@@ -2120,7 +2121,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21202121 // want to support multithreaded dispatching of operations, I can simply
21212122 // move these to some thread local store and the rest of the lambdas
21222123 // should work just fine.
2123- std::vector<MessageSet > currentSetOfInputs;
2124+ std::vector<std::vector<fair::mq::MessagePtr> > currentSetOfInputs;
21242125
21252126 //
21262127 auto getInputSpan = [ref, ¤tSetOfInputs](TimesliceSlot slot, bool consume = true ) {
@@ -2131,7 +2132,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21312132 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice (slot);
21322133 }
21332134 auto getter = [¤tSetOfInputs](size_t i, size_t partindex) -> DataRef {
2134- if ((currentSetOfInputs[i]. messages | count_payloads{}) > partindex) {
2135+ if ((currentSetOfInputs[i] | count_payloads{}) > partindex) {
21352136 const char * headerptr = nullptr ;
21362137 const char * payloadptr = nullptr ;
21372138 size_t payloadSize = 0 ;
@@ -2140,9 +2141,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21402141 // sequence is the header message
21412142 // - each part has one or more payload messages
21422143 // - InputRecord provides all payloads as header-payload pair
2143- auto const indices = currentSetOfInputs[i]. messages | get_pair{partindex};
2144- auto const & headerMsg = currentSetOfInputs[i]. messages [indices.headerIdx ];
2145- auto const & payloadMsg = currentSetOfInputs[i]. messages [indices.payloadIdx ];
2144+ auto const indices = currentSetOfInputs[i] | get_pair{partindex};
2145+ auto const & headerMsg = currentSetOfInputs[i][indices.headerIdx ];
2146+ auto const & payloadMsg = currentSetOfInputs[i][indices.payloadIdx ];
21462147 headerptr = static_cast <char const *>(headerMsg->GetData ());
21472148 payloadptr = payloadMsg ? static_cast <char const *>(payloadMsg->GetData ()) : nullptr ;
21482149 payloadSize = payloadMsg ? payloadMsg->GetSize () : 0 ;
@@ -2151,10 +2152,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21512152 return DataRef{};
21522153 };
21532154 auto nofPartsGetter = [¤tSetOfInputs](size_t i) -> size_t {
2154- return (currentSetOfInputs[i]. messages | count_payloads{});
2155+ return (currentSetOfInputs[i] | count_payloads{});
21552156 };
21562157 auto refCountGetter = [¤tSetOfInputs](size_t idx) -> int {
2157- auto & header = static_cast <const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx]. messages | get_header{0 }));
2158+ auto & header = static_cast <const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx] | get_header{0 }));
21582159 return header.GetRefCount ();
21592160 };
21602161 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size ()};
@@ -2213,7 +2214,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22132214 continue ;
22142215 }
22152216 // This will hopefully delete the message.
2216- currentSetOfInputs[ii].messages . clear ();
2217+ currentSetOfInputs[ii].clear ();
22172218 }
22182219 };
22192220
0 commit comments