Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions Framework/Core/include/Framework/FairMQResizableBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@
namespace o2::framework
{

using namespace arrow;
using namespace arrow::io;

class FairMQOutputStream : public OutputStream
class FairMQOutputStream : public arrow::io::OutputStream
{
public:
explicit FairMQOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
explicit FairMQOutputStream(const std::shared_ptr<arrow::ResizableBuffer>& buffer);

/// \brief Create in-memory output stream with indicated capacity using a
/// memory pool
/// \param[in] initial_capacity the initial allocated internal capacity of
/// the OutputStream
/// \param[in,out] pool a MemoryPool to use for allocations
/// \return the created stream
static Result<std::shared_ptr<FairMQOutputStream>> Create(
int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool());
static arrow::Result<std::shared_ptr<FairMQOutputStream>> Create(
int64_t initial_capacity = 4096, arrow::MemoryPool* pool = arrow::default_memory_pool());

// By the time we call the destructor, the contents
// of the buffer are already moved to fairmq
Expand All @@ -49,34 +46,34 @@ class FairMQOutputStream : public OutputStream
// Implement the OutputStream interface

/// Close the stream, preserving the buffer (retrieve it with Finish()).
Status Close() override;
arrow::Status Close() override;
[[nodiscard]] bool closed() const override;
[[nodiscard]] Result<int64_t> Tell() const override;
Status Write(const void* data, int64_t nbytes) override;
[[nodiscard]] arrow::Result<int64_t> Tell() const override;
arrow::Status Write(const void* data, int64_t nbytes) override;

/// \cond FALSE
using OutputStream::Write;
/// \endcond

/// Close the stream and return the buffer
Result<std::shared_ptr<Buffer>> Finish();
arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();

/// \brief Initialize state of OutputStream with newly allocated memory and
/// set position to 0
/// \param[in] initial_capacity the starting allocated capacity
/// \param[in,out] pool the memory pool to use for allocations
/// \return Status
Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
arrow::Status Reset(int64_t initial_capacity = 1024, arrow::MemoryPool* pool = arrow::default_memory_pool());

[[nodiscard]] int64_t capacity() const { return capacity_; }

private:
FairMQOutputStream();

// Ensures there is sufficient space available to write nbytes
Status Reserve(int64_t nbytes);
arrow::Status Reserve(int64_t nbytes);

std::shared_ptr<ResizableBuffer> buffer_;
std::shared_ptr<arrow::ResizableBuffer> buffer_;
bool is_open_;
int64_t capacity_;
int64_t position_;
Expand Down
14 changes: 8 additions & 6 deletions Framework/Core/src/FairMQResizableBuffer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <cassert>
#include <utility>

using arrow::Status;

namespace arrow::io::internal
{
void CloseFromDestructor(FileInterface* file);
Expand All @@ -28,23 +30,23 @@ static constexpr int64_t kBufferMinimumSize = 256;
FairMQOutputStream::FairMQOutputStream()
: is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}

FairMQOutputStream::FairMQOutputStream(const std::shared_ptr<ResizableBuffer>& buffer)
FairMQOutputStream::FairMQOutputStream(const std::shared_ptr<arrow::ResizableBuffer>& buffer)
: buffer_(buffer),
is_open_(true),
capacity_(buffer->size()),
position_(0),
mutable_data_(buffer->mutable_data()) {}

Result<std::shared_ptr<FairMQOutputStream>> FairMQOutputStream::Create(
int64_t initial_capacity, MemoryPool* pool)
arrow::Result<std::shared_ptr<FairMQOutputStream>> FairMQOutputStream::Create(
int64_t initial_capacity, arrow::MemoryPool* pool)
{
// ctor is private, so cannot use make_shared
auto ptr = std::shared_ptr<FairMQOutputStream>(new FairMQOutputStream);
RETURN_NOT_OK(ptr->Reset(initial_capacity, pool));
return ptr;
}

Status FairMQOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool)
Status FairMQOutputStream::Reset(int64_t initial_capacity, arrow::MemoryPool* pool)
{
ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
is_open_ = true;
Expand All @@ -67,15 +69,15 @@ Status FairMQOutputStream::Close()

bool FairMQOutputStream::closed() const { return !is_open_; }

Result<std::shared_ptr<Buffer>> FairMQOutputStream::Finish()
arrow::Result<std::shared_ptr<arrow::Buffer>> FairMQOutputStream::Finish()
{
RETURN_NOT_OK(Close());
buffer_->ZeroPadding();
is_open_ = false;
return std::move(buffer_);
}

Result<int64_t> FairMQOutputStream::Tell() const { return position_; }
arrow::Result<int64_t> FairMQOutputStream::Tell() const { return position_; }

Status FairMQOutputStream::Write(const void* data, int64_t nbytes)
{
Expand Down