diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 374fc275a06e0..d559dc4a3975d 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -91,6 +91,10 @@ tokio = { workspace = true, features = [ harness = false name = "partial_ordering" +[[bench]] +harness = false +name = "repartition" + [[bench]] harness = false name = "spill_io" diff --git a/datafusion/physical-plan/benches/repartition.rs b/datafusion/physical-plan/benches/repartition.rs new file mode 100644 index 0000000000000..90101b67dd9b2 --- /dev/null +++ b/datafusion/physical-plan/benches/repartition.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Microbenchmark for `RepartitionExec` in isolation. +//! +//! Stages a pre-built in-memory set of batches across N input partitions and +//! runs them through `RepartitionExec` to N output partitions. Drains all +//! output partitions concurrently so the measurement reflects the true +//! per-batch cost of the breaker: mpsc `send().await`, reservation lock, +//! metric timers, and `SpawnedTask`-per-input scheduling. +//! +//! Run with: +//! ```sh +//! cargo bench --bench repartition -- --sample-size=20 +//! 
``` + +use std::sync::Arc; + +use arrow::array::{ArrayRef, UInt64Array}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::test::TestMemoryExec; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use futures::StreamExt; + +const TOTAL_ROWS: usize = 1_000_000; +const BATCH_SIZE: usize = 8_192; +const PARTITIONS: usize = 16; + +fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)])) +} + +/// Build `PARTITIONS` input partitions, each containing +/// `TOTAL_ROWS / PARTITIONS` rows split into `BATCH_SIZE`-sized batches. +fn make_partitions(schema: &SchemaRef) -> Vec<Vec<RecordBatch>> { + let per_partition = TOTAL_ROWS / PARTITIONS; + (0..PARTITIONS) + .map(|p| { + let mut batches = Vec::new(); + let mut offset: u64 = (p * per_partition) as u64; + let end: u64 = offset + per_partition as u64; + while offset < end { + let n = std::cmp::min(BATCH_SIZE as u64, end - offset) as usize; + let arr: ArrayRef = Arc::new(UInt64Array::from_iter_values( + (offset..offset + n as u64).collect::<Vec<_>>(), + )); + batches + .push(RecordBatch::try_new(Arc::clone(schema), vec![arr]).unwrap()); + offset += n as u64; + } + batches + }) + .collect() +} + +fn build_plan( + partitions: &[Vec<RecordBatch>], + schema: &SchemaRef, + partitioning: Partitioning, +) -> Arc<dyn ExecutionPlan> { + let src = TestMemoryExec::try_new_exec(partitions, Arc::clone(schema), None).unwrap(); + Arc::new(RepartitionExec::try_new(src, partitioning).unwrap()) +} + +fn drain_all( + rt: &tokio::runtime::Runtime, + plan: Arc<dyn ExecutionPlan>, + task_ctx: Arc<TaskContext>, +) { + rt.block_on(async move { + let out = plan.output_partitioning().partition_count(); + let mut handles = 
Vec::with_capacity(out); + for p in 0..out { + let mut stream = plan.execute(p, Arc::clone(&task_ctx)).unwrap(); + handles.push(tokio::spawn(async move { + let mut rows = 0usize; + while let Some(batch) = stream.next().await { + rows += batch.unwrap().num_rows(); + } + rows + })); + } + for h in handles { + let _ = h.await.unwrap(); + } + }); +} + +fn bench_repartition(c: &mut Criterion) { + let schema = schema(); + let partitions = make_partitions(&schema); + let hash_expr = vec![col("c0", &schema).unwrap()]; + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(PARTITIONS) + .enable_all() + .build() + .unwrap(); + let task_ctx = Arc::new(TaskContext::default()); + + c.bench_function("repartition/hash_16_to_16", |b| { + b.iter_batched( + || { + build_plan( + &partitions, + &schema, + Partitioning::Hash(hash_expr.clone(), PARTITIONS), + ) + }, + |plan| drain_all(&rt, plan, Arc::clone(&task_ctx)), + BatchSize::LargeInput, + ) + }); + + c.bench_function("repartition/round_robin_16_to_16", |b| { + b.iter_batched( + || { + build_plan( + &partitions, + &schema, + Partitioning::RoundRobinBatch(PARTITIONS), + ) + }, + |plan| drain_all(&rt, plan, Arc::clone(&task_ctx)), + BatchSize::LargeInput, + ) + }); + + c.bench_function("repartition/hash_16_to_1_coalesce", |b| { + b.iter_batched( + || { + build_plan( + &partitions, + &schema, + Partitioning::Hash(hash_expr.clone(), 1), + ) + }, + |plan| drain_all(&rt, plan, Arc::clone(&task_ctx)), + BatchSize::LargeInput, + ) + }); +} + +criterion_group!(benches, bench_repartition); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d4406360504f9..02fee70175d08 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -25,7 +25,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; -use super::common::SharedMemoryReservation; use 
super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -58,7 +57,7 @@ use datafusion_common::{ use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; -use datafusion_execution::memory_pool::MemoryConsumer; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -143,10 +142,15 @@ type MaybeBatch = Option<Result<RepartitionBatch>>; type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>; type InputPartitionsToCurrentPartitionReceiver = Vec<DistributionReceiver<MaybeBatch>>; -/// Output channel with its associated memory reservation and spill writer +/// Output channel with its associated memory reservation and spill writer. +/// +/// Every method on [`MemoryReservation`] takes `&self` and mutates atomic +/// counters internally, so the shared reservation needs no extra lock on the +/// hot path — both the producer (`try_grow`) and the consumer (`shrink`) +/// operate concurrently via atomics. struct OutputChannel { sender: DistributionSender<MaybeBatch>, - reservation: SharedMemoryReservation, + reservation: Arc<MemoryReservation>, spill_writer: SpillPoolWriter, } @@ -177,7 +181,7 @@ struct PartitionChannels { /// Receivers for each input partition sending data to this output partition rx: InputPartitionsToCurrentPartitionReceiver, /// Memory reservation for this output partition - reservation: SharedMemoryReservation, + reservation: Arc<MemoryReservation>, /// Spill writers for writing spilled data. /// SpillPoolWriter is Clone, so multiple writers can share state in non-preserve-order mode. 
spill_writers: Vec<SpillPoolWriter>, @@ -322,11 +326,11 @@ impl RepartitionExecState { let mut channels = HashMap::with_capacity(txs.len()); for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { - let reservation = Arc::new(Mutex::new( + let reservation = Arc::new( MemoryConsumer::new(format!("{name}[{partition}]")) .with_can_spill(true) .register(context.memory_pool()), - )); + ); // Create spill channels based on mode: // - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering @@ -1393,15 +1397,18 @@ impl RepartitionExec { continue; } + // Track per-partition send time by advancing a single `Instant` + // through the inner loop — one `Instant::now()` per sub-batch, + // instead of two via `ScopedTimerGuard::{timer, done}`. + let mut last = datafusion_common::instant::Instant::now(); for res in partitioner.partition_iter(batch)? { let (partition, batch) = res?; let size = batch.get_array_memory_size(); - let timer = metrics.send_time[partition].timer(); // if there is still a receiver, send to it if let Some(channel) = output_channels.get_mut(&partition) { let (batch_to_send, is_memory_batch) = - match channel.reservation.lock().try_grow(size) { + match channel.reservation.try_grow(size) { Ok(_) => { // Memory available - send in-memory batch (RepartitionBatch::Memory(batch), true) @@ -1419,12 +1426,14 @@ impl RepartitionExec { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) // Only shrink memory if it was a memory batch if is_memory_batch { - channel.reservation.lock().shrink(size); + channel.reservation.shrink(size); } output_channels.remove(&partition); } } - timer.done(); + let now = datafusion_common::instant::Instant::now(); + metrics.send_time[partition].add_duration(now - last); + last = now; } // If the input stream is endless, we may spin forever and @@ -1567,7 +1576,7 @@ struct PerPartitionStream { _drop_helper: Arc<Vec<SpawnedTask<()>>>, /// Memory reservation. 
- reservation: SharedMemoryReservation, + reservation: Arc<MemoryReservation>, /// Infinite stream for reading from the spill pool spill_stream: SendableRecordBatchStream, @@ -1593,7 +1602,7 @@ impl PerPartitionStream { schema: SchemaRef, receiver: DistributionReceiver<MaybeBatch>, drop_helper: Arc<Vec<SpawnedTask<()>>>, - reservation: SharedMemoryReservation, + reservation: Arc<MemoryReservation>, spill_stream: SendableRecordBatchStream, num_input_partitions: usize, baseline_metrics: BaselineMetrics, @@ -1638,9 +1647,7 @@ impl PerPartitionStream { Some(Some(v)) => match v { Ok(RepartitionBatch::Memory(batch)) => { // Release memory and return batch - self.reservation - .lock() - .shrink(batch.get_array_memory_size()); + self.reservation.shrink(batch.get_array_memory_size()); return Poll::Ready(Some(Ok(batch))); } Ok(RepartitionBatch::Spilled) => {