Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,162 +111,6 @@ class MessageResource : public FairMQMemoryResource
}
};

//__________________________________________________________________________________________________
// A spectator pmr memory resource which only watches the memory of the underlying buffer, does not
// carry out real allocation. It owns the underlying buffer which is destroyed on deallocation.
template <typename BufferType>
class SpectatorMemoryResource : public fair::mq::pmr::memory_resource
{
public:
using buffer_type = BufferType;

SpectatorMemoryResource() noexcept = delete;
SpectatorMemoryResource(const SpectatorMemoryResource&) noexcept = delete;
SpectatorMemoryResource(SpectatorMemoryResource&&) noexcept = default;
SpectatorMemoryResource& operator=(const SpectatorMemoryResource&) = delete;
SpectatorMemoryResource& operator=(SpectatorMemoryResource&&) = default;
~SpectatorMemoryResource() noexcept override = default;

// the resource is the pointer managed by unique_ptr
template <typename T>
SpectatorMemoryResource(std::unique_ptr<T, typename buffer_type::deleter_type>&& buffer, size_t size)
: mBuffer{std::move(buffer)}, mPointer{mBuffer.get()}, mSize{size}
{
}

// the resource is the data of the vector managed by unique ptr
template <typename T>
SpectatorMemoryResource(std::unique_ptr<std::vector<T>, typename buffer_type::deleter_type>&& buffer)
: mBuffer{std::move(buffer)}, mPointer{mBuffer->data()}, mSize{mBuffer->size() * sizeof(T)}
{
}

// TODO: the underlying resource can be directly the vector or the read only buffer
protected:
void* do_allocate(std::size_t bytes, std::size_t /*alignment*/) override
{
if (mSize > 0) {
if (bytes > mSize) {
throw std::bad_alloc();
}
mSize = 0;
return mPointer;
}
throw std::runtime_error("Can not allocate: this memory resource is only supposed to provide spectator access to external buffer");
}

void do_deallocate(void* p, std::size_t /*bytes*/, std::size_t /*alignment*/) override
{
if (p == mPointer) {
mBuffer.reset();
mPointer = nullptr;
} else if (mPointer == nullptr) {
// there is an error in the logic flow, this should never be called more than once
throw std::logic_error("underlying controlled resource has been released already");
} else {
throw std::logic_error("this resource can only deallocate the controlled resource pointer");
}
}
bool do_is_equal(const memory_resource& /*other*/) const noexcept override
{
// uniquely owns the underlying resource, can never be equal to any other instance
return false;
}

private:
buffer_type mBuffer;
void* mPointer = nullptr;
size_t mSize = 0;
};

//__________________________________________________________________________________________________
// This in general (as in STL) is a bad idea, but here it is safe to inherit from an allocator since we
// have no additional data and only override some methods so we don't get into slicing and other problems.
template <typename T>
class SpectatorAllocator : public fair::mq::pmr::polymorphic_allocator<T>
{
public:
using fair::mq::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using propagate_on_container_move_assignment = std::true_type;

// skip default construction of empty elements
// this is important for two reasons: one: it allows us to adopt an existing buffer (e.g. incoming message) and
// quickly construct large vectors while skipping the element initialization.
template <class U>
void construct(U*)
{
}

// dont try to call destructors, makes no sense since resource is managed externally AND allowed
// types cannot have side effects
template <typename U>
void destroy(U*)
{
}

T* allocate(size_t size) { return reinterpret_cast<T*>(this->resource()->allocate(size * sizeof(T), 64)); }
void deallocate(T* ptr, size_t size)
{
this->resource()->deallocate(const_cast<typename std::remove_cv<T>::type*>(ptr), size);
}
};

//__________________________________________________________________________________________________
/// This allocator has a pmr-like interface, but keeps the unique MessageResource as internal state,
/// allowing full resource (associated message) management internally without any global state.
template <typename T>
class OwningMessageSpectatorAllocator
{
public:
using value_type = T;

MessageResource mResource;

OwningMessageSpectatorAllocator() noexcept = default;
OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator&) noexcept = default;
OwningMessageSpectatorAllocator(OwningMessageSpectatorAllocator&&) noexcept = default;
OwningMessageSpectatorAllocator(MessageResource&& resource) noexcept : mResource{resource} {}

template <class U>
OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator<U>& other) noexcept : mResource(other.mResource)
{
}

OwningMessageSpectatorAllocator& operator=(const OwningMessageSpectatorAllocator& other)
{
mResource = other.mResource;
return *this;
}

OwningMessageSpectatorAllocator select_on_container_copy_construction() const
{
return OwningMessageSpectatorAllocator();
}

fair::mq::pmr::memory_resource* resource() { return &mResource; }

// skip default construction of empty elements
// this is important for two reasons: one: it allows us to adopt an existing buffer (e.g. incoming message) and
// quickly construct large vectors while skipping the element initialization.
template <class U>
void construct(U*)
{
}

// dont try to call destructors, makes no sense since resource is managed externally AND allowed
// types cannot have side effects
template <typename U>
void destroy(U*)
{
}

T* allocate(size_t size) { return reinterpret_cast<T*>(mResource.allocate(size * sizeof(T), 64)); }
void deallocate(T* ptr, size_t size)
{
mResource.deallocate(const_cast<typename std::remove_cv<T>::type*>(ptr), size);
}
};

