Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions DataFormats/Headers/include/Headers/Stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
#include "MemoryResources/MemoryResources.h"
#include "Headers/DataHeader.h"

namespace o2
{

namespace header
namespace o2::header
{
//__________________________________________________________________________________________________
/// @struct Stack
Expand Down Expand Up @@ -45,20 +42,20 @@ struct Stack {
};

public:
using allocator_type = boost::container::pmr::polymorphic_allocator<std::byte>;
using allocator_type = fair::mq::pmr::polymorphic_allocator<std::byte>;
using value_type = std::byte;
using BufferType = std::unique_ptr<value_type[], freeobj>; //this gives us proper default move semantics for free

Stack() = default;
Stack(Stack&&) = default;
Stack(Stack&) = delete;
Stack& operator=(Stack&) = delete;
Stack& operator=(Stack&&) = default;
Stack& operator=(Stack&&) = delete;

value_type* data() const { return buffer.get(); }
size_t size() const { return bufferSize; }
[[nodiscard]] value_type* data() const { return buffer.get(); }
[[nodiscard]] size_t size() const { return bufferSize; }
allocator_type get_allocator() const { return allocator; }
const BaseHeader* first() const { return reinterpret_cast<const BaseHeader*>(this->data()); }
[[nodiscard]] const BaseHeader* first() const { return reinterpret_cast<const BaseHeader*>(this->data()); }
static const BaseHeader* firstHeader(std::byte const* buf) { return BaseHeader::get(buf); }
static const BaseHeader* lastHeader(std::byte const* buf)
{
Expand Down Expand Up @@ -90,9 +87,9 @@ struct Stack {
/// all headers must derive from BaseHeader, in addition also other stacks can be passed to ctor.
template <typename FirstArgType, typename... Headers,
typename std::enable_if_t<
!std::is_convertible<FirstArgType, boost::container::pmr::polymorphic_allocator<std::byte>>::value, int> = 0>
!std::is_convertible<FirstArgType, fair::mq::pmr::polymorphic_allocator<std::byte>>::value, int> = 0>
Stack(FirstArgType&& firstHeader, Headers&&... headers)
: Stack(boost::container::pmr::new_delete_resource(), std::forward<FirstArgType>(firstHeader),
: Stack(fair::mq::pmr::new_delete_resource(), std::forward<FirstArgType>(firstHeader),
std::forward<Headers>(headers)...)
{
}
Expand Down Expand Up @@ -143,7 +140,7 @@ struct Stack {
constexpr static size_t calculateSize() { return 0; }

private:
allocator_type allocator{boost::container::pmr::new_delete_resource()};
allocator_type allocator{fair::mq::pmr::new_delete_resource()};
size_t bufferSize{0};
BufferType buffer{nullptr, freeobj{allocator.resource()}};

Expand Down Expand Up @@ -231,7 +228,6 @@ struct Stack {
}
};

} // namespace header
} // namespace o2
} // namespace o2::header

#endif // HEADERS_STACK_H
2 changes: 1 addition & 1 deletion DataFormats/Headers/test/testDataHeader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ BOOST_AUTO_TEST_CASE(headerStack_test)
BOOST_CHECK(h3->secret == 42);

//test constructing from a buffer and an additional header
using namespace boost::container::pmr;
using namespace fair::mq::pmr;
Stack s5(new_delete_resource(), s1.data(), Stack{}, meta);
BOOST_CHECK(s5.size() == s1.size() + sizeof(meta));
// check if we can find the header even though there was an empty stack in the middle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class MessageResource : public FairMQMemoryResource
// A spectator pmr memory resource which only watches the memory of the underlying buffer, does not
// carry out real allocation. It owns the underlying buffer which is destroyed on deallocation.
template <typename BufferType>
class SpectatorMemoryResource : public boost::container::pmr::memory_resource
class SpectatorMemoryResource : public fair::mq::pmr::memory_resource
{
public:
using buffer_type = BufferType;
Expand Down Expand Up @@ -183,10 +183,10 @@ class SpectatorMemoryResource : public boost::container::pmr::memory_resource
// This in general (as in STL) is a bad idea, but here it is safe to inherit from an allocator since we
// have no additional data and only override some methods so we don't get into slicing and other problems.
template <typename T>
class SpectatorAllocator : public boost::container::pmr::polymorphic_allocator<T>
class SpectatorAllocator : public fair::mq::pmr::polymorphic_allocator<T>
{
public:
using boost::container::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using fair::mq::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using propagate_on_container_move_assignment = std::true_type;

// skip default construction of empty elements
Expand Down Expand Up @@ -243,7 +243,7 @@ class OwningMessageSpectatorAllocator
return OwningMessageSpectatorAllocator();
}

boost::container::pmr::memory_resource* resource() { return &mResource; }
fair::mq::pmr::memory_resource* resource() { return &mResource; }

// skip default construction of empty elements
// this is important for two reasons: one: it allows us to adopt an existing buffer (e.g. incoming message) and
Expand All @@ -269,14 +269,14 @@ class OwningMessageSpectatorAllocator

// The NoConstructAllocator behaves like the normal pmr vector but does not call constructors / destructors
template <typename T>
class NoConstructAllocator : public boost::container::pmr::polymorphic_allocator<T>
class NoConstructAllocator : public fair::mq::pmr::polymorphic_allocator<T>
{
public:
using boost::container::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using fair::mq::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using propagate_on_container_move_assignment = std::true_type;

template <typename... Args>
NoConstructAllocator(Args&&... args) : boost::container::pmr::polymorphic_allocator<T>(std::forward<Args>(args)...)
NoConstructAllocator(Args&&... args) : fair::mq::pmr::polymorphic_allocator<T>(std::forward<Args>(args)...)
{
}

Expand All @@ -302,9 +302,9 @@ class NoConstructAllocator : public boost::container::pmr::polymorphic_allocator
//__________________________________________________________________________________________________

using ByteSpectatorAllocator = SpectatorAllocator<std::byte>;
using BytePmrAllocator = boost::container::pmr::polymorphic_allocator<std::byte>;
using BytePmrAllocator = fair::mq::pmr::polymorphic_allocator<std::byte>;
template <class T>
using vector = std::vector<T, o2::pmr::polymorphic_allocator<T>>;
using vector = std::vector<T, fair::mq::pmr::polymorphic_allocator<T>>;

//__________________________________________________________________________________________________
/// Return a std::vector spanned over the contents of the message, takes ownership of the message
Expand Down
2 changes: 1 addition & 1 deletion DataFormats/MemoryResources/test/testMemoryResources.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(transportallocatormap_test)
BOOST_CHECK(_tmp == allocZMQ);
}

using namespace boost::container::pmr;
using namespace fair::mq::pmr;

BOOST_AUTO_TEST_CASE(allocator_test)
{
Expand Down
30 changes: 11 additions & 19 deletions Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace o2
{
namespace tpc
{

/// create the processor spec for TPC raw cluster decoder converting TPC raw to native clusters
/// Input: raw pages of TPC raw clusters
/// Output: vector of containers with clusters in ClusterNative format, one container per
Expand Down Expand Up @@ -79,27 +80,18 @@ DataProcessorSpec getClusterDecoderRawSpec(bool sendMC)
// init the stacks for forwarding the sector header
// FIXME check if there is functionality in the DPL to forward the stack
// FIXME make one function
o2::header::Stack rawHeaderStack;
o2::header::Stack mcHeaderStack;
o2::tpc::TPCSectorHeader const* sectorHeaderMC = nullptr;
if (DataRefUtils::isValid(mclabelref)) {
sectorHeaderMC = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(mclabelref);
if (sectorHeaderMC) {
o2::header::Stack actual{*sectorHeaderMC};
std::swap(mcHeaderStack, actual);
if (sectorHeaderMC->sector() < 0) {
pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, std::move(mcHeaderStack)}, fanSpec);
}
}
}

if (sectorHeaderMC && sectorHeaderMC->sector() < 0) {
pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, {*sectorHeaderMC}}, fanSpec);
}
auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
if (sectorHeader) {
o2::header::Stack actual{*sectorHeader};
std::swap(rawHeaderStack, actual);
if (sectorHeader->sector() < 0) {
pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, std::move(rawHeaderStack)}, fanSpec);
return;
}
if (sectorHeader && sectorHeader->sector() < 0) {
pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, {*sectorHeader}}, fanSpec);
return;
}
assert(sectorHeaderMC == nullptr || sectorHeader->sector() == sectorHeaderMC->sector());

