Skip to content

Commit fb5baae

Browse files
authored
DPL: add ability to create arrow::RecordBatches directly in shared memory without allocations (#13993)
1 parent 96d683b commit fb5baae

File tree

3 files changed

+268
-0
lines changed

3 files changed

+268
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ o2_add_library(Framework
4848
src/DataProcessingStates.cxx
4949
src/DefaultsHelpers.cxx
5050
src/DomainInfoHeader.cxx
51+
src/EmptyFragment.cxx
5152
src/ProcessingPoliciesHelpers.cxx
5253
src/ConfigParamDiscovery.cxx
5354
src/ConfigParamStore.cxx
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_DEFERREDFRAGMENT_H
12+
#define O2_FRAMEWORK_DEFERREDFRAGMENT_H
13+
14+
#include <arrow/dataset/api.h>
15+
16+
namespace o2::framework
17+
{
18+
19+
// A Fragment which will create a preallocated batch in shared memory
20+
// and fill it directly in place.
21+
class EmptyFragment : public arrow::dataset::Fragment
22+
{
23+
public:
24+
// @a numRows is the number of rows in the final result.
25+
// @a physical_schema the schema of the resulting batch
26+
// @a fillers helper functions to fill the given buffer.
27+
EmptyFragment(size_t rows,
28+
arrow::compute::Expression partition_expression,
29+
std::shared_ptr<arrow::Schema> physical_schema)
30+
: Fragment(std::move(partition_expression), physical_schema)
31+
{
32+
}
33+
34+
// Scanner function which returns a batch where the space is not actually used.
35+
arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
36+
const std::shared_ptr<arrow::dataset::ScanOptions>& options) override;
37+
38+
private:
39+
/// The pointer to each allocation is an incremental number, indexing a collection to track
40+
/// the size of each allocation.
41+
std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
42+
{
43+
mSizes.push_back(size);
44+
return std::make_shared<arrow::Buffer>((uint8_t*)(mSizes.size() - 1), size);
45+
}
46+
std::vector<size_t> mSizes;
47+
size_t mRows;
48+
};
49+
50+
/// An OutputStream which does the reading of the input buffers directly
51+
/// on writing, if needed. Each deferred operation is encoded in the source
52+
/// buffer by an incremental number which can be used to lookup in the @a ops
53+
/// vector the operation to perform.
54+
class PreallocatedOutputStream : public arrow::io::OutputStream
55+
{
56+
public:
57+
explicit PreallocatedOutputStream(std::vector<size_t>& sizes,
58+
const std::shared_ptr<arrow::ResizableBuffer>& buffer);
59+
60+
/// \brief Create in-memory output stream with indicated capacity using a
61+
/// memory pool
62+
/// \param[in] initial_capacity the initial allocated internal capacity of
63+
/// the OutputStream
64+
/// \param[in,out] pool a MemoryPool to use for allocations
65+
/// \return the created stream
66+
static arrow::Result<std::shared_ptr<PreallocatedOutputStream>> Create(
67+
std::vector<size_t>& sizes,
68+
int64_t initial_capacity = 4096,
69+
arrow::MemoryPool* pool = arrow::default_memory_pool());
70+
71+
// By the time we call the destructor, the contents
72+
// of the buffer are already moved to fairmq
73+
// for being sent.
74+
~PreallocatedOutputStream() override = default;
75+
76+
// Implement the OutputStream interface
77+
78+
/// Close the stream, preserving the buffer (retrieve it with Finish()).
79+
arrow::Status Close() override;
80+
[[nodiscard]] bool closed() const override;
81+
[[nodiscard]] arrow::Result<int64_t> Tell() const override;
82+
arrow::Status Write(const void* data, int64_t nbytes) override;
83+
84+
/// \cond FALSE
85+
using OutputStream::Write;
86+
/// \endcond
87+
88+
/// Close the stream and return the buffer
89+
arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
90+
91+
/// \brief Initialize state of OutputStream with newly allocated memory and
92+
/// set position to 0
93+
/// \param[in] initial_capacity the starting allocated capacity
94+
/// \param[in,out] pool the memory pool to use for allocations
95+
/// \return Status
96+
arrow::Status Reset(std::vector<size_t> sizes,
97+
int64_t initial_capacity, arrow::MemoryPool* pool);
98+
99+
[[nodiscard]] int64_t capacity() const { return capacity_; }
100+
101+
private:
102+
std::vector<size_t> sizes_;
103+
PreallocatedOutputStream();
104+
105+
// Ensures there is sufficient space available to write nbytes
106+
arrow::Status Reserve(int64_t nbytes);
107+
108+
std::shared_ptr<arrow::ResizableBuffer> buffer_;
109+
bool is_open_;
110+
int64_t capacity_;
111+
int64_t position_;
112+
uint8_t* mutable_data_;
113+
};
114+
} // namespace o2::framework
115+
116+
#endif
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#include "Framework/EmptyFragment.h"
12+
#include <arrow/type_fwd.h>
13+
#include <arrow/array/array_primitive.h>
14+
#include <arrow/array/array_nested.h>
15+
#include <memory>
16+
17+
static constexpr int64_t kBufferMinimumSize = 256;
18+
19+
namespace o2::framework
20+
{
21+
22+
// Scanner function which returns a batch where the space is not actually used.
23+
arrow::Result<arrow::RecordBatchGenerator> EmptyFragment::ScanBatchesAsync(
24+
const std::shared_ptr<arrow::dataset::ScanOptions>& options)
25+
{
26+
auto generator = [this]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
27+
std::vector<std::shared_ptr<arrow::Array>> columns;
28+
columns.reserve(this->physical_schema_->fields().size());
29+
30+
for (auto& field : this->physical_schema_->fields()) {
31+
if (auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(field->type())) {
32+
size_t size = mRows * listType->list_size();
33+
if (field->type()->field(0)->type()->byte_width() == 0) {
34+
size /= 8;
35+
} else {
36+
size *= field->type()->field(0)->type()->byte_width();
37+
}
38+
auto varray = std::make_shared<arrow::PrimitiveArray>(field->type()->field(0)->type(), mRows * listType->list_size(), GetPlaceholderForOp(size));
39+
columns.push_back(std::make_shared<arrow::FixedSizeListArray>(field->type(), (int32_t)mRows, varray));
40+
} else {
41+
size_t size = mRows;
42+
if (field->type()->byte_width() == 0) {
43+
size /= 8;
44+
} else {
45+
size *= field->type()->byte_width();
46+
}
47+
columns.push_back(std::make_shared<arrow::PrimitiveArray>(field->type(), mRows, GetPlaceholderForOp(size)));
48+
}
49+
}
50+
return arrow::RecordBatch::Make(physical_schema_, mRows, columns);
51+
};
52+
return generator;
53+
}
54+
55+
PreallocatedOutputStream::PreallocatedOutputStream()
56+
: is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
57+
58+
PreallocatedOutputStream::PreallocatedOutputStream(std::vector<size_t>& sizes,
59+
const std::shared_ptr<arrow::ResizableBuffer>& buffer)
60+
: sizes_(sizes),
61+
buffer_(buffer),
62+
is_open_(true),
63+
capacity_(buffer->size()),
64+
position_(0),
65+
mutable_data_(buffer->mutable_data()) {}
66+
67+
arrow::Result<std::shared_ptr<PreallocatedOutputStream>> PreallocatedOutputStream::Create(
68+
std::vector<size_t>& ops,
69+
int64_t initial_capacity, arrow::MemoryPool* pool)
70+
{
71+
// ctor is private, so cannot use make_shared
72+
auto ptr = std::shared_ptr<PreallocatedOutputStream>(new PreallocatedOutputStream);
73+
RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
74+
return ptr;
75+
}
76+
77+
arrow::Status PreallocatedOutputStream::Reset(std::vector<size_t> sizes,
78+
int64_t initial_capacity, arrow::MemoryPool* pool)
79+
{
80+
ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
81+
sizes_ = sizes;
82+
is_open_ = true;
83+
capacity_ = initial_capacity;
84+
position_ = 0;
85+
mutable_data_ = buffer_->mutable_data();
86+
return arrow::Status::OK();
87+
}
88+
89+
arrow::Status PreallocatedOutputStream::Close()
90+
{
91+
if (is_open_) {
92+
is_open_ = false;
93+
if (position_ < capacity_) {
94+
RETURN_NOT_OK(buffer_->Resize(position_, false));
95+
}
96+
}
97+
return arrow::Status::OK();
98+
}
99+
100+
bool PreallocatedOutputStream::closed() const { return !is_open_; }
101+
102+
arrow::Result<std::shared_ptr<arrow::Buffer>> PreallocatedOutputStream::Finish()
103+
{
104+
RETURN_NOT_OK(Close());
105+
buffer_->ZeroPadding();
106+
is_open_ = false;
107+
return std::move(buffer_);
108+
}
109+
110+
arrow::Result<int64_t> PreallocatedOutputStream::Tell() const { return position_; }
111+
112+
arrow::Status PreallocatedOutputStream::Write(const void* data, int64_t nbytes)
113+
{
114+
if (ARROW_PREDICT_FALSE(!is_open_)) {
115+
return arrow::Status::IOError("OutputStream is closed");
116+
}
117+
if (ARROW_PREDICT_TRUE(nbytes == 0)) {
118+
return arrow::Status::OK();
119+
}
120+
if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
121+
RETURN_NOT_OK(Reserve(nbytes));
122+
}
123+
// This is a real address which needs to be copied. Do it!
124+
auto ref = (int64_t)data;
125+
if (ref >= sizes_.size()) {
126+
memcpy(mutable_data_ + position_, data, nbytes);
127+
position_ += nbytes;
128+
return arrow::Status::OK();
129+
}
130+
131+
position_ += nbytes;
132+
return arrow::Status::OK();
133+
}
134+
135+
arrow::Status PreallocatedOutputStream::Reserve(int64_t nbytes)
136+
{
137+
// Always overallocate by doubling. It seems that it is a better growth
138+
// strategy, at least for memory_benchmark.cc.
139+
// This may be because it helps match the allocator's allocation buckets
140+
// more exactly. Or perhaps it hits a sweet spot in jemalloc.
141+
int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
142+
new_capacity = position_ + nbytes;
143+
if (new_capacity > capacity_) {
144+
RETURN_NOT_OK(buffer_->Resize(new_capacity));
145+
capacity_ = new_capacity;
146+
mutable_data_ = buffer_->mutable_data();
147+
}
148+
return arrow::Status::OK();
149+
}
150+
151+
} // namespace o2::framework

0 commit comments

Comments
 (0)