Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl StatementExecutor {
let options = task_ctx.session_config().options();

// Track memory usage for the query result if it's bounded
let mut reservation =
let reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());

if physical_plan.boundedness().is_unbounded() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2186,7 +2186,7 @@ mod tests {
// configure with same memory / disk manager
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
let reservation = MemoryConsumer::new("test").register(&memory_pool);
reservation.grow(100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();
Expand Down
10 changes: 5 additions & 5 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ impl FileSink for ParquetSink {
parquet_props.clone(),
)
.await?;
let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
let reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
.register(context.memory_pool());
file_write_tasks.spawn(async move {
while let Some(batch) = rx.recv().await {
Expand Down Expand Up @@ -1465,7 +1465,7 @@ impl DataSink for ParquetSink {
async fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
mut reservation: MemoryReservation,
reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
while let Some(col) = rx.recv().await {
writer.write(&col)?;
Expand Down Expand Up @@ -1550,7 +1550,7 @@ fn spawn_rg_join_and_finalize_task(
rg_rows: usize,
pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
let mut rg_reservation =
let rg_reservation =
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

SpawnedTask::spawn(async move {
Expand Down Expand Up @@ -1682,12 +1682,12 @@ async fn concatenate_parallel_row_groups(
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
let mut file_reservation =
let file_reservation =
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let (serialized_columns, mut rg_reservation, _cnt) =
let (serialized_columns, rg_reservation, _cnt) =
result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

let mut rg_out = parquet_writer.next_row_group()?;
Expand Down
92 changes: 50 additions & 42 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_err};
use datafusion_common::{Result, internal_datafusion_err};
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::Arc, sync::atomic};

Expand Down Expand Up @@ -322,7 +322,7 @@ impl MemoryConsumer {
pool: Arc::clone(pool),
consumer: self,
}),
size: 0,
size: atomic::AtomicUsize::new(0),
}
}
}
Expand Down Expand Up @@ -351,13 +351,13 @@ impl Drop for SharedRegistration {
#[derive(Debug)]
pub struct MemoryReservation {
registration: Arc<SharedRegistration>,
size: usize,
size: atomic::AtomicUsize,
}

impl MemoryReservation {
/// Returns the size of this reservation in bytes
pub fn size(&self) -> usize {
self.size
self.size.load(atomic::Ordering::Relaxed)
}

/// Returns [MemoryConsumer] for this [MemoryReservation]
Expand All @@ -367,8 +367,8 @@ impl MemoryReservation {

/// Frees all bytes from this reservation back to the underlying
/// pool, returning the number of bytes freed.
pub fn free(&mut self) -> usize {
let size = self.size;
pub fn free(&self) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this technically this is a breaking API change? I thought about it and from what I can tell the answer is no as to all this API in DataFusion 52 the caller needs a mut and in 53 would not (but could still call it with mut even though that is not needed)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, this is not a breaking API change.

One thing that could happen is that people have some clippy lint that goes off in case of "unused muts". In that case people will start seeing new clippy warnings with DataFusion 53 in their own code.

let size = self.size.load(atomic::Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect that the reservations should be consistent (Ordering::seqcst), otherwise I would worry that we run the risk of not seeing other changes.

However, Relaxed seems to be used in the MemoryPools themselves, so this is consistent

https://github.com/apache/datafusion/blob/ead8209803770773980fafaf0fc622bb606be0ee/datafusion/execution/src/memory_pool/pool.rs#L83-L82

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the actual accounting that matters is done in the impl MemoryPools, so using something more consistent here is not really going to yield any improvement.

if size != 0 {
self.shrink(size)
}
Expand All @@ -380,60 +380,62 @@ impl MemoryReservation {
/// # Panics
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn shrink(&mut self, capacity: usize) {
let new_size = self.size.checked_sub(capacity).unwrap();
pub fn shrink(&self, capacity: usize) {
self.size.fetch_sub(capacity, atomic::Ordering::Relaxed);
self.registration.pool.shrink(self, capacity);
self.size = new_size
}

/// Tries to free `capacity` bytes from this reservation
/// if `capacity` does not exceed [`Self::size`]
/// Returns new reservation size
/// or error if shrinking capacity is more than allocated size
pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
if let Some(new_size) = self.size.checked_sub(capacity) {
self.registration.pool.shrink(self, capacity);
self.size = new_size;
Ok(new_size)
} else {
internal_err!(
"Cannot free the capacity {capacity} out of allocated size {}",
self.size
pub fn try_shrink(&self, capacity: usize) -> Result<usize> {
let updated = self.size.fetch_update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|prev| prev.checked_sub(capacity),
);
updated.map_err(|_| {
let prev = self.size.load(atomic::Ordering::Relaxed);
internal_datafusion_err!(
"Cannot free the capacity {capacity} out of allocated size {prev}"
)
}
})
}

/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
match capacity.cmp(&self.size) {
Ordering::Greater => self.grow(capacity - self.size),
Ordering::Less => self.shrink(self.size - capacity),
pub fn resize(&self, capacity: usize) {
let size = self.size.load(atomic::Ordering::Relaxed);
match capacity.cmp(&size) {
Ordering::Greater => self.grow(capacity - size),
Ordering::Less => self.shrink(size - capacity),
_ => {}
}
}

/// Try to set the size of this reservation to `capacity`
pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
match capacity.cmp(&self.size) {
Ordering::Greater => self.try_grow(capacity - self.size)?,
Ordering::Less => self.shrink(self.size - capacity),
pub fn try_resize(&self, capacity: usize) -> Result<()> {
let size = self.size.load(atomic::Ordering::Relaxed);
match capacity.cmp(&size) {
Ordering::Greater => self.try_grow(capacity - size)?,
Ordering::Less => self.shrink(size - capacity),
_ => {}
};
Ok(())
}

/// Increase the size of this reservation by `capacity` bytes
pub fn grow(&mut self, capacity: usize) {
pub fn grow(&self, capacity: usize) {
self.registration.pool.grow(self, capacity);
self.size += capacity;
self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
}

/// Try to increase the size of this reservation by `capacity`
/// bytes, returning error if there is insufficient capacity left
/// in the pool.
pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
pub fn try_grow(&self, capacity: usize) -> Result<()> {
self.registration.pool.try_grow(self, capacity)?;
self.size += capacity;
self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
Ok(())
}

Expand All @@ -447,26 +449,32 @@ impl MemoryReservation {
/// # Panics
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn split(&mut self, capacity: usize) -> MemoryReservation {
self.size = self.size.checked_sub(capacity).unwrap();
pub fn split(&self, capacity: usize) -> MemoryReservation {
self.size
.fetch_update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|prev| prev.checked_sub(capacity),
)
.unwrap();
Self {
size: capacity,
size: atomic::AtomicUsize::new(capacity),
registration: Arc::clone(&self.registration),
}
}

/// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn new_empty(&self) -> Self {
Self {
size: 0,
size: atomic::AtomicUsize::new(0),
registration: Arc::clone(&self.registration),
}
}

/// Splits off all the bytes from this [`MemoryReservation`] into
/// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn take(&mut self) -> MemoryReservation {
self.split(self.size)
self.split(self.size.load(atomic::Ordering::Relaxed))
}
}

Expand All @@ -492,7 +500,7 @@ mod tests {
#[test]
fn test_memory_pool_underflow() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut a1 = MemoryConsumer::new("a1").register(&pool);
let a1 = MemoryConsumer::new("a1").register(&pool);
assert_eq!(pool.reserved(), 0);

a1.grow(100);
Expand All @@ -507,7 +515,7 @@ mod tests {
a1.try_grow(30).unwrap();
assert_eq!(pool.reserved(), 30);

let mut a2 = MemoryConsumer::new("a2").register(&pool);
let a2 = MemoryConsumer::new("a2").register(&pool);
a2.try_grow(25).unwrap_err();
assert_eq!(pool.reserved(), 30);

Expand All @@ -521,7 +529,7 @@ mod tests {
#[test]
fn test_split() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);
let r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
assert_eq!(r1.size(), 20);
Expand All @@ -542,10 +550,10 @@ mod tests {
#[test]
fn test_new_empty() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);
let r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
let mut r2 = r1.new_empty();
let r2 = r1.new_empty();
r2.try_grow(5).unwrap();

assert_eq!(r1.size(), 20);
Expand All @@ -559,7 +567,7 @@ mod tests {
let mut r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
let mut r2 = r1.take();
let r2 = r1.take();
r2.try_grow(5).unwrap();

assert_eq!(r1.size(), 0);
Expand Down
Loading