-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat: enable external reclaim for mem spillable df operators #21425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,14 +16,16 @@ | |
| // under the License. | ||
|
|
||
| use crate::memory_pool::{ | ||
| MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size, | ||
| MemoryConsumer, MemoryLimit, MemoryPool, MemoryReclaimer, MemoryReservation, | ||
| human_readable_size, | ||
| }; | ||
| use datafusion_common::HashMap; | ||
| use datafusion_common::{DataFusionError, Result, resources_datafusion_err}; | ||
| use log::debug; | ||
| use parking_lot::Mutex; | ||
| use std::{ | ||
| num::NonZeroUsize, | ||
| sync::Arc, | ||
| sync::atomic::{AtomicUsize, Ordering}, | ||
| }; | ||
|
|
||
|
|
@@ -269,12 +271,24 @@ fn insufficient_capacity_err( | |
| ) | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct TrackedConsumer { | ||
| name: String, | ||
| can_spill: bool, | ||
| reserved: AtomicUsize, | ||
| peak: AtomicUsize, | ||
| reclaimer: Option<Arc<dyn MemoryReclaimer>>, | ||
| } | ||
|
|
||
| impl std::fmt::Debug for TrackedConsumer { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.debug_struct("TrackedConsumer") | ||
| .field("name", &self.name) | ||
| .field("can_spill", &self.can_spill) | ||
| .field("reserved", &self.reserved()) | ||
| .field("peak", &self.peak()) | ||
| .field("has_reclaimer", &self.reclaimer.is_some()) | ||
| .finish() | ||
| } | ||
| } | ||
|
|
||
| impl TrackedConsumer { | ||
|
|
@@ -428,6 +442,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> { | |
| can_spill: consumer.can_spill(), | ||
| reserved: Default::default(), | ||
| peak: Default::default(), | ||
| reclaimer: consumer.reclaimer(), | ||
| }, | ||
| ); | ||
|
|
||
|
|
@@ -488,6 +503,50 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> { | |
| Ok(()) | ||
| } | ||
|
|
||
| fn reclaim( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since If someone wraps a custom reclaim-aware pool just to get the top-consumer diagnostics, Should we consider calling
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Im not entirely sure what best approach is for this. For now I will experiment with using tracked consumer reclaim first, then fall back to |
||
| &self, | ||
| target_bytes: usize, | ||
| exclude_consumer_id: Option<usize>, | ||
| ) -> Result<usize> { | ||
|
Comment on lines
+506
to
+510
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think we can define a common |
||
| if target_bytes == 0 { | ||
| return Ok(0); | ||
| } | ||
|
|
||
| let mut candidates = self | ||
| .tracked_consumers | ||
| .lock() | ||
| .iter() | ||
| .filter_map(|(consumer_id, tracked_consumer)| { | ||
| let reserved = tracked_consumer.reserved(); | ||
| let reclaimer = tracked_consumer.reclaimer.as_ref()?; | ||
| if exclude_consumer_id == Some(*consumer_id) | ||
| || !tracked_consumer.can_spill | ||
| || reserved == 0 | ||
| { | ||
| return None; | ||
| } | ||
|
|
||
| Some((*consumer_id, reserved, Arc::clone(reclaimer))) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| candidates.sort_by( | ||
| |(left_id, left_reserved, _), (right_id, right_reserved, _)| { | ||
| right_reserved | ||
| .cmp(left_reserved) | ||
| .then_with(|| left_id.cmp(right_id)) | ||
| }, | ||
| ); | ||
|
|
||
| let mut reclaimed = 0; | ||
| for (_, _, reclaimer) in candidates { | ||
| if reclaimed >= target_bytes { | ||
| break; | ||
| } | ||
| reclaimed += reclaimer.reclaim(target_bytes - reclaimed)?; | ||
| } | ||
| Ok(reclaimed) | ||
| } | ||
|
|
||
| fn reserved(&self) -> usize { | ||
| self.inner.reserved() | ||
| } | ||
|
|
@@ -513,6 +572,24 @@ mod tests { | |
| use insta::{Settings, allow_duplicates, assert_snapshot}; | ||
| use std::sync::Arc; | ||
|
|
||
| #[derive(Debug)] | ||
| struct TestReclaimer { | ||
| reservation: Arc<Mutex<Option<Arc<MemoryReservation>>>>, | ||
| } | ||
|
|
||
| impl MemoryReclaimer for TestReclaimer { | ||
| fn reclaim(&self, target_bytes: usize) -> Result<usize> { | ||
| let Some(reservation) = self.reservation.lock().clone() else { | ||
| return Ok(0); | ||
| }; | ||
| let reclaimed = reservation.size().min(target_bytes); | ||
| if reclaimed > 0 { | ||
| reservation.shrink(reclaimed); | ||
| } | ||
| Ok(reclaimed) | ||
| } | ||
| } | ||
|
|
||
| fn make_settings() -> Settings { | ||
| let mut settings = Settings::clone_current(); | ||
| settings.add_filter( | ||
|
|
@@ -811,4 +888,80 @@ mod tests { | |
| r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B. | ||
| "); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_tracked_consumers_pool_reclaim_prefers_largest_consumer() { | ||
| let pool = Arc::new(TrackConsumersPool::new( | ||
| GreedyMemoryPool::new(200), | ||
| NonZeroUsize::new(3).unwrap(), | ||
| )) as Arc<dyn MemoryPool>; | ||
|
|
||
| let first_reservation_handle = Arc::new(Mutex::new(None)); | ||
| let first = Arc::new( | ||
| MemoryConsumer::new("spillable-1") | ||
| .with_can_spill(true) | ||
| .with_reclaimer(Arc::new(TestReclaimer { | ||
| reservation: Arc::clone(&first_reservation_handle), | ||
| })) | ||
| .register(&pool), | ||
| ); | ||
| *first_reservation_handle.lock() = Some(Arc::clone(&first)); | ||
| first.grow(100); | ||
|
|
||
| let second_reservation_handle = Arc::new(Mutex::new(None)); | ||
| let second = Arc::new( | ||
| MemoryConsumer::new("spillable-2") | ||
| .with_can_spill(true) | ||
| .with_reclaimer(Arc::new(TestReclaimer { | ||
| reservation: Arc::clone(&second_reservation_handle), | ||
| })) | ||
| .register(&pool), | ||
| ); | ||
| *second_reservation_handle.lock() = Some(Arc::clone(&second)); | ||
| second.grow(60); | ||
|
|
||
| let reclaimed = pool.reclaim(80, None).unwrap(); | ||
|
|
||
| assert_eq!(reclaimed, 80); | ||
| assert_eq!(first.size(), 20); | ||
| assert_eq!(second.size(), 60); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_tracked_consumers_pool_reclaim_excludes_requester() { | ||
| let pool = Arc::new(TrackConsumersPool::new( | ||
| GreedyMemoryPool::new(200), | ||
| NonZeroUsize::new(3).unwrap(), | ||
| )) as Arc<dyn MemoryPool>; | ||
|
|
||
| let first_reservation_handle = Arc::new(Mutex::new(None)); | ||
| let first = Arc::new( | ||
| MemoryConsumer::new("spillable-1") | ||
| .with_can_spill(true) | ||
| .with_reclaimer(Arc::new(TestReclaimer { | ||
| reservation: Arc::clone(&first_reservation_handle), | ||
| })) | ||
| .register(&pool), | ||
| ); | ||
| *first_reservation_handle.lock() = Some(Arc::clone(&first)); | ||
| first.grow(100); | ||
|
|
||
| let second_reservation_handle = Arc::new(Mutex::new(None)); | ||
| let second = Arc::new( | ||
| MemoryConsumer::new("spillable-2") | ||
| .with_can_spill(true) | ||
| .with_reclaimer(Arc::new(TestReclaimer { | ||
| reservation: Arc::clone(&second_reservation_handle), | ||
| })) | ||
| .register(&pool), | ||
| ); | ||
| *second_reservation_handle.lock() = Some(Arc::clone(&second)); | ||
| second.grow(60); | ||
|
|
||
| let reclaimed = pool.reclaim(80, Some(first.consumer().id())).unwrap(); | ||
|
|
||
| assert_eq!(reclaimed, 60); | ||
| assert_eq!(first.size(), 100); | ||
| assert_eq!(second.size(), 0); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reclamation of memory usually involves spilling internal states to disk, which is usually async. Defining
reclaimas a synchronous interface might be unnatural.