perf(repartition): drop redundant reservation Mutex and halve per-batch Instant::now() calls #21745
Dandandan wants to merge 1 commit into apache:main
…ch Instant::now() calls

`SharedMemoryReservation` wraps `MemoryReservation` in an `Arc<Mutex<_>>`, but every method on `MemoryReservation` (`try_grow`, `shrink`, `try_resize`, ...) already takes `&self` and mutates an `AtomicUsize` internally. The extra `Mutex` is dead weight on the hot path: both the producer (`try_grow` in `pull_from_input`) and the consumer (`shrink` in `PerPartitionStream`) paid a lock acquire per batch for no correctness benefit.

Type the three fields in `repartition/mod.rs` as `Arc<MemoryReservation>` and drop the three `.lock()` callsites. The shared-channel alias `SharedMemoryReservation` in `common.rs` stays as-is for now because it is still used by `symmetric_hash_join`.

Independently, the per-sub-batch `send_time[partition].timer()` idiom calls `Instant::now()` twice per sub-batch (once at `timer()`, once at `done()`/drop). Replace it with a single advancing `Instant` that tracks elapsed-since-last-partition, giving one `Instant::now()` per sub-batch while preserving the per-partition `send_time` metric.

Also add a criterion microbench (`benches/repartition.rs`) that drives a 1M-row, 16-input / 16-output hash repartition, a round-robin variant, and a 16→1 coalesce.

Before / after on the new bench (macOS ARM):

    hash_16_to_16           1.62 ms -> 1.55 ms  (~ -2.5%)
    round_robin_16_to_16    303 us  -> 268 us   (~ -10%)
    hash_16_to_1_coalesce   932 us  -> 917 us   (~ -5%)

The hash case is dominated by `BatchPartitioner` hashing itself, so the plumbing wins show up most clearly on the round-robin variant.

All 41 existing `repartition` unit tests pass, including spill and memory-pool paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
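The argument for dropping the `Mutex` can be illustrated with a minimal sketch. The `Reservation` type below is a hypothetical stand-in, not DataFusion's `MemoryReservation`; it only assumes what the commit message states, namely that every method takes `&self` and updates an `AtomicUsize`. Under that assumption a plain `Arc` is enough to share the reservation between a producer and a consumer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Hypothetical stand-in for `MemoryReservation`: all methods take `&self`
/// and update an atomic counter, so sharing needs no outer lock.
struct Reservation {
    size: AtomicUsize,
    limit: usize,
}

impl Reservation {
    fn new(limit: usize) -> Self {
        Self { size: AtomicUsize::new(0), limit }
    }

    /// Try to reserve `bytes` more; fails if that would exceed `limit`.
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        self.size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                cur.checked_add(bytes).filter(|new| *new <= self.limit)
            })
            .map(|_| ())
            .map_err(|cur| format!("cannot grow by {}: {} of {} bytes in use", bytes, cur, self.limit))
    }

    /// Release `bytes`, saturating at zero.
    fn shrink(&self, bytes: usize) {
        let _ = self
            .size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_sub(bytes))
            });
    }

    fn size(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }
}

/// The producer thread grows and the consumer shrinks through `Arc<Reservation>`;
/// neither side takes a lock. Returns the bytes still reserved at the end.
fn run_producer_consumer(batches: usize, batch_bytes: usize) -> usize {
    let reservation = Arc::new(Reservation::new(batches * batch_bytes));
    let (tx, rx) = mpsc::channel::<usize>();

    let producer = {
        let reservation = Arc::clone(&reservation);
        thread::spawn(move || {
            for _ in 0..batches {
                reservation.try_grow(batch_bytes).expect("within limit");
                tx.send(batch_bytes).expect("receiver alive");
            }
            // `tx` is dropped here, which ends the consumer loop below.
        })
    };

    // Consumer: release the reservation for every batch received.
    for bytes in rx {
        reservation.shrink(bytes);
    }
    producer.join().expect("producer panicked");
    reservation.size()
}
```

Calling `run_producer_consumer(1000, 64)` should leave the reservation at zero, since every `try_grow` is matched by one `shrink`. Wrapping the same type in `Arc<Mutex<_>>` would add a lock acquire to each of those calls without changing the result.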
run benchmarks

🤖 Benchmark running (GKE): comparing repartition-perf-cheap-wins (b8fba56) to 43d32a8 (merge-base) using tpcds, tpch, and clickbench_partitioned.

🤖 Benchmark completed (GKE): tpcds, clickbench_partitioned.
Which issue does this PR close?
Rationale for this change
`RepartitionExec`'s per-batch hot path has two small but measurable sources of overhead that are easy to remove without any semantic change:

1. The `SharedMemoryReservation` alias is `Arc<Mutex<MemoryReservation>>`, but every method on `MemoryReservation` (`try_grow`, `shrink`, `try_resize`, ...) already takes `&self` and mutates an `AtomicUsize` internally. The outer `Mutex` adds a lock acquire per batch on both the producer (`pull_from_input`'s `try_grow`) and the consumer (`PerPartitionStream`'s `shrink`) for no correctness benefit.
2. The `send_time[partition].timer()` idiom calls `Instant::now()` twice per sub-batch (once on `timer()`, once on `done()`/drop). With a 16-output hash repartition that is 32 `Instant::now()` calls per input batch just for metrics.

A new criterion microbench (`datafusion/physical-plan/benches/repartition.rs`) was added to make both the current cost and the effect of the fix visible in isolation.

What changes are included in this PR?

`repartition/mod.rs`

- Type the three `SharedMemoryReservation` fields (`OutputChannel`, `PartitionChannels`, `PerPartitionStream`) as `Arc<MemoryReservation>` and drop the three `.lock()` callsites. A doc comment on `OutputChannel` explains why the outer lock is unnecessary. The `SharedMemoryReservation` alias in `common.rs` is left alone because it is still used by `symmetric_hash_join`.
- Replace the `ScopedTimerGuard` in the `pull_from_input` inner loop with a single advancing `Instant`. This gives one `Instant::now()` per sub-batch while preserving the existing per-partition `send_time` metric.

`benches/repartition.rs` (new)

Criterion microbench that drives 1 M rows through `RepartitionExec` in three configurations: hash 16→16, round-robin 16→16, and hash 16→1 coalesce. Output partitions are drained concurrently so the measurement reflects real breaker behaviour (mpsc send, reservation lock, metric timers, `SpawnedTask` per input).

Are these changes tested?

Yes. All 41 tests in `datafusion/physical-plan/src/repartition/tests` continue to pass, including the spill, memory-pool, order-preservation, and early-shutdown cases. Clippy and `cargo fmt` are clean.

Before / after on the new microbench (macOS ARM, 1 M rows):

    hash_16_to_16           1.62 ms -> 1.55 ms  (~ -2.5%)
    round_robin_16_to_16    303 us  -> 268 us   (~ -10%)
    hash_16_to_1_coalesce   932 us  -> 917 us   (~ -5%)

The hash cases are dominated by `BatchPartitioner::partition_iter` (hashing + `take_arrays`) itself, so the plumbing wins show up most clearly on the round-robin variant where hashing is not the bottleneck.

Are there any user-facing changes?

No. Metrics keep the same shape (per-partition `send_time`), there are no public API changes, and there are no behavioural changes to spill / backpressure / ordering.

🤖 Generated with Claude Code
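The timer change described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in the PR: `SendTimes`, `send_all_scoped`, and `send_all_advancing` are hypothetical names, and `SendTimes` merely stands in for the per-partition `send_time` metric. Each function returns how many times it reads the clock so the two approaches can be compared directly.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the per-partition `send_time` metric.
struct SendTimes {
    per_partition: Vec<Duration>,
}

impl SendTimes {
    fn new(partitions: usize) -> Self {
        Self { per_partition: vec![Duration::ZERO; partitions] }
    }
}

/// Scoped-timer style: each sub-batch reads the clock when its timer
/// starts and again when it stops, so two reads per sub-batch.
fn send_all_scoped(
    partitions: &[usize],
    times: &mut SendTimes,
    mut send: impl FnMut(usize),
) -> usize {
    let mut clock_reads = 0;
    for &p in partitions {
        let start = Instant::now();
        clock_reads += 1;
        send(p);
        let end = Instant::now();
        clock_reads += 1;
        times.per_partition[p] += end.duration_since(start);
    }
    clock_reads
}

/// Advancing-instant style: one initial read, then one read per sub-batch.
/// Each partition is charged the time elapsed since the previous read.
fn send_all_advancing(
    partitions: &[usize],
    times: &mut SendTimes,
    mut send: impl FnMut(usize),
) -> usize {
    let mut last = Instant::now();
    let mut clock_reads = 1;
    for &p in partitions {
        send(p);
        let now = Instant::now();
        clock_reads += 1;
        times.per_partition[p] += now.duration_since(last);
        last = now;
    }
    clock_reads
}
```

For 16 output partitions the scoped version reads the clock 32 times per input batch, while the advancing version reads it 17 times (16 plus the initial read). One consequence of this design, at least in this sketch, is that any work done between two sends is attributed to the later partition's time rather than being left unmeasured.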