feat: enable external reclaim for mem spillable df operators#21425

Open
nathanb9 wants to merge 2 commits into apache:main from
nathanb9:enable-external-operator-reclaim-spill-hooks-for-external-memory-managers

Conversation

@nathanb9
Contributor

@nathanb9 nathanb9 commented Apr 7, 2026

Proposed (LATEST from discussion and suggestions below)

  1. With streams, the operator's state is mutated only inside poll_next(), which is driven by a single tokio task per partition. The reclaim channel is checked at the start of poll_next(). The pool sends to the channel and waits; it never touches operator state directly. By the time the pool's try_send() returns, the channel buffer holds the request; the operator will process it on its next poll, which necessarily happens after the current poll completes (or within the current poll, before any state mutation).
                                MemoryPool
  ┌────────────────────────────────────────────────────────────────────┐
  │                     ConsumerTracker (built-in)                     │
  │  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  │
  │  │  Consumer A      │  │  Consumer B      │  │  Consumer C      │  │
  │  │  name: Sorter[0] │  │  name: Agg[0]    │  │  name: Repart[0] │  │
  │  │  reserved: 145MB │  │  reserved:  50MB │  │  reserved:   3MB │  │
  │  │  can_spill: true │  │  can_spill: true │  │  can_spill: false│  │
  │  │  reclaim_tx: Some│  │  reclaim_tx: Some│  │  reclaim_tx: None│  │
  │  └──────────────────┘  └──────────────────┘  └──────────────────┘  │
  └────────────────────────────────────────────────────────────────────┘
          │ try_grow() fails               │ external spill(size)
          │ OR external reclaim()          │ (Spark/Comet JNI)
          ▼                                ▼
     pool.reclaim(needed, exclude_id) ─────┘
          │
          │ 1. Snapshot candidates under lock, sort largest-first
          │ 2. Release lock
          │ 3. Send ReclaimRequest to each via channel
          │ 4. Wait for response with bounded timeout
          │
          ▼
  ┌───────────────────┐      ┌────────────────────┐
  │  ExternalSorter   │      │  HashAggStream     │
  │  poll_next() {    │      │  poll_next() {     │
  │    // check first │      │    // check first  │
  │    if let Some(r) │      │    if let Some(r)  │
  │    = reclaim_rx   │      │    = reclaim_rx    │
  │      .try_recv()  │      │      .try_recv()   │
  │    {              │      │    {               │
  │      spill(r.tgt) │      │      flush(r.tgt)  │
  │      r.resp.send()│      │      r.resp.send() │
  │    }              │      │    }               │
  │    // normal work │      │    // normal work  │
  │  }                │      │  }                 │
  └───────────────────┘      └────────────────────┘

Follow up work

@github-actions github-actions Bot added the execution Related to the execution crate label Apr 7, 2026
Contributor

@kosiew kosiew left a comment


@nathanb9
Thanks for working on this.
I have a few concerns around reclaimer semantics and composition that could lead to subtle bugs. Would love to get these clarified before merging.

Comment thread datafusion/execution/src/memory_pool/mod.rs Outdated
Comment thread datafusion/execution/src/memory_pool/mod.rs
Comment thread datafusion/execution/src/memory_pool/mod.rs
Ok(())
}

