Skip to content

Commit 61fe692

Browse files
authored
Remove unnecessary Mutex in SharedMemoryReservation (#21899)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> After #19759, all the methods of a `MemoryReservation` take &self instead of &mut self, so no need to hide a `SharedMemoryReservation` behind a Mutex. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Remove an unnecessary Mutex ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> By existing unit tests ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent e8a93bb commit 61fe692

3 files changed

Lines changed: 10 additions & 14 deletions

File tree

datafusion/physical-plan/src/common.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ use datafusion_common::{Result, plan_err};
3333
use datafusion_execution::memory_pool::MemoryReservation;
3434

3535
use futures::{StreamExt, TryStreamExt};
36-
use parking_lot::Mutex;
3736

3837
/// [`MemoryReservation`] used across query execution streams
39-
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
38+
pub(crate) type SharedMemoryReservation = Arc<MemoryReservation>;
4039

4140
/// Create a vector of record batches from a stream
4241
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequiremen
8282
use datafusion_common::hash_utils::RandomState;
8383
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8484
use futures::{Stream, StreamExt, ready};
85-
use parking_lot::Mutex;
8685

8786
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
8887

@@ -549,12 +548,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
549548
let enforce_batch_size_in_joins =
550549
context.session_config().enforce_batch_size_in_joins();
551550

552-
let reservation = Arc::new(Mutex::new(
551+
let reservation = Arc::new(
553552
MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
554553
.register(context.memory_pool()),
555-
));
554+
);
556555
if let Some(g) = graph.as_ref() {
557-
reservation.lock().try_grow(g.size())?;
556+
reservation.try_grow(g.size())?;
558557
}
559558

560559
if enforce_batch_size_in_joins {
@@ -1745,7 +1744,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
17451744
let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
17461745
let capacity = self.size();
17471746
self.metrics.stream_memory_usage.set(capacity);
1748-
self.reservation.lock().try_resize(capacity)?;
1747+
self.reservation.try_resize(capacity)?;
17491748
Ok(result)
17501749
}
17511750
}

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -322,11 +322,11 @@ impl RepartitionExecState {
322322

323323
let mut channels = HashMap::with_capacity(txs.len());
324324
for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
325-
let reservation = Arc::new(Mutex::new(
325+
let reservation = Arc::new(
326326
MemoryConsumer::new(format!("{name}[{partition}]"))
327327
.with_can_spill(true)
328328
.register(context.memory_pool()),
329-
));
329+
);
330330

331331
// Create spill channels based on mode:
332332
// - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering
@@ -1401,7 +1401,7 @@ impl RepartitionExec {
14011401
// if there is still a receiver, send to it
14021402
if let Some(channel) = output_channels.get_mut(&partition) {
14031403
let (batch_to_send, is_memory_batch) =
1404-
match channel.reservation.lock().try_grow(size) {
1404+
match channel.reservation.try_grow(size) {
14051405
Ok(_) => {
14061406
// Memory available - send in-memory batch
14071407
(RepartitionBatch::Memory(batch), true)
@@ -1419,7 +1419,7 @@ impl RepartitionExec {
14191419
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
14201420
// Only shrink memory if it was a memory batch
14211421
if is_memory_batch {
1422-
channel.reservation.lock().shrink(size);
1422+
channel.reservation.shrink(size);
14231423
}
14241424
output_channels.remove(&partition);
14251425
}
@@ -1638,9 +1638,7 @@ impl PerPartitionStream {
16381638
Some(Some(v)) => match v {
16391639
Ok(RepartitionBatch::Memory(batch)) => {
16401640
// Release memory and return batch
1641-
self.reservation
1642-
.lock()
1643-
.shrink(batch.get_array_memory_size());
1641+
self.reservation.shrink(batch.get_array_memory_size());
16441642
return Poll::Ready(Some(Ok(batch)));
16451643
}
16461644
Ok(RepartitionBatch::Spilled) => {

0 commit comments

Comments
 (0)