Skip to content

Commit 3574960

Browse files
authored
[branch-53] perf: replace the sort `free()` -> `try_grow()` pattern with `try_resize()` to reduce memory pool interactions (#20733)
Backport #20729 to `branch-53`.
1 parent f8a03d8 commit 3574960

1 file changed

Lines changed: 20 additions & 30 deletions

File tree

  • datafusion/physical-plan/src/sorts

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 20 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -730,37 +730,27 @@ impl ExternalSorter {
730730
// Sort the batch immediately and get all output batches
731731
let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
732732

733-
// Free the old reservation and grow it to match the actual sorted output size
734-
reservation.free();
733+
// Resize the reservation to match the actual sorted output size.
734+
// Using try_resize avoids a release-then-reacquire cycle, which
735+
// matters for MemoryPool implementations where grow/shrink have
736+
// non-trivial cost (e.g. JNI calls in Comet).
737+
let total_sorted_size: usize = sorted_batches
738+
.iter()
739+
.map(get_record_batch_memory_size)
740+
.sum();
741+
reservation
742+
.try_resize(total_sorted_size)
743+
.map_err(Self::err_with_oom_context)?;
735744

736-
Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
737-
})
738-
.then({
739-
move |batches| async move {
740-
match batches {
741-
Ok((schema, sorted_batches, reservation)) => {
742-
// Calculate the total size of sorted batches
743-
let total_sorted_size: usize = sorted_batches
744-
.iter()
745-
.map(get_record_batch_memory_size)
746-
.sum();
747-
reservation
748-
.try_grow(total_sorted_size)
749-
.map_err(Self::err_with_oom_context)?;
750-
751-
// Wrap in ReservationStream to hold the reservation
752-
Ok(Box::pin(ReservationStream::new(
753-
Arc::clone(&schema),
754-
Box::pin(RecordBatchStreamAdapter::new(
755-
schema,
756-
futures::stream::iter(sorted_batches.into_iter().map(Ok)),
757-
)),
758-
reservation,
759-
)) as SendableRecordBatchStream)
760-
}
761-
Err(e) => Err(e),
762-
}
763-
}
745+
// Wrap in ReservationStream to hold the reservation
746+
Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
747+
Arc::clone(&schema),
748+
Box::pin(RecordBatchStreamAdapter::new(
749+
Arc::clone(&schema),
750+
futures::stream::iter(sorted_batches.into_iter().map(Ok)),
751+
)),
752+
reservation,
753+
)) as SendableRecordBatchStream)
764754
})
765755
.try_flatten()
766756
.map(move |batch| match batch {

0 commit comments

Comments
 (0)