Skip to content

Commit 204927c

Browse files
committed
Remove unnecessary Mutex in SharedMemoryReservation
1 parent ec92925 commit 204927c

3 files changed

Lines changed: 10 additions & 14 deletions

File tree

datafusion/physical-plan/src/common.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ use datafusion_common::{Result, plan_err};
3333
use datafusion_execution::memory_pool::MemoryReservation;
3434

3535
use futures::{StreamExt, TryStreamExt};
36-
use parking_lot::Mutex;
3736

3837
/// [`MemoryReservation`] used across query execution streams
39-
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
38+
pub(crate) type SharedMemoryReservation = Arc<MemoryReservation>;
4039

4140
/// Create a vector of record batches from a stream
4241
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequiremen
8282
use datafusion_common::hash_utils::RandomState;
8383
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8484
use futures::{Stream, StreamExt, ready};
85-
use parking_lot::Mutex;
8685

8786
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
8887

@@ -549,12 +548,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
549548
let enforce_batch_size_in_joins =
550549
context.session_config().enforce_batch_size_in_joins();
551550

552-
let reservation = Arc::new(Mutex::new(
551+
let reservation = Arc::new(
553552
MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
554553
.register(context.memory_pool()),
555-
));
554+
);
556555
if let Some(g) = graph.as_ref() {
557-
reservation.lock().try_grow(g.size())?;
556+
reservation.try_grow(g.size())?;
558557
}
559558

560559
if enforce_batch_size_in_joins {
@@ -1745,7 +1744,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
17451744
let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
17461745
let capacity = self.size();
17471746
self.metrics.stream_memory_usage.set(capacity);
1748-
self.reservation.lock().try_resize(capacity)?;
1747+
self.reservation.try_resize(capacity)?;
17491748
Ok(result)
17501749
}
17511750
}

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -322,11 +322,11 @@ impl RepartitionExecState {
322322

323323
let mut channels = HashMap::with_capacity(txs.len());
324324
for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
325-
let reservation = Arc::new(Mutex::new(
325+
let reservation = Arc::new(
326326
MemoryConsumer::new(format!("{name}[{partition}]"))
327327
.with_can_spill(true)
328328
.register(context.memory_pool()),
329-
));
329+
);
330330

331331
// Create spill channels based on mode:
332332
// - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering
@@ -1401,7 +1401,7 @@ impl RepartitionExec {
14011401
// if there is still a receiver, send to it
14021402
if let Some(channel) = output_channels.get_mut(&partition) {
14031403
let (batch_to_send, is_memory_batch) =
1404-
match channel.reservation.lock().try_grow(size) {
1404+
match channel.reservation.try_grow(size) {
14051405
Ok(_) => {
14061406
// Memory available - send in-memory batch
14071407
(RepartitionBatch::Memory(batch), true)
@@ -1419,7 +1419,7 @@ impl RepartitionExec {
14191419
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
14201420
// Only shrink memory if it was a memory batch
14211421
if is_memory_batch {
1422-
channel.reservation.lock().shrink(size);
1422+
channel.reservation.shrink(size);
14231423
}
14241424
output_channels.remove(&partition);
14251425
}
@@ -1638,9 +1638,7 @@ impl PerPartitionStream {
16381638
Some(Some(v)) => match v {
16391639
Ok(RepartitionBatch::Memory(batch)) => {
16401640
// Release memory and return batch
1641-
self.reservation
1642-
.lock()
1643-
.shrink(batch.get_array_memory_size());
1641+
self.reservation.shrink(batch.get_array_memory_size());
16441642
return Poll::Ready(Some(Ok(batch)));
16451643
}
16461644
Ok(RepartitionBatch::Spilled) => {

0 commit comments

Comments
 (0)