// The NoConstructAllocator behaves like the normal pmr vector but does not call constructors / destructors
template <typename T>
class NoConstructAllocator : public fair::mq::pmr::polymorphic_allocator<T>
Expand Down Expand Up @@ -301,21 +145,10 @@ class NoConstructAllocator : public fair::mq::pmr::polymorphic_allocator<T>
//__________________________________________________________________________________________________
//__________________________________________________________________________________________________

using ByteSpectatorAllocator = SpectatorAllocator<std::byte>;
using BytePmrAllocator = fair::mq::pmr::polymorphic_allocator<std::byte>;
template <class T>
using vector = std::vector<T, fair::mq::pmr::polymorphic_allocator<T>>;

//__________________________________________________________________________________________________
/// Return a std::vector spanned over the contents of the message, takes ownership of the message
template <typename ElemT>
auto adoptVector(size_t nelem, fair::mq::MessagePtr message)
{
static_assert(std::is_trivially_destructible<ElemT>::value);
return std::vector<ElemT, OwningMessageSpectatorAllocator<ElemT>>(
nelem, OwningMessageSpectatorAllocator<ElemT>(MessageResource{std::move(message)}));
};

//__________________________________________________________________________________________________
/// Get the allocator associated to a transport factory
inline static FairMQMemoryResource* getTransportAllocator(fair::mq::TransportFactory* factory)
Expand Down
76 changes: 0 additions & 76 deletions DataFormats/MemoryResources/test/testMemoryResources.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,6 @@ BOOST_AUTO_TEST_CASE(allocator_test)
}

testData::nconstructions = 0;
{
std::vector<testData, SpectatorAllocator<testData>> v(SpectatorAllocator<testData>{allocZMQ});
v.reserve(3);
BOOST_CHECK(allocZMQ->getNumberOfMessages() == 1);
v.emplace_back(1);
v.emplace_back(2);
v.emplace_back(3);
BOOST_CHECK(testData::nconstructions == 3);
}
BOOST_CHECK(allocZMQ->getNumberOfMessages() == 0);
}

Expand Down Expand Up @@ -147,73 +138,6 @@ BOOST_AUTO_TEST_CASE(getMessage_test)
messageArray = static_cast<int*>(message->GetData());
BOOST_CHECK(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);

{
std::vector<testData, SpectatorAllocator<testData>> v(SpectatorAllocator<testData>{allocSHM});
}
}

BOOST_AUTO_TEST_CASE(adoptVector_test)
{
size_t session{(size_t)getpid() * 1000 + 3};
fair::mq::ProgOptions config;
config.SetProperty<std::string>("session", std::to_string(session));

auto factoryZMQ = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
auto factorySHM = fair::mq::TransportFactory::CreateTransportFactory("shmem", "adoptVector_test", &config);
auto allocZMQ = getTransportAllocator(factoryZMQ.get());
auto allocSHM = getTransportAllocator(factorySHM.get());

testData::nconstructions = 0;

// Create a bogus message
auto message = factoryZMQ->CreateMessage(3 * sizeof(testData));
auto messageAddr = message.get();
testData tmpBuf[3] = {3, 2, 1};
std::memcpy(message->GetData(), tmpBuf, 3 * sizeof(testData));

auto adoptedOwner = adoptVector<testData>(3, std::move(message));
BOOST_CHECK(adoptedOwner[0].i == 3);
BOOST_CHECK(adoptedOwner[1].i == 2);
BOOST_CHECK(adoptedOwner[2].i == 1);

auto reclaimedMessage = o2::pmr::getMessage(std::move(adoptedOwner));
BOOST_CHECK(reclaimedMessage.get() == messageAddr);
BOOST_CHECK(adoptedOwner.size() == 0);

auto modified = adoptVector<testData>(3, std::move(reclaimedMessage));
modified.emplace_back(9);
BOOST_CHECK(modified[3].i == 9);
BOOST_CHECK(modified.size() == 4);
BOOST_CHECK(testData::nconstructions == 7);
auto modifiedMessage = getMessage(std::move(modified));
BOOST_CHECK(modifiedMessage != nullptr);
BOOST_CHECK(modifiedMessage.get() != messageAddr);
}

BOOST_AUTO_TEST_CASE(test_SpectatorMemoryResource)
{
constexpr int size = 5;
auto buffer = std::make_unique<int[]>(size);
auto const* bufferdata = buffer.get();
SpectatorMemoryResource<decltype(buffer)> resource(std::move(buffer), size * sizeof(int));
std::vector<int, o2::pmr::SpectatorAllocator<int>> bufferclone(size, o2::pmr::SpectatorAllocator<int>(&resource));
BOOST_CHECK(bufferclone.data() == bufferdata);
BOOST_CHECK(bufferclone.size() == size);
BOOST_CHECK_THROW(bufferclone.resize(2 * size), std::runtime_error);

auto vecbuf = std::make_unique<std::vector<int>>(size);
auto const* vectordata = vecbuf->data();
SpectatorMemoryResource<decltype(vecbuf)> vecresource(std::move(vecbuf));
std::vector<int, o2::pmr::SpectatorAllocator<int>> vecclone(size, o2::pmr::SpectatorAllocator<int>(&vecresource));
BOOST_CHECK(vecclone.data() == vectordata);
BOOST_CHECK(vecclone.size() == size);
BOOST_CHECK_THROW(vecclone.resize(2 * size), std::runtime_error);

std::vector<int, o2::pmr::SpectatorAllocator<int>> vecmove;
vecmove = std::move(vecclone);
BOOST_CHECK(vecclone.size() == 0);
BOOST_CHECK(vecmove.data() == vectordata);
BOOST_CHECK(vecmove.size() == size);
}

}; // namespace o2::pmr
Loading