-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Do not require mut in memory reservation methods #19759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,7 @@ | |
| //! [`MemoryPool`] for memory management during query execution, [`proxy`] for | ||
| //! help with allocation accounting. | ||
|
|
||
| use datafusion_common::{Result, internal_err}; | ||
| use datafusion_common::{Result, internal_datafusion_err}; | ||
| use std::hash::{Hash, Hasher}; | ||
| use std::{cmp::Ordering, sync::Arc, sync::atomic}; | ||
|
|
||
|
|
@@ -322,7 +322,7 @@ impl MemoryConsumer { | |
| pool: Arc::clone(pool), | ||
| consumer: self, | ||
| }), | ||
| size: 0, | ||
| size: atomic::AtomicUsize::new(0), | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -351,13 +351,13 @@ impl Drop for SharedRegistration { | |
| #[derive(Debug)] | ||
| pub struct MemoryReservation { | ||
| registration: Arc<SharedRegistration>, | ||
| size: usize, | ||
| size: atomic::AtomicUsize, | ||
| } | ||
|
|
||
| impl MemoryReservation { | ||
| /// Returns the size of this reservation in bytes | ||
| pub fn size(&self) -> usize { | ||
| self.size | ||
| self.size.load(atomic::Ordering::Relaxed) | ||
| } | ||
|
|
||
| /// Returns [MemoryConsumer] for this [MemoryReservation] | ||
|
|
@@ -367,8 +367,8 @@ impl MemoryReservation { | |
|
|
||
| /// Frees all bytes from this reservation back to the underlying | ||
| /// pool, returning the number of bytes freed. | ||
| pub fn free(&mut self) -> usize { | ||
| let size = self.size; | ||
| pub fn free(&self) -> usize { | ||
| let size = self.size.load(atomic::Ordering::Relaxed); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would expect that the reservations should be consistent (Ordering::SeqCst), otherwise I would worry that we run the risk of not seeing other changes. However, Relaxed seems to be used in the MemoryPools themselves, so this is consistent.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, the actual accounting that matters is done in the `MemoryPool` implementations themselves.
||
| if size != 0 { | ||
| self.shrink(size) | ||
| } | ||
|
|
@@ -380,60 +380,62 @@ impl MemoryReservation { | |
| /// # Panics | ||
| /// | ||
| /// Panics if `capacity` exceeds [`Self::size`] | ||
| pub fn shrink(&mut self, capacity: usize) { | ||
| let new_size = self.size.checked_sub(capacity).unwrap(); | ||
| pub fn shrink(&self, capacity: usize) { | ||
| self.size.fetch_sub(capacity, atomic::Ordering::Relaxed); | ||
| self.registration.pool.shrink(self, capacity); | ||
| self.size = new_size | ||
| } | ||
|
|
||
| /// Tries to free `capacity` bytes from this reservation | ||
| /// if `capacity` does not exceed [`Self::size`] | ||
| /// Returns new reservation size | ||
| /// or error if shrinking capacity is more than allocated size | ||
| pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> { | ||
| if let Some(new_size) = self.size.checked_sub(capacity) { | ||
| self.registration.pool.shrink(self, capacity); | ||
| self.size = new_size; | ||
| Ok(new_size) | ||
| } else { | ||
| internal_err!( | ||
| "Cannot free the capacity {capacity} out of allocated size {}", | ||
| self.size | ||
| pub fn try_shrink(&self, capacity: usize) -> Result<usize> { | ||
| let updated = self.size.fetch_update( | ||
| atomic::Ordering::Relaxed, | ||
| atomic::Ordering::Relaxed, | ||
| |prev| prev.checked_sub(capacity), | ||
| ); | ||
| updated.map_err(|_| { | ||
| let prev = self.size.load(atomic::Ordering::Relaxed); | ||
| internal_datafusion_err!( | ||
| "Cannot free the capacity {capacity} out of allocated size {prev}" | ||
| ) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| /// Sets the size of this reservation to `capacity` | ||
| pub fn resize(&mut self, capacity: usize) { | ||
| match capacity.cmp(&self.size) { | ||
| Ordering::Greater => self.grow(capacity - self.size), | ||
| Ordering::Less => self.shrink(self.size - capacity), | ||
| pub fn resize(&self, capacity: usize) { | ||
| let size = self.size.load(atomic::Ordering::Relaxed); | ||
| match capacity.cmp(&size) { | ||
| Ordering::Greater => self.grow(capacity - size), | ||
| Ordering::Less => self.shrink(size - capacity), | ||
| _ => {} | ||
| } | ||
| } | ||
|
|
||
| /// Try to set the size of this reservation to `capacity` | ||
| pub fn try_resize(&mut self, capacity: usize) -> Result<()> { | ||
| match capacity.cmp(&self.size) { | ||
| Ordering::Greater => self.try_grow(capacity - self.size)?, | ||
| Ordering::Less => self.shrink(self.size - capacity), | ||
| pub fn try_resize(&self, capacity: usize) -> Result<()> { | ||
| let size = self.size.load(atomic::Ordering::Relaxed); | ||
| match capacity.cmp(&size) { | ||
| Ordering::Greater => self.try_grow(capacity - size)?, | ||
| Ordering::Less => self.shrink(size - capacity), | ||
| _ => {} | ||
| }; | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Increase the size of this reservation by `capacity` bytes | ||
| pub fn grow(&mut self, capacity: usize) { | ||
| pub fn grow(&self, capacity: usize) { | ||
| self.registration.pool.grow(self, capacity); | ||
| self.size += capacity; | ||
| self.size.fetch_add(capacity, atomic::Ordering::Relaxed); | ||
| } | ||
|
|
||
| /// Try to increase the size of this reservation by `capacity` | ||
| /// bytes, returning error if there is insufficient capacity left | ||
| /// in the pool. | ||
| pub fn try_grow(&mut self, capacity: usize) -> Result<()> { | ||
| pub fn try_grow(&self, capacity: usize) -> Result<()> { | ||
| self.registration.pool.try_grow(self, capacity)?; | ||
| self.size += capacity; | ||
| self.size.fetch_add(capacity, atomic::Ordering::Relaxed); | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -447,26 +449,32 @@ impl MemoryReservation { | |
| /// # Panics | ||
| /// | ||
| /// Panics if `capacity` exceeds [`Self::size`] | ||
| pub fn split(&mut self, capacity: usize) -> MemoryReservation { | ||
| self.size = self.size.checked_sub(capacity).unwrap(); | ||
| pub fn split(&self, capacity: usize) -> MemoryReservation { | ||
| self.size | ||
| .fetch_update( | ||
| atomic::Ordering::Relaxed, | ||
| atomic::Ordering::Relaxed, | ||
| |prev| prev.checked_sub(capacity), | ||
| ) | ||
| .unwrap(); | ||
| Self { | ||
| size: capacity, | ||
| size: atomic::AtomicUsize::new(capacity), | ||
| registration: Arc::clone(&self.registration), | ||
| } | ||
| } | ||
|
|
||
| /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`] | ||
| pub fn new_empty(&self) -> Self { | ||
| Self { | ||
| size: 0, | ||
| size: atomic::AtomicUsize::new(0), | ||
| registration: Arc::clone(&self.registration), | ||
| } | ||
| } | ||
|
|
||
| /// Splits off all the bytes from this [`MemoryReservation`] into | ||
| /// a new [`MemoryReservation`] with the same [`MemoryConsumer`] | ||
| pub fn take(&mut self) -> MemoryReservation { | ||
| self.split(self.size) | ||
| self.split(self.size.load(atomic::Ordering::Relaxed)) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -492,7 +500,7 @@ mod tests { | |
| #[test] | ||
| fn test_memory_pool_underflow() { | ||
| let pool = Arc::new(GreedyMemoryPool::new(50)) as _; | ||
| let mut a1 = MemoryConsumer::new("a1").register(&pool); | ||
| let a1 = MemoryConsumer::new("a1").register(&pool); | ||
| assert_eq!(pool.reserved(), 0); | ||
|
|
||
| a1.grow(100); | ||
|
|
@@ -507,7 +515,7 @@ mod tests { | |
| a1.try_grow(30).unwrap(); | ||
| assert_eq!(pool.reserved(), 30); | ||
|
|
||
| let mut a2 = MemoryConsumer::new("a2").register(&pool); | ||
| let a2 = MemoryConsumer::new("a2").register(&pool); | ||
| a2.try_grow(25).unwrap_err(); | ||
| assert_eq!(pool.reserved(), 30); | ||
|
|
||
|
|
@@ -521,7 +529,7 @@ mod tests { | |
| #[test] | ||
| fn test_split() { | ||
| let pool = Arc::new(GreedyMemoryPool::new(50)) as _; | ||
| let mut r1 = MemoryConsumer::new("r1").register(&pool); | ||
| let r1 = MemoryConsumer::new("r1").register(&pool); | ||
|
|
||
| r1.try_grow(20).unwrap(); | ||
| assert_eq!(r1.size(), 20); | ||
|
|
@@ -542,10 +550,10 @@ mod tests { | |
| #[test] | ||
| fn test_new_empty() { | ||
| let pool = Arc::new(GreedyMemoryPool::new(50)) as _; | ||
| let mut r1 = MemoryConsumer::new("r1").register(&pool); | ||
| let r1 = MemoryConsumer::new("r1").register(&pool); | ||
|
|
||
| r1.try_grow(20).unwrap(); | ||
| let mut r2 = r1.new_empty(); | ||
| let r2 = r1.new_empty(); | ||
| r2.try_grow(5).unwrap(); | ||
|
|
||
| assert_eq!(r1.size(), 20); | ||
|
|
@@ -559,7 +567,7 @@ mod tests { | |
| let mut r1 = MemoryConsumer::new("r1").register(&pool); | ||
|
|
||
| r1.try_grow(20).unwrap(); | ||
| let mut r2 = r1.take(); | ||
| let r2 = r1.take(); | ||
| r2.try_grow(5).unwrap(); | ||
|
|
||
| assert_eq!(r1.size(), 0); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this technically a breaking API change? I thought about it and, from what I can tell, the answer is no: with this API in DataFusion 52 the caller needs a
`mut`, and in 53 it would not (but could still call it with `mut` even though that is not needed).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right, this is not a breaking API change.
One thing that could happen is that people have some clippy lint that goes off in case of "unused muts". In that case people will start seeing new clippy warnings with DataFusion 53 in their own code.