feat: enable external reclaim for mem spillable df operators #21425
nathanb9 wants to merge 2 commits into apache:main
Conversation
```rust
    Ok(())
}

fn reclaim(
```
Since `TrackConsumersPool` wraps an arbitrary `I: MemoryPool`, this override ends up hiding any reclaim logic implemented by the inner pool.
If someone wraps a custom reclaim-aware pool just to get the top-consumer diagnostics, reclaim will only use the tracked callbacks here and never delegate.
Should we consider calling `self.inner.reclaim(...)` as a fallback after trying local candidates? Or alternatively, document clearly that this wrapper fully owns reclaim selection?
I'm not entirely sure what the best approach is here.
For now I will experiment with using tracked-consumer reclaim first, then falling back to `inner.reclaim(...)` for any remaining bytes. It makes sense to me, but I want to see how it behaves in memory-stressed workloads.
@kosiew Thank you for reviewing. I didn't mean to take this out of draft, as I'm still testing things and there are bugs! If it's okay with you, I will bring this back from draft once I've addressed them and @ you.
One question to better understand the concept. From what I understand, Velox will stop execution of an operator before letting the reclaimer access operator state; how would this proposal handle such a case? For discussion: what if spillable operators got access to a 'reclaim stream' which they must poll together with the 'batch stream'? This way the operator could handle either a spill or a batch, so the state is never mutated concurrently.
```rust
///
/// `exclude_consumer_id`, when provided, identifies the current requester and should not be
/// reclaimed from to avoid re-entering the same operator while it is mid-allocation.
fn reclaim(
```
Reclamation of memory usually involves spilling internal state to disk, which is typically asynchronous. Defining `reclaim` as a synchronous interface might be unnatural.
```rust
fn reclaim(
    &self,
    target_bytes: usize,
    exclude_consumer_id: Option<usize>,
) -> Result<usize> {
```
`TrackConsumersPool` needs to be redesigned as we introduce cooperative memory management: tracking consumers would be a hard requirement for cooperative memory management to work. Defining it as a decorator would be a bit strange:

- `GreedyMemoryPool` won't reclaim other consumers' memory unless used with `TrackConsumersPool`.
- There is no point reclaiming other consumers' memory when using `FairSpillPool`.

I think we can define a common `ConsumerTracker` and integrate it into both `GreedyMemoryPool` and `FairSpillPool`. `GreedyMemoryPool::reclaim` implements cooperative memory reclamation while `FairSpillPool` just needs an empty implementation. Having consumer tracking always on won't bring much performance penalty, while enabling rich memory usage reporting on memory reservation failures, and I haven't seen a reason for not enabling it in practice.
I think the major considerations are:

1. It is a blocking operation, as stated in #21425 (comment)
2. All spilling operators have to explicitly implement this method to clear the buffer

Here is an alternative idea: allow the existing … This approach reuses the existing API to support similar behavior. The key difference from the proposed …
Thanks for looking.

@milenkovicm Yep, that's my understanding of velox as well.

```cpp
nonReclaimableSection_ = true;   // enter the non-reclaimable section
pool->reserve(bytes);            // safe: reclaimer skips this operator
nonReclaimableSection_ = false;  // exit
```

But it seems like an antipattern which requires a bunch of locks... it's unreasonable to push this concurrency constraint onto each worker's implementation. So I'm re-considering this and leaning towards exploring the reclaim stream you're suggesting.
Thanks for taking the time @2010YOUY01 @Kontinuation. Any thoughts on the above?

^ I think this is a nice pattern and it also addresses all the major concerns. Conceptually, a stream is the most intuitive to me, and most critically, implementing an operator's reclaim with it needs no locks, makes concurrent access impossible, and can continue doing async spill IO. I'm working on experimenting with this now.

One concern I've found while experimenting with streams: basically, we could have one operator A which is streaming data to another B, which runs out of memory and wants to reclaim from A.
Processing the next batch and reclaiming memory in the same loop may work for a pipeline-execution engine, but it does not work well for a volcano-style execution engine such as DataFusion.

In a pipeline execution engine, a central scheduler manages control flow and invokes operators directly. Because RepartitionExec and SortExec are decoupled in the scheduler's task logic, the engine can emit a reclamation request to SortExec even while RepartitionExec is active, without risking a call-stack deadlock.

In DataFusion's async volcano model, the control flow is nested within the operators themselves via async streams. This means SortExec and RepartitionExec are coupled in the async poll stack. If a child operator (RepartitionExec) blocks while waiting for a memory reclamation response from its parent (SortExec), it creates a circular dependency: the parent cannot spill because it is suspended awaiting the child's output.

To avoid this, we should embrace the concurrent nature of memory allocation and reclamation. Spillable operators must implement spilling as an asynchronous, independent operation that does not rely on the current execution stack. We should establish clear implementation guidelines for these operators to ensure they can safely handle reclamation requests while in a suspended state.
Proposed (LATEST from discussion and suggestions below)
`poll_next()` is driven by a single tokio task per partition. The reclaim channel is checked at the start of `poll_next()`. The pool sends to the channel and waits; it does not touch operator state directly. By the time the pool's `try_send()` returns, the channel buffer holds the request; the operator will process it on its next poll, which necessarily happens after the current poll completes (or in the current poll before any state mutation).

Follow up work