Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ all-features = true
async = ["dep:futures"]
ffi = ["arrow-schema/ffi", "arrow-data/ffi"]
force_validate = []
# Enable memory tracking support
pool = ["arrow-buffer/pool", "arrow-data/pool"]

[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
Expand Down
8 changes: 8 additions & 0 deletions arrow-array/src/array/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ unsafe impl Array for BooleanArray {
fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.values.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl ArrayAccessor for &BooleanArray {
Expand Down
9 changes: 9 additions & 0 deletions arrow-array/src/array/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,15 @@ unsafe impl<T: ByteArrayType> Array for GenericByteArray<T> {
fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.value_offsets.claim(pool);
self.value_data.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl<'a, T: ByteArrayType> ArrayAccessor for &'a GenericByteArray<T> {
Expand Down
11 changes: 11 additions & 0 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,17 @@ unsafe impl<T: ByteViewType + ?Sized> Array for GenericByteViewArray<T> {
fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.views.claim(pool);
for buffer in self.buffers.iter() {
buffer.claim(pool);
}
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl<'a, T: ByteViewType + ?Sized> ArrayAccessor for &'a GenericByteViewArray<T> {
Expand Down
11 changes: 11 additions & 0 deletions arrow-array/src/array/dictionary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,12 @@ unsafe impl<T: ArrowDictionaryKeyType> Array for DictionaryArray<T> {
+ self.keys.get_buffer_memory_size()
+ self.values.get_array_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.keys.claim(pool);
self.values.claim(pool);
}
}

impl<T: ArrowDictionaryKeyType> std::fmt::Debug for DictionaryArray<T> {
Expand Down Expand Up @@ -911,6 +917,11 @@ unsafe impl<K: ArrowDictionaryKeyType, V: Sync> Array for TypedDictionaryArray<'
fn get_array_memory_size(&self) -> usize {
self.dictionary.get_array_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.dictionary.claim(pool);
}
}

impl<K, V> IntoIterator for TypedDictionaryArray<'_, K, V>
Expand Down
8 changes: 8 additions & 0 deletions arrow-array/src/array/fixed_size_binary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,14 @@ unsafe impl Array for FixedSizeBinaryArray {
fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.value_data.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl<'a> ArrayAccessor for &'a FixedSizeBinaryArray {
Expand Down
8 changes: 8 additions & 0 deletions arrow-array/src/array/fixed_size_list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,14 @@ unsafe impl Array for FixedSizeListArray {
}
size
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.values.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl ArrayAccessor for FixedSizeListArray {
Expand Down
9 changes: 9 additions & 0 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,15 @@ unsafe impl<OffsetSize: OffsetSizeTrait> Array for GenericListArray<OffsetSize>
}
size
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.value_offsets.claim(pool);
self.values.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl<OffsetSize: OffsetSizeTrait> ArrayAccessor for &GenericListArray<OffsetSize> {
Expand Down
10 changes: 10 additions & 0 deletions arrow-array/src/array/list_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ unsafe impl<OffsetSize: OffsetSizeTrait> Array for GenericListViewArray<OffsetSi
}
size
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.value_offsets.claim(pool);
self.value_sizes.claim(pool);
self.values.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl<OffsetSize: OffsetSizeTrait> std::fmt::Debug for GenericListViewArray<OffsetSize> {
Expand Down
9 changes: 9 additions & 0 deletions arrow-array/src/array/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ unsafe impl Array for MapArray {
}
size
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.value_offsets.claim(pool);
self.entries.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl ArrayAccessor for &MapArray {
Expand Down
79 changes: 79 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,75 @@ pub unsafe trait Array: std::fmt::Debug + Send + Sync {
/// This value will always be greater than returned by `get_buffer_memory_size()` and
/// includes the overhead of the data structures that contain the pointers to the various buffers.
fn get_array_memory_size(&self) -> usize;

/// Claim memory used by this array in the provided memory pool.
///
/// This recursively claims memory for:
/// - All data buffers in this array
/// - All child arrays (for nested types like List, Struct, etc.)
/// - The null bitmap buffer if present
///
/// This method guarantees that the memory pool will only compute occupied memory
/// exactly once. For example, if this array is derived from operations like `slice`,
/// calling `claim` on it would not change the memory pool's usage if the underlying buffers
/// are already counted before.
///
/// # Example
/// ```
/// # use arrow_array::{Int32Array, Array};
/// # use arrow_buffer::TrackingMemoryPool;
/// # use arrow_buffer::MemoryPool;
///
/// let pool = TrackingMemoryPool::default();
///
/// let small_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let small_array_size = small_array.get_buffer_memory_size();
///
/// // Claim the array's memory in the pool
/// small_array.claim(&pool);
///
/// // Create and claim slices of `small_array`; should not increase memory usage
/// let slice1 = small_array.slice(0, 2);
/// let slice2 = small_array.slice(2, 2);
/// slice1.claim(&pool);
/// slice2.claim(&pool);
///
/// assert_eq!(pool.used(), small_array_size);
///
/// // Create a `large_array` which does not derive from the original `small_array`
///
/// let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
/// let large_array_size = large_array.get_buffer_memory_size();
///
/// large_array.claim(&pool);
///
/// // Trying to claim more than once is a no-op
/// large_array.claim(&pool);
/// large_array.claim(&pool);
///
/// assert_eq!(pool.used(), small_array_size + large_array_size);
///
/// let sum_of_all_sizes = small_array_size + large_array_size + slice1.get_buffer_memory_size() + slice2.get_buffer_memory_size();
///
/// // `get_buffer_memory_size` works independently of the memory pool, so a sum of all the
/// // arrays in scope will always be >= the memory used reported by the memory pool.
/// assert_ne!(pool.used(), sum_of_all_sizes);
///
/// // Until the final claim is dropped the buffer size remains accounted for
/// drop(small_array);
/// drop(slice1);
///
/// assert_eq!(pool.used(), small_array_size + large_array_size);
///
/// // Dropping this finally releases the buffer that was backing `small_array`
/// drop(slice2);
///
/// assert_eq!(pool.used(), large_array_size);
/// ```
#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.to_data().claim(pool)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think calling to_data allocates a new Vec to create the ArrayData (the child buffers specifically)

Should we perhaps instead thread the claim API through the actual arrays / buffers?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! Updating my PR

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here maybe I should add a comment stating that Array impl can provide a more optimised method. Wdyt?

}
}

/// A reference-counted reference to a generic `Array`
Expand Down Expand Up @@ -437,6 +506,11 @@ unsafe impl Array for ArrayRef {
fn get_array_memory_size(&self) -> usize {
self.as_ref().get_array_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.as_ref().claim(pool)
}
}

unsafe impl<T: Array> Array for &T {
Expand Down Expand Up @@ -507,6 +581,11 @@ unsafe impl<T: Array> Array for &T {
fn get_array_memory_size(&self) -> usize {
T::get_array_memory_size(self)
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
T::claim(self, pool)
}
}

/// A generic trait for accessing the values of an [`Array`]
Expand Down
5 changes: 5 additions & 0 deletions arrow-array/src/array/null_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ unsafe impl Array for NullArray {
fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>()
}

#[cfg(feature = "pool")]
fn claim(&self, _pool: &dyn arrow_buffer::MemoryPool) {
// NullArray has no buffers to claim
}
}

impl From<ArrayData> for NullArray {
Expand Down
8 changes: 8 additions & 0 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,14 @@ unsafe impl<T: ArrowPrimitiveType> Array for PrimitiveArray<T> {
fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.values.claim(pool);
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl<T: ArrowPrimitiveType> ArrayAccessor for &PrimitiveArray<T> {
Expand Down
11 changes: 11 additions & 0 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ unsafe impl<T: RunEndIndexType> Array for RunArray<T> {
+ self.run_ends.inner().inner().capacity()
+ self.values.get_array_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.run_ends.claim(pool);
self.values.claim(pool);
}
}

impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
Expand Down Expand Up @@ -603,6 +609,11 @@ unsafe impl<R: RunEndIndexType, V: Sync> Array for TypedRunArray<'_, R, V> {
fn get_array_memory_size(&self) -> usize {
self.run_array.get_array_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.run_array.claim(pool);
}
}

// Array accessor converts the index of logical array to the index of the physical array
Expand Down
10 changes: 10 additions & 0 deletions arrow-array/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@ unsafe impl Array for StructArray {
}
size
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
for field in &self.fields {
field.claim(pool);
}
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}
}
}

impl From<Vec<(FieldRef, ArrayRef)>> for StructArray {
Expand Down
11 changes: 11 additions & 0 deletions arrow-array/src/array/union_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,17 @@ unsafe impl Array for UnionArray {
.sum::<usize>()
+ sum
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.type_ids.claim(pool);
if let Some(offsets) = &self.offsets {
offsets.claim(pool);
}
for field in self.fields.iter().flatten() {
field.claim(pool);
}
}
}

impl std::fmt::Debug for UnionArray {
Expand Down
1 change: 1 addition & 0 deletions arrow-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ bench = false
all-features = true

[features]
# Enable memory tracking support
pool = []

[dependencies]
Expand Down
8 changes: 8 additions & 0 deletions arrow-buffer/src/buffer/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,14 @@ impl BooleanBuffer {
self.buffer
}

/// Claim memory used by this buffer in the provided memory pool.
///
/// See [`Buffer::claim`] for details.
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn crate::MemoryPool) {
self.buffer.claim(pool);
}

/// Returns an iterator over the bits in this [`BooleanBuffer`]
pub fn iter(&self) -> BitIterator<'_> {
self.into_iter()
Expand Down
9 changes: 8 additions & 1 deletion arrow-buffer/src/buffer/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{Buffer, MutableBuffer};
/// that it is null.
///
/// # See also
/// * [`NullBufferBuilder`] for creating `NullBuffer`s
/// * [`NullBufferBuilder`] for creating `NullBuffer`s
///
/// [Arrow specification]: https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps
/// [`NullBufferBuilder`]: crate::NullBufferBuilder
Expand Down Expand Up @@ -231,6 +231,13 @@ impl NullBuffer {
let nb = NullBuffer::new(bb);
(nb.null_count() > 0).then_some(nb)
}

/// Claim memory used by this null buffer in the provided memory pool.
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn crate::MemoryPool) {
// NullBuffer wraps a BooleanBuffer which wraps a Buffer
self.buffer.inner().claim(pool);
}
}

impl<'a> IntoIterator for &'a NullBuffer {
Expand Down
Loading
Loading