perf: coalesce batches before sending to distributor channels in RepartitionExec #22010
Conversation
…butor channel, instead of after.
|
run benchmarks |
|
🤖 Benchmark running (GKE): comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) using tpch |
|
🤖 Benchmark running (GKE): comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) using tpcds |
|
🤖 Benchmark running (GKE): comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) using clickbench_partitioned |
|
🤖 Benchmark completed (GKE): tpch — base (merge-base) vs branch |
|
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch |
|
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch |
|
run benchmark tpch10 |
Review comment on:

```rust
/// inside `inner` holds cloned `Arc`s pointing back at those shared
/// resources.
struct OutputChannels {
    inner: HashMap<usize, OutputChannel>,
```
Probably no perf change, but I think `Vec<Option<OutputChannel>>` is better.
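The reviewer's suggestion can be sketched with plain `std` types (the names below are illustrative stand-ins, not the actual DataFusion definitions): output partition ids are dense integers in `0..num_partitions`, so a `Vec<Option<OutputChannel>>` gives direct indexed access without hashing, and a `None` slot can mark a channel that has already been closed while keeping the remaining indices stable.

```rust
// Hypothetical sketch of the Vec<Option<_>>-instead-of-HashMap idea.
// `OutputChannel` stands in for the real sender + gate + memory reservation.
struct OutputChannel {
    id: usize,
}

struct OutputChannels {
    inner: Vec<Option<OutputChannel>>,
}

impl OutputChannels {
    fn new(num_partitions: usize) -> Self {
        Self {
            inner: (0..num_partitions)
                .map(|id| Some(OutputChannel { id }))
                .collect(),
        }
    }

    /// Direct index instead of a hash lookup.
    fn get(&self, partition: usize) -> Option<&OutputChannel> {
        self.inner.get(partition).and_then(|c| c.as_ref())
    }

    /// Dropping the channel closes it; the slot stays so indices remain stable.
    fn close(&mut self, partition: usize) {
        if let Some(slot) = self.inner.get_mut(partition) {
            *slot = None;
        }
    }

    fn open_count(&self) -> usize {
        self.inner.iter().filter(|c| c.is_some()).count()
    }
}

fn main() {
    let mut channels = OutputChannels::new(4);
    channels.close(2);
    assert!(channels.get(2).is_none());
    assert_eq!(channels.get(1).map(|c| c.id), Some(1));
    assert_eq!(channels.open_count(), 3);
}
```

Since partitions form a fixed, dense id space, the `HashMap` buys nothing here; the `Option` keeps per-partition teardown cheap without shifting indices.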
|
🤖 Benchmark running (GKE): comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) using tpch10 |
|
🤖 Benchmark completed (GKE): tpch10 — base (merge-base) vs branch |
|
This should fix #20491 too! |
|
OK, since it looks like it does what was intended, I'll open this for review and improve the PR description. |
|
Which issue does this PR close?
Rationale for this change
Today `RepartitionExec` emits one small batch per (input batch × non-empty output partition), then coalesces those small batches back into target-sized ones on the consumer side. That means the channel layer (memory accounting, gate, await suspensions) does work proportional to `num_partitions` per input batch, even though each small batch carries only `batch_size / num_partitions` rows. Moving the coalescing to the producer side, before the gate, collapses the channel traffic and reduces sensitivity to high output fanout.
This gets much worse in https://github.com/datafusion-contrib/datafusion-distributed, which uses `RepartitionExec` as the foundational backbone for network shuffles: it scales the `RepartitionExec` output partitions up to P (partitions) × W (workers), where P is typically in the range of 12-24 and W can range from 1 to thousands. The fanout overhead that the current `RepartitionExec` introduces is substantial in that setup. Ideally, the overhead of fanning out many small `RecordBatch`es should be as small as possible.

What changes are included in this PR?
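As a rough, std-only illustration of producer-side coalescing (names are hypothetical; the real code buffers Arrow `RecordBatch`es, while plain row counts stand in for them here), each output partition buffers the small slices it receives and only pushes through the channel once the buffered rows reach the target batch size, so channel sends no longer scale with input batches × partitions:

```rust
// Hypothetical per-partition coalescing buffer. Row counts stand in for
// RecordBatches; `sent` stands in for actual channel sends.
struct PartitionCoalescer {
    target_batch_size: usize,
    buffered_rows: usize,
    /// Completed, target-sized batches that would go through the channel.
    sent: Vec<usize>,
}

impl PartitionCoalescer {
    fn new(target_batch_size: usize) -> Self {
        Self {
            target_batch_size,
            buffered_rows: 0,
            sent: Vec::new(),
        }
    }

    /// Buffer a small slice; emit a batch only when the target size is reached.
    fn push(&mut self, rows: usize) {
        self.buffered_rows += rows;
        while self.buffered_rows >= self.target_batch_size {
            self.sent.push(self.target_batch_size);
            self.buffered_rows -= self.target_batch_size;
        }
    }

    /// On input exhaustion, flush whatever is left as one final partial batch.
    fn finish(&mut self) {
        if self.buffered_rows > 0 {
            self.sent.push(self.buffered_rows);
            self.buffered_rows = 0;
        }
    }
}

fn main() {
    // 64 input batches, each contributing 128 rows to this partition
    // (e.g. batch_size 8192 split evenly across 64 output partitions).
    let mut c = PartitionCoalescer::new(8192);
    for _ in 0..64 {
        c.push(128);
    }
    c.finish();
    // One channel send instead of 64.
    assert_eq!(c.sent, vec![8192]);
}
```

With a target `batch_size` of 8192 and 64 output partitions, each input batch contributes roughly 128 rows per partition; buffering before the channel turns 64 tiny sends into a single target-sized one, which is the traffic collapse the rationale above describes.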
Are these changes tested?
Yes, by existing tests
Are there any user-facing changes?
No