Expand Down Expand Up @@ -166,8 +158,8 @@ DataProcessorSpec getClusterDecoderRawSpec(bool sendMC)
// output of the decoder is sorted in (sector,globalPadRow) coordinates, individual
// containers are created for clusters and MC labels per (sector,globalPadRow) address
char* outputBuffer = nullptr;
auto outputAllocator = [&pc, &fanSpec, &outputBuffer, &rawHeaderStack](size_t size) -> char* {
outputBuffer = pc.outputs().newChunk(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, std::move(rawHeaderStack)}, size).data();
auto outputAllocator = [&pc, &fanSpec, &outputBuffer, sectorHeader](size_t size) -> char* {
outputBuffer = pc.outputs().newChunk(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, sectorHeader ? o2::header::Stack{*sectorHeader} : o2::header::Stack{}}, size).data();
return outputBuffer;
};
MCLabelContainer mcout;
Expand All @@ -188,7 +180,7 @@ DataProcessorSpec getClusterDecoderRawSpec(bool sendMC)
// serialize the complete list of MC label containers
ConstMCLabelContainer labelsFlat;
mcout.flatten_to(labelsFlat);
pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, std::move(mcHeaderStack)}, labelsFlat);
pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, sectorHeaderMC ? o2::header::Stack{*sectorHeaderMC} : o2::header::Stack{}}, labelsFlat);
}
};