fn reclaim(
Contributor


Since TrackConsumersPool wraps an arbitrary I: MemoryPool, this override ends up hiding any reclaim logic implemented by the inner pool.

If someone wraps a custom reclaim-aware pool just to get the top-consumer diagnostics, reclaim will only use the tracked callbacks here and never delegate.

Should we consider calling self.inner.reclaim(...) as a fallback after trying local candidates? Or alternatively, document clearly that this wrapper fully owns reclaim selection?

Contributor Author


I'm not entirely sure what the best approach is here.

For now I will experiment with using tracked-consumer reclaim first, then falling back to inner.reclaim(...) for any remaining bytes. It makes sense to me, but I want to see the behavior in memory-stressed workloads.

@nathanb9
Contributor Author

@kosiew Thank you for reviewing.

Didn't mean to take this out of draft as I'm still testing things and there are bugs!

If it's okay with you, I will bring this back from draft once I've addressed them and @ you.

@nathanb9 nathanb9 marked this pull request as draft April 15, 2026 23:32
@milenkovicm
Contributor

One question, to better understand the concept.

From what I understand, Velox will stop execution of an operator before letting the reclaimer access operator state; how would this proposal handle such a case?

For discussion: what if spillable operators get access to a 'reclaim stream' which they must poll together with the 'batch stream'? This way the operator handles either a spill or a batch, so state is never mutated concurrently.

///
/// `exclude_consumer_id`, when provided, identifies the current requester and should not be
/// reclaimed from to avoid re-entering the same operator while it is mid-allocation.
fn reclaim(
Member


Reclamation of memory usually involves spilling internal states to disk, which is usually async. Defining reclaim as a synchronous interface might be unnatural.

Comment on lines +506 to +510
fn reclaim(
&self,
target_bytes: usize,
exclude_consumer_id: Option<usize>,
) -> Result<usize> {
Member


TrackConsumersPool needs to be redesigned as we introduce cooperative memory management: tracking consumers would be a hard requirement for cooperative memory management to work. Defining it as a decorator would be a bit strange:

  1. GreedyMemoryPool won't reclaim other consumer's memory unless being used with TrackConsumersPool.
  2. There is no point reclaiming other consumer's memory when using FairSpillPool.

I think we can define a common ConsumerTracker and integrate it into both GreedyMemoryPool and FairSpillPool. GreedyMemoryPool::reclaim implements cooperative memory reclamation, while FairSpillPool just needs an empty implementation. Having consumer tracking always on won't incur much of a performance penalty, and it enables rich memory-usage reporting on memory reservation failures; I haven't seen a reason not to enable it in practice.

@2010YOUY01
Contributor

I think the major considerations are:

  1. It is a blocking operation, as stated in #21425 (comment).
  2. All spilling operators have to explicitly implement this method to clear the buffer.

Here is an alternative idea: allow the existing reserve() API to wait. Currently, for non-spillable consumers, a failed reservation immediately returns an error. Instead, we could let it wait for a period of time; once memory becomes available, the operation would be re-scheduled and retried.

This approach reuses the existing API to support similar behavior. The key difference from the proposed reclaim() API is that reclaim() enables operators to immediately free buffers, which is more complex to implement. As a first step, we could extend the semantics of reserve() instead—though this would likely require introducing a separate memory pool.

@nathanb9
Contributor Author

nathanb9 commented Apr 20, 2026

Thanks for looking.

From what I understand velox will stop execution of operator before letting reclaimer access operator state, how would this proposal handle such case?

@milenkovicm Yep, that's my understanding of Velox as well.
Initially my thought was to deal with it at the operator level, basically like Velox does. Something like this sample from Velox:

nonReclaimableSection_ = false; // reclaimer may access state while we block
pool->reserve(bytes);           // safe: no state mutation during the wait
nonReclaimableSection_ = true;  // resume mutating state; block reclaim again

But that seems like an antipattern which requires a bunch of locks... it's unreasonable to push this concurrency constraint onto each operator's implementation. So I'm reconsidering this and leaning towards exploring the reclaim stream you're suggesting.

@nathanb9
Contributor Author

@milenkovicm

For discussion: what if spillable operators get access to a 'reclaim stream' which they must poll together with the 'batch stream'? This way the operator handles either a spill or a batch, so state is never mutated concurrently.

Thanks for taking the time @2010YOUY01 @Kontinuation. Any thoughts on the above? I think this is a nice pattern and it addresses all the major concerns.

Conceptually, a stream is the most intuitive to me, and most critically, implementing the operator's reclaim this way needs no locks (concurrent access is impossible by construction) and spill IO can stay async. I'm experimenting with this now.

One concern I've found while experimenting with streams:
During execution, say we have: SortExec.poll_next() → RepartitionExec.poll_next() → try_grow() FAILS → pool.reclaim() → sends signal to SortExec → waits for SortExec → DEADLOCK (SortExec is stuck waiting for RepartitionExec to return)

Basically we could have one operator A streaming data to another operator B, which runs out of memory and wants to reclaim from A.

@Kontinuation
Member

One concern I've found while experimenting with streams: During execution, say we have: SortExec.poll_next() → RepartitionExec.poll_next() → try_grow() FAILS → pool.reclaim() → sends signal to SortExec → waits for SortExec → DEADLOCK (SortExec is stuck waiting for RepartitionExec to return)

Basically we could have one operator A streaming data to another operator B, which runs out of memory and wants to reclaim from A.

Processing the next batch and reclaiming memory in the same loop may work for a pipeline-execution engine, but it does not work well for a Volcano-style execution engine such as DataFusion.

In a pipeline execution engine, a central scheduler manages control flow and invokes operators directly. Because RepartitionExec and SortExec are decoupled in the scheduler's task logic, the engine can emit a reclamation request to SortExec even while RepartitionExec is active, without risking a call-stack deadlock.

In DataFusion’s async Volcano model, the control flow is nested within the operators themselves via async streams. This means SortExec and RepartitionExec are coupled in the async poll stack. If a child operator (RepartitionExec) blocks while waiting for a memory reclamation response from its parent (SortExec), it creates a circular dependency: the parent cannot spill because it is suspended awaiting the child's output.

To avoid this, we should embrace the concurrent nature of memory allocation and reclamation. Spillable operators must implement spilling as an asynchronous, independent operation that does not rely on the current execution stack. We should establish clear implementation guidelines for these operators to ensure they can safely handle reclamation requests while in a suspended state.

