Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// On error the `allocation` will not be increased in size
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

/// Attempt to reclaim `target_bytes` from existing spillable consumers already registered
/// with this pool.
///
/// `exclude_consumer_id`, when provided, identifies the current requester and should not be
/// reclaimed from to avoid re-entering the same operator while it is mid-allocation.
fn reclaim(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reclaiming memory usually involves spilling internal state to disk, which is typically an asynchronous operation. Defining `reclaim` as a synchronous interface might therefore be unnatural.

&self,
_target_bytes: usize,
_exclude_consumer_id: Option<usize>,
) -> Result<usize> {
Ok(0)
}

/// Return the total amount of memory reserved
fn reserved(&self) -> usize;

Expand Down Expand Up @@ -240,11 +253,22 @@ pub enum MemoryLimit {
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
/// Name supplied when the consumer is created; exposed through `name()`.
name: String,
/// Whether this allocation can spill to disk; exposed through `can_spill()`.
/// Set to `true` by `with_reclaimer`.
can_spill: bool,
/// Identifier obtained from `Self::new_unique_id()`. `clone_with_new_id`
/// assigns a fresh one rather than copying this value.
id: usize,
/// Optional callback that can reclaim memory from this consumer when another
/// consumer in the same pool is under pressure. `None` unless installed via
/// `with_reclaimer`; never copied by `clone_with_new_id`.
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

// Hand-written `Debug`: the reclaimer is a `dyn MemoryReclaimer` trait object,
// and that trait does not require `Debug`, so only its presence is reported
// (as `has_reclaimer`) instead of its contents.
impl std::fmt::Debug for MemoryConsumer {
    /// Formats the consumer as a struct with the fields `name`, `can_spill`,
    /// `id` and the derived boolean `has_reclaimer`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            can_spill,
            id,
            reclaimer,
        } = self;
        let has_reclaimer = reclaimer.is_some();

        let mut builder = f.debug_struct("MemoryConsumer");
        builder.field("name", name);
        builder.field("can_spill", can_spill);
        builder.field("id", id);
        builder.field("has_reclaimer", &has_reclaimer);
        builder.finish()
    }
}

impl PartialEq for MemoryConsumer {
Expand Down Expand Up @@ -283,17 +307,24 @@ impl MemoryConsumer {
name: name.into(),
can_spill: false,
id: Self::new_unique_id(),
reclaimer: None,
}
}

/// Creates a copy of this [`MemoryConsumer`] that carries a freshly generated
/// unique id, so it can be registered with a [`MemoryPool`] independently of
/// the original.
///
/// The `name` and `can_spill` settings are copied. Any registered
/// [`MemoryReclaimer`] is deliberately **not** copied: a reclaimer is expected
/// to be bound to the state of the original spillable operator, and attaching
/// it to a consumer with a different id could make an externally triggered
/// reclaim target the wrong owner.
pub fn clone_with_new_id(&self) -> Self {
    let name = self.name.clone();
    let can_spill = self.can_spill;
    let id = Self::new_unique_id();

    Self {
        name,
        can_spill,
        id,
        // Intentionally dropped; see the method documentation.
        reclaimer: None,
    }
}

Expand All @@ -307,6 +338,18 @@ impl MemoryConsumer {
Self { can_spill, ..self }
}

/// Configure a callback that can reclaim memory from this consumer when another consumer in
/// the same pool is under pressure.
///
/// A consumer with a reclaimer is considered spill-capable by default.
pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
Comment thread
nathanb9 marked this conversation as resolved.
Self {
can_spill: true,
reclaimer: Some(reclaimer),
..self
}
}

/// Returns true if this allocation can spill to disk
pub fn can_spill(&self) -> bool {
self.can_spill
Expand All @@ -317,6 +360,11 @@ impl MemoryConsumer {
&self.name
}

/// Returns a new shared handle to the reclaim callback registered for this
/// consumer, or `None` when no callback has been registered.
pub fn reclaimer(&self) -> Option<Arc<dyn MemoryReclaimer>> {
    self.reclaimer.as_ref().map(Arc::clone)
}

/// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
/// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
Expand All @@ -331,6 +379,19 @@ impl MemoryConsumer {
}
}

/// Callback implemented by spillable operators that can synchronously reclaim existing
/// reservations when another consumer in the same pool is under pressure.
///
/// Implementations should:
///
/// - only report bytes actually released from pool-tracked reservations
/// - return at most `target_bytes`
/// - avoid creating strong reference cycles back to [`MemoryReservation`]s,
/// [`MemoryConsumer`]s, or other state that keeps those objects alive
pub trait MemoryReclaimer: Send + Sync {
fn reclaim(&self, target_bytes: usize) -> Result<usize>;
Comment thread
nathanb9 marked this conversation as resolved.
}

/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
Expand Down Expand Up @@ -503,6 +564,16 @@ impl Drop for MemoryReservation {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;

/// Test-only [`MemoryReclaimer`] that never releases anything.
#[derive(Debug)]
struct NoopReclaimer;

impl MemoryReclaimer for NoopReclaimer {
    /// Ignores the request and always reports zero bytes reclaimed.
    fn reclaim(&self, _target_bytes: usize) -> Result<usize> {
        let nothing_released: usize = 0;
        Ok(nothing_released)
    }
}

#[test]
fn test_id_uniqueness() {
Expand Down Expand Up @@ -629,4 +700,27 @@ mod tests {
assert_eq!(r1.size(), 0);
assert_eq!(pool.reserved(), 80);
}

/// `clone_with_new_id` must keep the spill flag, assign a different id, and
/// leave the reclaimer behind on the original consumer.
#[test]
fn test_clone_with_new_id_drops_reclaimer() {
    let callback: Arc<dyn MemoryReclaimer> = Arc::new(NoopReclaimer);
    let original = MemoryConsumer::new("spillable").with_reclaimer(callback);
    let copy = original.clone_with_new_id();

    // The original still owns its reclaimer and stays spillable.
    assert!(original.reclaimer().is_some());
    assert!(original.can_spill());

    // The copy has no reclaimer but inherits the spill flag.
    assert!(copy.reclaimer().is_none());
    assert!(copy.can_spill());

    // The two consumers are distinguishable by id.
    assert_ne!(original.id(), copy.id());
}

/// Installing a reclaimer must mark the consumer as able to spill and make
/// the reclaimer retrievable.
#[test]
fn test_with_reclaimer_marks_consumer_spillable() {
    let callback: Arc<dyn MemoryReclaimer> = Arc::new(NoopReclaimer);
    let spillable = MemoryConsumer::new("spillable").with_reclaimer(callback);

    assert!(spillable.can_spill());
    assert!(spillable.reclaimer().is_some());
}
}
Loading
Loading