Commit 631c918

perf: sort replace free()->try_grow() pattern with try_resize() to reduce memory pool interactions (#20729)
## Which issue does this PR close?

- Closes #20728.

## Rationale for this change

See full discussion in #20728, but the tl;dr is:

> Commit 3f0b342 ("Improve sort memory resilience", #19494) added per-chunk reservation.try_grow(total_sorted_size) calls inside the sort's unfold closure (sort.rs:748-750). I believe this causes a performance regression for projects that implement custom MemoryPool backends where try_grow is not trivially cheap.

## What changes are included in this PR?

Replace the pattern of `free()`->`try_grow()` with a `try_resize()` that in the common case doesn't do much work since a sorted output batch is likely the same size as the input batch.

## Are these changes tested?

Existing tests, and we are running some longer benchmarks with Comet at the moment.

When I profile TPC-H Q21 SF100 locally I see the giant stack of memory allocations that occur on DF52...

<img width="639" height="197" alt="Screenshot 2026-03-05 at 2 23 15 PM" src="https://github.com/user-attachments/assets/237c7099-0bf0-48f5-a994-8959d3427377" />

...completely disappear...

<img width="546" height="122" alt="Screenshot 2026-03-05 at 2 24 31 PM" src="https://github.com/user-attachments/assets/550cddc1-fcde-486e-88d2-2156eab661b9" />

...and TPC-H Q21 looks back to DF51 performance, at least locally.

## Are there any user-facing changes?

No.
1 parent 13cebf8 commit 631c918

1 file changed

Lines changed: 20 additions & 30 deletions

datafusion/physical-plan/src/sorts/sort.rs

@@ -731,37 +731,27 @@ impl ExternalSorter {
                 // Sort the batch immediately and get all output batches
                 let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;

-                // Free the old reservation and grow it to match the actual sorted output size
-                reservation.free();
+                // Resize the reservation to match the actual sorted output size.
+                // Using try_resize avoids a release-then-reacquire cycle, which
+                // matters for MemoryPool implementations where grow/shrink have
+                // non-trivial cost (e.g. JNI calls in Comet).
+                let total_sorted_size: usize = sorted_batches
+                    .iter()
+                    .map(get_record_batch_memory_size)
+                    .sum();
+                reservation
+                    .try_resize(total_sorted_size)
+                    .map_err(Self::err_with_oom_context)?;

-                Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
-            })
-            .then({
-                move |batches| async move {
-                    match batches {
-                        Ok((schema, sorted_batches, reservation)) => {
-                            // Calculate the total size of sorted batches
-                            let total_sorted_size: usize = sorted_batches
-                                .iter()
-                                .map(get_record_batch_memory_size)
-                                .sum();
-                            reservation
-                                .try_grow(total_sorted_size)
-                                .map_err(Self::err_with_oom_context)?;
-
-                            // Wrap in ReservationStream to hold the reservation
-                            Ok(Box::pin(ReservationStream::new(
-                                Arc::clone(&schema),
-                                Box::pin(RecordBatchStreamAdapter::new(
-                                    schema,
-                                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
-                                )),
-                                reservation,
-                            )) as SendableRecordBatchStream)
-                        }
-                        Err(e) => Err(e),
-                    }
-                }
+                // Wrap in ReservationStream to hold the reservation
+                Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
+                    Arc::clone(&schema),
+                    Box::pin(RecordBatchStreamAdapter::new(
+                        Arc::clone(&schema),
+                        futures::stream::iter(sorted_batches.into_iter().map(Ok)),
+                    )),
+                    reservation,
+                )) as SendableRecordBatchStream)
             })
             .try_flatten()
             .map(move |batch| match batch {
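
The commit message's claim that `try_resize()` "doesn't do much work" in the common case can be illustrated with a toy model. The sketch below does not use DataFusion's types: `CountingPool`, `Reservation`, and `pool_calls` are hypothetical stand-ins, and the resize logic (grow by the difference, shrink by the difference, do nothing when the size is unchanged) is an assumption about how such a method behaves, not a copy of the library's implementation. It only counts how many calls reach the pool under each pattern.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a memory pool: tracks usage against a limit
/// and counts every call that reaches it (think of each call as expensive).
struct CountingPool {
    used: usize,
    limit: usize,
    calls: usize,
}

impl CountingPool {
    fn with_limit(limit: usize) -> Self {
        Self { used: 0, limit, calls: 0 }
    }

    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        self.calls += 1;
        if additional > self.limit - self.used {
            return Err(format!(
                "cannot reserve {} bytes: {} of {} already in use",
                additional, self.used, self.limit
            ));
        }
        self.used += additional;
        Ok(())
    }

    fn shrink(&mut self, amount: usize) {
        self.calls += 1;
        self.used -= amount;
    }
}

/// Hypothetical reservation that forwards size changes to the pool.
struct Reservation {
    size: usize,
}

impl Reservation {
    /// Release everything: always one pool call in this model.
    fn free(&mut self, pool: &mut CountingPool) {
        pool.shrink(self.size);
        self.size = 0;
    }

    fn try_grow(&mut self, pool: &mut CountingPool, additional: usize) -> Result<(), String> {
        pool.try_grow(additional)?;
        self.size += additional;
        Ok(())
    }

    /// Assumed semantics: only the difference is sent to the pool,
    /// and an unchanged size sends nothing at all.
    fn try_resize(&mut self, pool: &mut CountingPool, target: usize) -> Result<(), String> {
        match target.cmp(&self.size) {
            Ordering::Greater => {
                let additional = target - self.size;
                self.try_grow(pool, additional)?;
            }
            Ordering::Less => {
                pool.shrink(self.size - target);
                self.size = target;
            }
            Ordering::Equal => {}
        }
        Ok(())
    }
}

/// Returns (pool calls with free + try_grow, pool calls with try_resize)
/// when a reservation of `input_size` is adjusted to `sorted_size`.
fn pool_calls(input_size: usize, sorted_size: usize) -> (usize, usize) {
    let limit = 1 << 30;

    // Old pattern: release the whole reservation, then reacquire it.
    let mut pool = CountingPool::with_limit(limit);
    let mut reservation = Reservation { size: 0 };
    reservation.try_grow(&mut pool, input_size).expect("within limit");
    let before = pool.calls;
    reservation.free(&mut pool);
    reservation.try_grow(&mut pool, sorted_size).expect("within limit");
    let old_calls = pool.calls - before;

    // New pattern: adjust the existing reservation in place.
    let mut pool = CountingPool::with_limit(limit);
    let mut reservation = Reservation { size: 0 };
    reservation.try_grow(&mut pool, input_size).expect("within limit");
    let before = pool.calls;
    reservation.try_resize(&mut pool, sorted_size).expect("within limit");
    let new_calls = pool.calls - before;

    (old_calls, new_calls)
}
```

In this model, `pool_calls(1024, 1024)` yields two pool calls for the old pattern and none for the new one, which matches the common case the commit message describes (sorted output the same size as the input). When the sizes differ, the new pattern still makes a single call for the difference instead of a release followed by a reacquire.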