Expand Down
9 changes: 1 addition & 8 deletions Framework/Core/include/Framework/Output.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,7 @@ struct Output {

Output& operator=(const Output&) = delete;

Output& operator=(Output&& rhs)
{
origin = rhs.origin;
description = rhs.description;
subSpec = rhs.subSpec;
metaHeader = std::move(rhs.metaHeader);
return *this;
}
Output& operator=(Output&& rhs) = delete;

bool operator==(const Output& that) const
{
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_FairMQ.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ TEST_CASE("addDataBlockForEach_test")
int i;
int j;
};
using namespace boost::container::pmr;
using namespace fair::mq::pmr;
fair::mq::Parts message;
std::vector<elem, polymorphic_allocator<elem>> vec(polymorphic_allocator<elem>{allocZMQ});
vec.reserve(100);
Expand Down
1 change: 0 additions & 1 deletion Utilities/DataSampling/include/DataSampling/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class Dispatcher : public framework::Task

private:
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original);
header::Stack extractAdditionalHeaders(const char* inputHeaderStack) const;
void reportStats(monitoring::Monitoring& monitoring) const;
void send(framework::DataAllocator& dataAllocator, const framework::DataRef& inputData, const framework::Output& output) const;

Expand Down
54 changes: 38 additions & 16 deletions Utilities/DataSampling/src/Dispatcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <Configuration/ConfigurationInterface.h>
#include <Configuration/ConfigurationFactory.h>
#include <stdexcept>

using namespace o2::configuration;
using namespace o2::monitoring;
Expand Down Expand Up @@ -77,6 +78,42 @@ void Dispatcher::init(InitContext& ctx)
mDeviceID.runtimeInit(spec.id.substr(0, DataSamplingHeader::deviceIDTypeSize).c_str());
}

header::Stack extractAdditionalHeaders(const char* inputHeaderStack)
{
std::array<header::BaseHeader const*, 5> headers;
int count = 0;
const auto* first = header::BaseHeader::get(reinterpret_cast<const std::byte*>(inputHeaderStack));
for (const auto* current = first; current != nullptr; current = current->next()) {
if (current->description != header::DataHeader::sHeaderType && current->description != DataProcessingHeader::sHeaderType) {
headers[count++] = current;
}
}

// Poor man runtime pack expansion.
switch (count) {
case 0:
return header::Stack{};
case 1:
return header::Stack{*headers[0]};
case 2:
return header::Stack{*headers[0], *headers[1]};
case 3:
return header::Stack{*headers[0], *headers[1], *headers[2]};
case 4:
return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3]};
case 5:
return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4]};
case 6:
return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5]};
case 7:
return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5], *headers[6]};
case 8:
return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5], *headers[6], *headers[7]};
default:
throw std::runtime_error(fmt::format("Too many headers to copy {}", count));
}
}

void Dispatcher::run(ProcessingContext& ctx)
{
// todo: consider matching (and deciding) in completion policy to save some time
Expand Down Expand Up @@ -106,7 +143,7 @@ void Dispatcher::run(ProcessingContext& ctx)
// so that custom data-dependent headers are passed forward,
// and we add a DataSamplingHeader.
header::Stack headerStack{
std::move(extractAdditionalHeaders(part.header)),
extractAdditionalHeaders(part.header),
dsheader};
const auto* partInputHeader = DataRefUtils::getHeader<header::DataHeader*>(part);

Expand Down Expand Up @@ -156,21 +193,6 @@ DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolic
original};
}

header::Stack Dispatcher::extractAdditionalHeaders(const char* inputHeaderStack) const
{
header::Stack headerStack;

const auto* first = header::BaseHeader::get(reinterpret_cast<const std::byte*>(inputHeaderStack));
for (const auto* current = first; current != nullptr; current = current->next()) {
if (current->description != header::DataHeader::sHeaderType &&
current->description != DataProcessingHeader::sHeaderType) {
headerStack = std::move(header::Stack{std::move(headerStack), *current});
}
}

return headerStack;
}

void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, const Output& output) const
{
const auto* inputHeader = DataRefUtils::getHeader<header::DataHeader*>(inputData);
Expand Down