feat(push-scheduler): port PR #2226 push scheduler onto tokio + crossbeam #21741
Dandandan wants to merge 6 commits into apache:main from
Conversation
Introduces a new `datafusion-push-scheduler` crate that ports the experimental morsel-driven scheduler from apache#2226 onto plain tokio + crossbeam-deque (the original used rayon). Highlights:

- `Pipeline` trait (push/close/output_partitions/poll_partition) and `Task` + `TaskWaker` dedup — verbatim from PR apache#2226's `task.rs`.
- `WorkerPool`: N OS threads with a crossbeam `Injector` + per-worker LIFO `Worker` deque + thread-local FIFO side-queue, parked on a `Condvar`. Each worker enters a shared `tokio::runtime::Handle` so wrapped pull-based operators' `tokio::spawn` internals keep working.
- `ExecutionPipeline` wraps the full `ExecutionPlan` as one push leaf for v1. `RepartitionPipeline` / `SortPipeline` are implemented but not yet wired into the planner — porting PR apache#2226's "Inbox" rewiring is the next step before breakers can be cut natively.
- `--push-scheduler` flag wired into `dfbench` (`CommonOpt`) and plumbed through the ClickBench runner so benchmarks can switch execution paths per run.

End-to-end SQL round-trip tests cover projection/filter, hash repartition, sort, top-k with limit, and group-by — all match the default path's output.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
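The push-based `Pipeline` interface listed above can be sketched roughly as follows. This is a simplified, std-only illustration, not the crate's actual API: `Batch` and `PassThrough` are stand-ins invented here, and the real trait moves Arrow `RecordBatch`es and uses `Poll`-based wakeups rather than plain `Option` returns.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Stand-in for Arrow's `RecordBatch` (assumption for illustration).
type Batch = Vec<i64>;

/// Hypothetical, simplified version of the push-based `Pipeline` trait:
/// upstream pushes batches in, downstream polls partition outputs.
trait Pipeline: Send + Sync {
    /// Push one batch into the given input partition.
    fn push(&self, batch: Batch, partition: usize);
    /// Signal that an input partition has no more batches.
    fn close(&self, partition: usize);
    /// Number of output partitions this pipeline exposes.
    fn output_partitions(&self) -> usize;
    /// Non-blocking poll of one output partition:
    /// Some(batch) if ready, None if pending or exhausted.
    fn poll_partition(&self, partition: usize) -> Option<Batch>;
}

/// Trivial pass-through pipeline with one partition, used only to
/// show the push -> buffer -> poll flow.
struct PassThrough {
    queue: Mutex<VecDeque<Batch>>,
}

impl PassThrough {
    fn new() -> Self {
        Self { queue: Mutex::new(VecDeque::new()) }
    }
}

impl Pipeline for PassThrough {
    fn push(&self, batch: Batch, _partition: usize) {
        self.queue.lock().unwrap().push_back(batch);
    }
    fn close(&self, _partition: usize) {}
    fn output_partitions(&self) -> usize {
        1
    }
    fn poll_partition(&self, _partition: usize) -> Option<Batch> {
        self.queue.lock().unwrap().pop_front()
    }
}
```

An operator like a repartitioner would implement the same trait with N output partitions and per-partition buffers instead of a single queue.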
run benchmarks

🤖 Benchmark runs triggered (GKE): comparing push-scheduler (695a749) to 3b5008a (merge-base) on tpch, clickbench_partitioned, and tpcds.
Adds `InboxExec` — a leaf `ExecutionPlan` whose per-partition streams are fed by tokio mpsc channels that the scheduler fills from an upstream breaker's output. `ExecutionPipeline::with_depth(plan, ctx, depth)` now rewrites the wrapped plan tree: it recurses `depth` levels through single-child operators and replaces the bottom operator's children with `InboxExec` leaves. Pushes from the breaker above flow into those inboxes; the wrapped subtree's ordinary `plan.execute(p)` call pulls from them as if nothing changed.

The planner is restored to PR apache#2226's cutting shape:

* `RepartitionExec` / `CoalescePartitionsExec` → `RepartitionPipeline`
* `SortExec` (non-merging) → `SortPipeline`

Everything else (incl. `AggregateExec` in any mode and `HashJoinExec`) stays in an `ExecutionPipeline` that now correctly receives its input from an Inbox when it sits above a breaker. Aggregates parallelize across the partial→repartition→final chain without per-task serialization.

Added a planner test confirming that a `GROUP BY` query is split into >=2 pipelines — i.e. that the Repartition cut is active. All existing result-equality tests continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
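The `with_depth` rewrite described above might look roughly like this on a toy operator tree. The `Op` enum and `splice_inbox` function are invented stand-ins for `ExecutionPlan` nodes and the real rewriting code, shown only to make the depth-recursion concrete.

```rust
/// Toy operator tree standing in for an `ExecutionPlan` node.
#[derive(Debug, PartialEq)]
enum Op {
    /// Single-child operator (e.g. a filter or projection).
    Unary(&'static str, Box<Op>),
    /// Original leaf (e.g. a scan).
    Leaf(&'static str),
    /// Scheduler-fed inbox leaf spliced in by the rewrite.
    Inbox,
}

/// Recurse `depth` levels through single-child operators; where the
/// depth budget runs out, splice in an Inbox leaf so pushes from the
/// breaker above feed the subtree's ordinary pull path.
fn splice_inbox(op: Op, depth: usize) -> Op {
    match op {
        Op::Unary(name, child) if depth > 0 => {
            Op::Unary(name, Box::new(splice_inbox(*child, depth - 1)))
        }
        // Depth exhausted: this position is now fed by the scheduler.
        _ => Op::Inbox,
    }
}
```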
run benchmarks

🤖 Benchmark runs triggered (GKE): comparing push-scheduler (75032e9) to 3b5008a (merge-base) on tpch, clickbench_partitioned, and tpcds.
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Flip `CommonOpt::push_scheduler` to default `true`. Benchmark runs will now execute through the push-based scheduler unless explicitly disabled with `--push-scheduler=false`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
run benchmarks

🤖 Benchmark runs triggered (GKE): comparing push-scheduler (b854dec) to 3b5008a (merge-base) on clickbench_partitioned, tpcds, and tpch.

🤖 Benchmark runs completed (GKE): tpcds and clickbench_partitioned, base (merge-base) vs. branch.
Two overhead-removal fixes targeting ClickBench regressions:

1. **Single-pipeline short-circuit.** `Scheduler::schedule` now walks the `ExecutionPlan` up front; if it contains no operator the planner would cut (no `RepartitionExec`, `CoalescePartitionsExec`, or cuttable `SortExec`), we bypass the planner, the task queue, and all output channels — returning the raw `plan.execute(p)` streams unchanged via a new `ExecutionResults::direct` path. This eliminates ~100% of scheduler overhead on simple scan/filter/project queries (ClickBench Q0–Q7), which previously paid channel + dispatch cost for zero parallelism gain.

2. **Replace tokio mpsc with a parking_lot queue in `InboxExec`.** Our per-partition inbox is exactly single-producer / single-consumer, so `tokio::sync::mpsc::unbounded_channel` is swapped for a plain `Mutex<VecDeque<RecordBatch>>` + stored `Waker` — the same shape as `RepartitionPipeline`'s output buffers. This removes tokio mpsc's per-send atomic ref counting and intrusive list management from the hot per-batch path. Push also skips the wake when the queue is already non-empty, and skips all work once the consumer's stream has been dropped (cancel fast path).

Tests (9 total, incl. the planner-shape assertion) all pass; clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
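The shape of fix 2 can be sketched with std types only. This is an illustrative SPSC buffer, not the actual `InboxExec` code: a boxed closure stands in for the stored `std::task::Waker`, and `std::sync::Mutex` stands in for `parking_lot::Mutex`. The wake-on-transition and cancel fast paths follow the description above.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Simplified single-producer/single-consumer inbox buffer:
/// a plain Mutex<VecDeque> plus a stored wakeup callback, waking
/// only on the empty -> non-empty transition.
struct Inbox<T> {
    state: Mutex<InboxState<T>>,
}

struct InboxState<T> {
    queue: VecDeque<T>,
    waker: Option<Box<dyn FnMut() + Send>>,
    /// Set when the consumer's stream is dropped (cancel fast path).
    closed: bool,
}

impl<T> Inbox<T> {
    fn new() -> Self {
        Self {
            state: Mutex::new(InboxState { queue: VecDeque::new(), waker: None, closed: false }),
        }
    }

    /// Producer side: enqueue a value; wake only if the queue was
    /// empty, and do nothing at all once the consumer is gone.
    fn push(&self, value: T) {
        let mut s = self.state.lock().unwrap();
        if s.closed {
            return; // consumer dropped: drop the value immediately
        }
        let was_empty = s.queue.is_empty();
        s.queue.push_back(value);
        if was_empty {
            if let Some(mut w) = s.waker.take() {
                w(); // empty -> non-empty transition: notify consumer
            }
        }
    }

    /// Consumer side: pop one value, registering a wakeup callback
    /// to be invoked when data next arrives.
    fn poll(&self, waker: Box<dyn FnMut() + Send>) -> Option<T> {
        let mut s = self.state.lock().unwrap();
        match s.queue.pop_front() {
            Some(v) => Some(v),
            None => {
                s.waker = Some(waker);
                None
            }
        }
    }

    /// Consumer dropped its stream: discard buffered data.
    fn close(&self) {
        let mut s = self.state.lock().unwrap();
        s.closed = true;
        s.queue.clear();
    }
}
```

In the real code the consumer side is a `Stream` impl whose `poll_next` stores `cx.waker()`; the closure here just makes the transition logic testable without an executor.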
run benchmarks

🤖 Benchmark runs triggered (GKE): comparing push-scheduler (3f167cb) to 3b5008a (merge-base) on clickbench_partitioned, tpcds, and tpch.
Drops the native-push `SortPipeline` entirely. It had two problems:

1. It **silently disabled TopK dynamic filter pushdown.** The in-tree `SortExec` (with `fetch=Some(K)`) publishes tightening bounds back to the scan through a shared `Arc<DynamicFilterPhysicalExpr>`. Our native implementation had no knowledge of this Arc — it just buffered input and sorted at close. Any plan where `SortPipeline` activated (single-partition sorts or `preserve_partitioning=true` multi-partition sorts, the exact shape the optimizer produces for top-K queries) lost pruning entirely. This was likely a large contributor to ClickBench Q24–Q26's 2.2–4.6x slowdown.

2. It had no spill support, so it was also an OOM hazard on large in-memory sorts.

`SortExec` now stays inside `ExecutionPipeline` in all cases — the default path's implementation handles dynamic pushdown, spilling, and streaming merges correctly. This deletes `pipelines/sort.rs`, removes the `SortExec` branch in `PipelinePlanner::visit_operator`, and drops the `SortExec` check in `Scheduler::schedule`'s `has_cut`, so sort-bearing plans with no other breakers take the zero-overhead direct `plan.execute(p)` path.

All 9 tests still pass; clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
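For readers unfamiliar with the pruning being lost here, a toy version of the tightening-bound idea behind TopK dynamic filter pushdown (this is not DataFusion's implementation, which threads the bound into the scan through the shared `DynamicFilterPhysicalExpr`; the `TopK` type below is invented for illustration):

```rust
use std::collections::BinaryHeap;

/// Toy top-K (K smallest values) that exposes a tightening bound:
/// once K values are buffered, any input >= the current K-th smallest
/// can never enter the result, so a scan holding the shared bound
/// could skip producing it at all.
struct TopK {
    k: usize,
    /// Max-heap of the K smallest values seen so far.
    heap: BinaryHeap<i64>,
}

impl TopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Current pruning bound: values >= this cannot enter the top K.
    /// None until K values have been buffered.
    fn bound(&self) -> Option<i64> {
        (self.k > 0 && self.heap.len() == self.k).then(|| *self.heap.peek().unwrap())
    }

    fn push(&mut self, v: i64) {
        if let Some(b) = self.bound() {
            if v >= b {
                return; // pruned: cannot displace anything in the top K
            }
            self.heap.pop(); // evict the current K-th smallest
        }
        self.heap.push(v);
    }
}
```

A sort pipeline that only buffers and sorts at close never updates such a bound, which is exactly why every row kept flowing from the scan.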
run benchmarks

🤖 Benchmark runs triggered (GKE): comparing push-scheduler (1817674) to 3b5008a (merge-base) on tpch, clickbench_partitioned, and tpcds.

🤖 Benchmark runs completed (GKE): tpcds and clickbench_partitioned, base (merge-base) vs. branch.
Fixes ClickBench Q19's 5.86x regression. The no-breaker short-circuit was merging multi-partition plans with `futures::stream::select_all`. That polls all partition streams from a single task, so partition reads were interleaved within one task frame instead of running concurrently in separate tokio tasks. The default `collect()` path goes through `datafusion_physical_plan::execute_stream`, which wraps multi-partition plans in `CoalescePartitionsExec` (backed by per-partition `SpawnedTask`s over bounded channels) for genuinely concurrent reads. ClickBench Q19 (`SELECT UserID FROM hits WHERE UserID = ...` — a simple scan with filter, no breakers) regressed from parity to 5.86x slower because of this.

Fix: `ExecutionResults::Direct` now stores the raw plan + context instead of pre-executed streams. `stream()` delegates to `execute_stream` (same as `collect()`) and `stream_partitioned()` delegates to `execute_stream_partitioned`, so the short-circuit runs exactly the code path the default path runs.

All 9 tests still pass; clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
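The winning merge strategy can be illustrated with a std-only sketch of the `CoalescePartitionsExec` shape: one producer task per partition draining into a shared channel, approximated here with OS threads and `std::sync::mpsc` instead of tokio `SpawnedTask`s and bounded channels (the `coalesce_partitions` helper is invented for this sketch):

```rust
use std::sync::mpsc;
use std::thread;

/// Merge several "partition streams" by spawning one producer per
/// partition that drains its input into a shared channel. Each
/// partition makes progress independently, unlike a `select_all`-style
/// poller that interleaves every partition inside one task frame.
fn coalesce_partitions(partitions: Vec<Vec<i64>>) -> Vec<i64> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|part| {
            let tx = tx.clone();
            thread::spawn(move || {
                for v in part {
                    // A real operator would do per-batch work here,
                    // concurrently with the other partitions.
                    tx.send(v).unwrap();
                }
            })
        })
        .collect();
    drop(tx); // close the channel once all producers finish
    let merged: Vec<i64> = rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    merged
}
```

Arrival order across partitions is nondeterministic, which is fine for an unordered coalesce.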
run benchmarks

🤖 Benchmark runs triggered (GKE): comparing push-scheduler (01dcb48) to 3b5008a (merge-base) on clickbench_partitioned, tpcds, and tpch.

🤖 Benchmark runs completed (GKE): clickbench_partitioned and tpcds, base (merge-base) vs. branch.
Summary
Ports the experimental morsel-driven scheduler from #2226 (tustvold, 2022) onto plain tokio + crossbeam-deque (the original used rayon). Lives in a new workspace crate `datafusion-push-scheduler` and is opt-in via a new `--push-scheduler` flag on `dfbench`.

- `Pipeline` trait (push/close/output_partitions/poll_partition) and `Task` + `TaskWaker` dedup — verbatim port of PR #2226's `task.rs`.
- `WorkerPool`: N OS threads, crossbeam `Injector` + per-worker LIFO `Worker` deque + thread-local FIFO side-queue (matches rayon's `spawn_local` / `spawn_local_fifo` semantics), parked on a `Condvar`. Each worker enters a shared `tokio::runtime::Handle` so wrapped pull-based operators' internal `tokio::spawn` calls still target a real multi-thread runtime (this is the key difference from the existing `morsel-scheduler` branch, which uses per-worker `current_thread` runtimes and deadlocks on nested `tokio::spawn`).
- `ExecutionPipeline` wraps the entire `ExecutionPlan` as one push leaf for v1.
- `RepartitionPipeline` / `SortPipeline` (hash + round-robin repartition, in-memory sort via `sort_batch_chunked`) are implemented with the correct `Pipeline` semantics but not yet wired into the planner — porting PR #2226's "Inbox" rewiring (to splice scheduler push output into the wrapped subtree's leaf) is the next step before breaker cuts can be enabled.
- `dfbench` integration: `CommonOpt` gains `--push-scheduler`; the ClickBench runner routes `.collect()` through the scheduler when the flag is set.
- `RepartitionPipeline` wakes downstream only on the empty→non-empty transition of its output buffers (avoids redundant atomic ops on the hot push path).
- `spawn_local` onto the owner's deque skips the peer-wake notification — preserves cache locality, since the owner will LIFO-pop its own push on the next iteration.
- End-to-end SQL round-trip tests cover projection/filter, hash repartition, sort, top-k with limit, and group-by — all match the default path's output.
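The worker-parking loop from the `WorkerPool` bullet can be sketched with std primitives. This is a deliberately simplified model, with names invented here: one Mutex-protected injector queue plus a `Condvar`, and none of crossbeam-deque's lock-free per-worker LIFO deques, work stealing, or the shared tokio runtime handle.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Simplified shared pool state: (pending tasks, shutdown flag).
struct Pool {
    queue: Mutex<(VecDeque<Box<dyn FnOnce() + Send>>, bool)>,
    parked: Condvar,
}

/// Run all tasks on `n_workers` OS threads, parking idle workers on
/// the Condvar, then shut down once the queue is drained.
fn run_pool(n_workers: usize, tasks: Vec<Box<dyn FnOnce() + Send>>) {
    let pool = Arc::new(Pool {
        queue: Mutex::new((tasks.into(), false)),
        parked: Condvar::new(),
    });
    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                let mut guard = pool.queue.lock().unwrap();
                loop {
                    if let Some(task) = guard.0.pop_front() {
                        drop(guard); // run the task without holding the lock
                        task();
                        guard = pool.queue.lock().unwrap();
                    } else if guard.1 {
                        return; // shutdown requested and queue drained
                    } else {
                        // Park until new work or shutdown is signaled.
                        guard = pool.parked.wait(guard).unwrap();
                    }
                }
            })
        })
        .collect();
    // Signal shutdown and wake all parked workers.
    pool.queue.lock().unwrap().1 = true;
    pool.parked.notify_all();
    for w in workers {
        w.join().unwrap();
    }
}
```

The real pool keeps workers alive across queries and pushes new tasks through the injector with a `notify_one` on each enqueue; the crossbeam version replaces the single locked queue with lock-free deques and steal loops.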
Test plan
- `cargo test -p datafusion-push-scheduler` — 3 unit + 5 end-to-end tests pass
- `cargo clippy -p datafusion-push-scheduler --all-targets --all-features -- -D warnings`
- `cargo clippy -p datafusion-benchmarks -- -D warnings`
- `cargo fmt --all`
- Run benchmarks with `--push-scheduler` vs. default and compare wall-clock
- Follow-up: port the Inbox rewiring so `RepartitionPipeline` / `SortPipeline` can be cut at in the planner

🤖 Generated with Claude Code