diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3d..1893e7793d64f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1811,6 +1811,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-proto", + "datafusion-push-scheduler", "env_logger", "futures", "libmimalloc-sys", @@ -2573,6 +2574,24 @@ dependencies = [ "log", ] +[[package]] +name = "datafusion-push-scheduler" +version = "53.0.0" +dependencies = [ + "ahash", + "arrow", + "crossbeam-deque", + "datafusion", + "datafusion-common", + "datafusion-execution", + "datafusion-physical-expr", + "datafusion-physical-plan", + "futures", + "log", + "parking_lot", + "tokio", +] + [[package]] name = "datafusion-session" version = "53.0.0" diff --git a/Cargo.toml b/Cargo.toml index 7e75bb59b68f2..ddeed3e0c8bfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ members = [ "datafusion/physical-optimizer", "datafusion/pruning", "datafusion/physical-plan", + "datafusion/push-scheduler", "datafusion/proto", "datafusion/proto/gen", "datafusion/proto-common", @@ -146,6 +147,7 @@ datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "53.0.0", default-features = false } datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "53.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "53.0.0" } +datafusion-push-scheduler = { path = "datafusion/push-scheduler", version = "53.0.0" } datafusion-proto = { path = "datafusion/proto", version = "53.0.0" } datafusion-proto-common = { path = "datafusion/proto-common", version = "53.0.0" } datafusion-pruning = { path = "datafusion/pruning", version = "53.0.0" } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index f82f1c0a03e3d..eb0fc445eef3b 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -45,6 +45,7 @@ bytes = { workspace = true } clap = { version = "4.5.60", features = 
["derive"] } datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } +datafusion-push-scheduler = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } libmimalloc-sys = { version = "0.1", optional = true } diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 70aaeb7d2d192..8e41fd674ccba 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -19,7 +19,10 @@ use std::fs; use std::io::ErrorKind; use std::path::{Path, PathBuf}; -use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats}; +use crate::util::{ + BenchmarkRun, CommonOpt, QueryResult, collect_sql_via_push_scheduler, + print_memory_stats, +}; use clap::Args; use datafusion::logical_expr::{ExplainFormat, ExplainOption}; use datafusion::{ @@ -255,7 +258,11 @@ impl RunOpt { let mut query_results = vec![]; for i in 0..self.iterations() { let start = Instant::now(); - let results = ctx.sql(sql).await?.collect().await?; + let results = if self.common.push_scheduler { + collect_sql_via_push_scheduler(ctx, sql).await? + } else { + ctx.sql(sql).await?.collect().await? 
+ }; let elapsed = start.elapsed(); let ms = elapsed.as_secs_f64() * 1000.0; millis.push(ms); diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs index 6dc11c0f425bd..cab946442266a 100644 --- a/benchmarks/src/util/mod.rs +++ b/benchmarks/src/util/mod.rs @@ -19,8 +19,10 @@ pub mod latency_object_store; mod memory; mod options; +pub mod push_scheduler; mod run; pub use memory::print_memory_stats; pub use options::CommonOpt; +pub use push_scheduler::collect_sql_via_push_scheduler; pub use run::{BenchQuery, BenchmarkRun, QueryResult}; diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index a50a5268c0bfe..740d68bc002e3 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -70,6 +70,19 @@ pub struct CommonOpt { /// Adds random latency in the range 20-200ms to each object store operation. #[arg(long = "simulate-latency")] pub simulate_latency: bool, + + /// Execute queries via `datafusion-push-scheduler` — the push-based + /// morsel-driven scheduler ported from apache/datafusion#2226. On by + /// default; pass `--push-scheduler=false` to fall back to the + /// pull-based path. + #[arg( + long = "push-scheduler", + default_value_t = true, + num_args = 0..=1, + default_missing_value = "true", + action = clap::ArgAction::Set, + )] + pub push_scheduler: bool, } impl CommonOpt { @@ -190,6 +203,7 @@ mod tests { sort_spill_reservation_bytes: None, debug: false, simulate_latency: false, + push_scheduler: true, }; // With env var set, builder should succeed and have a memory pool diff --git a/benchmarks/src/util/push_scheduler.rs b/benchmarks/src/util/push_scheduler.rs new file mode 100644 index 0000000000000..0ba394697f474 --- /dev/null +++ b/benchmarks/src/util/push_scheduler.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark helpers for driving queries through +//! [`datafusion_push_scheduler::Scheduler`]. + +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; +use datafusion::prelude::SessionContext; +use datafusion_push_scheduler::Scheduler; +use futures::StreamExt; + +/// Compile a SQL query via `ctx`, then execute the resulting physical plan +/// through [`Scheduler`] and collect all output batches. The worker count +/// is taken from the session's `target_partitions`, matching how the +/// default path fans out. 
+pub async fn collect_sql_via_push_scheduler( + ctx: &SessionContext, + sql: &str, +) -> Result<Vec<RecordBatch>> { + let df = ctx.sql(sql).await?; + let plan = df.create_physical_plan().await?; + let workers = ctx.state().config().target_partitions().max(1); + let scheduler = Scheduler::new(workers)?; + let mut stream = scheduler.schedule(plan, ctx.task_ctx())?.stream(); + + let mut out = Vec::new(); + while let Some(batch) = stream.next().await { + out.push(batch?); + } + Ok(out) +} diff --git a/datafusion/push-scheduler/Cargo.toml b/datafusion/push-scheduler/Cargo.toml new file mode 100644 index 0000000000000..4948bd87494cf --- /dev/null +++ b/datafusion/push-scheduler/Cargo.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-push-scheduler" +description = "Push-based morsel scheduler for DataFusion (port of apache/datafusion#2226 onto plain tokio + crossbeam)" +keywords = ["arrow", "query", "sql", "scheduler", "morsel", "push"] +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_push_scheduler" + +[dependencies] +ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +arrow = { workspace = true } +crossbeam-deque = "0.8" +datafusion-common = { workspace = true, default-features = true } +datafusion-execution = { workspace = true, default-features = true } +datafusion-physical-expr = { workspace = true, default-features = true } +datafusion-physical-plan = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +parking_lot = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt", "sync"] } + +[dev-dependencies] +datafusion = { workspace = true, default-features = true } +tokio = { workspace = true, features = ["rt-multi-thread", "time", "macros"] } diff --git a/datafusion/push-scheduler/src/lib.rs b/datafusion/push-scheduler/src/lib.rs new file mode 100644 index 0000000000000..f16143df05eeb --- /dev/null +++ b/datafusion/push-scheduler/src/lib.rs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Push-based, morsel-driven scheduler for Apache DataFusion. +//! +//! This crate is a port of the experimental scheduler from +//! [apache/datafusion#2226](https://github.com/apache/datafusion/pull/2226) +//! onto plain `tokio` + `crossbeam-deque` (the original used `rayon`). +//! +//! # Architecture +//! +//! * A [`PipelinePlanner`] compiles an +//! [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan) into a flat +//! [`PipelinePlan`] — a vector of [`RoutablePipeline`]s plus their +//! [`OutputLink`]s. Linear chains of pull-based operators are grouped +//! into an [`ExecutionPipeline`](pipelines::execution::ExecutionPipeline); +//! `RepartitionExec`, `CoalescePartitionsExec`, `SortExec`, and partial +//! `AggregateExec` become dedicated breaker pipelines. +//! +//! * A [`Scheduler`] owns a pool of worker OS threads, each running a tokio +//! `current_thread` runtime + `LocalSet` and pulling tasks from a +//! `crossbeam_deque` work-stealing queue. Idle workers steal from peers; +//! external submissions go through a shared `Injector`. +//! +//! * A `Task = (pipeline_idx, partition)` is scheduled per output partition. +//! Each worker iteration calls +//! [`Pipeline::poll_partition`](pipeline::Pipeline::poll_partition) on the +//! task's pipeline and routes the result to the downstream pipeline's +//! `push` / `close`. `Poll::Pending` parks the task on a +//! [`ArcWake`](futures::task::ArcWake) that re-enqueues it when the +//! underlying future wakes. +//! +//! # Entry point +//! +//! ```ignore +//! use std::sync::Arc; +//! 
use datafusion_push_scheduler::Scheduler; +//! +//! # async fn run(plan: std::sync::Arc, +//! # ctx: std::sync::Arc) +//! # -> datafusion_common::Result<()> { +//! let scheduler = Scheduler::new(num_cpus::get()); +//! let results = scheduler.schedule(plan, ctx)?; +//! let mut stream = results.stream(); +//! while let Some(batch) = futures::StreamExt::next(&mut stream).await { +//! let _ = batch?; +//! } +//! # Ok(()) } +//! ``` + +#![deny(clippy::clone_on_ref_ptr)] + +pub mod pipeline; +pub mod pipelines; +pub mod plan; +pub mod scheduler; +pub mod task; +pub mod worker_pool; + +pub use pipeline::Pipeline; +pub use pipelines::execution::ExecutionPipeline; +pub use pipelines::repartition::RepartitionPipeline; +pub use plan::{OutputLink, PipelinePlan, PipelinePlanner, RoutablePipeline}; +pub use scheduler::Scheduler; +pub use task::{ExecutionResults, spawn_plan}; diff --git a/datafusion/push-scheduler/src/pipeline.rs b/datafusion/push-scheduler/src/pipeline.rs new file mode 100644 index 0000000000000..c86f03e865db7 --- /dev/null +++ b/datafusion/push-scheduler/src/pipeline.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Push-based [`Pipeline`] trait. 
Ported verbatim from PR apache/datafusion#2226. + +use std::fmt::Debug; +use std::task::{Context, Poll}; + +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; + +/// A push-based operator. The scheduler drives progress by polling one of the +/// pipeline's output partitions via [`Pipeline::poll_partition`]; when a batch +/// is produced the scheduler routes it to the next pipeline's +/// [`Pipeline::push`]. End-of-input is signalled by [`Pipeline::close`]. +/// +/// The split between eager (push-time) and lazy (poll-time) work is entirely +/// the pipeline's choice — e.g. a repartition breaker does most work in +/// `push`, while a streaming adapter forwards from a wrapped +/// `SendableRecordBatchStream` inside `poll_partition`. +pub trait Pipeline: Debug + Send + Sync { + /// Push a batch into `child` of the pipeline for the given `partition`. + fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>; + + /// Signal that `child` / `partition` will receive no further batches. + fn close(&self, child: usize, partition: usize); + + /// Number of output partitions this pipeline produces. + fn output_partitions(&self) -> usize; + + /// Attempt to drive progress for one output partition. + /// + /// * `Poll::Ready(Some(Ok(batch)))` — next batch for the partition. + /// * `Poll::Ready(Some(Err(e)))` — query failed. + /// * `Poll::Ready(None)` — partition exhausted. + /// * `Poll::Pending` — pipeline has stored `cx.waker()` and will wake + /// the scheduler when it can make progress. 
+ fn poll_partition( + &self, + cx: &mut Context<'_>, + partition: usize, + ) -> Poll<Option<Result<RecordBatch>>>; +} diff --git a/datafusion/push-scheduler/src/pipelines/execution.rs b/datafusion/push-scheduler/src/pipelines/execution.rs new file mode 100644 index 0000000000000..2b3b8ec82970d --- /dev/null +++ b/datafusion/push-scheduler/src/pipelines/execution.rs @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ExecutionPipeline`] — adapter that wraps a subtree of pull-based +//! [`ExecutionPlan`]s and exposes it as a push-based [`Pipeline`]. +//! +//! If `depth` is `Some(n)` the wrapped plan is rewritten so that the +//! operator `n` levels below the root has each of its children replaced +//! by an [`InboxExec`](super::inbox::InboxExec). The scheduler then +//! pushes batches from the upstream breaker into those inboxes via +//! [`Pipeline::push`], and the wrapped subtree's ordinary +//! `plan.execute(partition)` call pulls from them as if the breaker's +//! output had arrived pull-based. +//! +//! When `depth` is `None` (or the wrapped plan is a true leaf at +//! `depth=0`), no rewiring is performed and the pipeline is a leaf +//! adapter — `push` / `close` are unreachable. 
+ +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_common::error::DataFusionError; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use futures::StreamExt; +use parking_lot::Mutex; + +use super::inbox::{InboxExec, InboxSendGroup}; +use crate::pipeline::Pipeline; + +pub struct ExecutionPipeline { + plan: Arc, + task_context: Arc, + output_partitions: usize, + /// One slot per output partition. Lazily populated on first + /// `poll_partition`. Wrapped in `Mutex` because `poll_partition` + /// takes `&self`. + streams: Vec>, + + /// Inbox senders, one group per original child of the rewiring + /// point. Empty when no rewiring was performed (true leaf adapter). + inboxes: Vec, +} + +enum PartitionState { + /// Not yet fetched `plan.execute(p)`. + Fresh, + /// Stream is live. + Running(SendableRecordBatchStream), + /// Stream reached EOS or errored — emit `Ready(None)` forever. + Done, +} + +impl std::fmt::Debug for ExecutionPipeline { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecutionPipeline") + .field("plan", &self.plan) + .field("output_partitions", &self.output_partitions) + .field("inbox_children", &self.inboxes.len()) + .finish() + } +} + +impl ExecutionPipeline { + /// Wrap `plan` with no rewiring. Use for true-leaf ExecutionPipelines + /// (whole plan, or below a breaker-free subtree) where no breaker + /// output needs to be injected. + pub fn new( + plan: Arc, + task_context: Arc, + ) -> Result { + Self::with_depth(plan, task_context, None) + } + + /// Wrap `plan`, optionally rewriting the operator `depth` levels + /// below the root so that each of its children is replaced with an + /// [`InboxExec`]. Used by [`PipelinePlanner`](crate::plan::PipelinePlanner) + /// when the group sits above a breaker cut. 
+ pub fn with_depth( + plan: Arc<dyn ExecutionPlan>, + task_context: Arc<TaskContext>, + depth: Option<usize>, + ) -> Result<Self> { + let (plan, inboxes) = match depth { + Some(d) => rewrite_with_inboxes(plan, d)?, + None => (plan, Vec::new()), + }; + let output_partitions = plan.output_partitioning().partition_count(); + let streams = (0..output_partitions) + .map(|_| Mutex::new(PartitionState::Fresh)) + .collect(); + Ok(Self { + plan, + task_context, + output_partitions, + streams, + inboxes, + }) + } + + /// Number of `(child_idx, partition)` input slots this pipeline + /// expects from the scheduler — 0 for leaf adapters, `Σ partitions` + /// for Inbox-rewired pipelines. + pub fn input_children(&self) -> usize { + self.inboxes.len() + } +} + +/// Rewrite `plan` so that the operator `depth` levels below its root +/// has every child replaced with an [`InboxExec`]. Recurses through +/// single-child operators; returns an internal error if a non-single-child +/// node is encountered before `depth == 0` (the planner guarantees this invariant). +fn rewrite_with_inboxes( + plan: Arc<dyn ExecutionPlan>, + depth: usize, +) -> Result<(Arc<dyn ExecutionPlan>, Vec<InboxSendGroup>)> { + if depth == 0 { + let children = plan.children(); + if children.is_empty() { + // True leaf at depth 0 — no breaker below; nothing to rewire. 
+ return Ok((plan, Vec::new())); + } + let mut new_children = Vec::with_capacity(children.len()); + let mut groups = Vec::with_capacity(children.len()); + for child in children { + let schema = child.schema(); + let props = Arc::clone(child.properties()); + let (inbox, group) = InboxExec::new(schema, props); + new_children.push(inbox as Arc); + groups.push(group); + } + let new_plan = plan.with_new_children(new_children)?; + return Ok((new_plan, groups)); + } + + let children = plan.children(); + if children.len() != 1 { + return Err(DataFusionError::Internal(format!( + "rewrite_with_inboxes: depth={depth} but node has {} children \ + (planner should have flushed at a multi-child node)", + children.len() + ))); + } + let child = Arc::clone(children[0]); + let (new_child, groups) = rewrite_with_inboxes(child, depth - 1)?; + let new_plan = plan.with_new_children(vec![new_child])?; + Ok((new_plan, groups)) +} + +impl Pipeline for ExecutionPipeline { + fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> { + if child >= self.inboxes.len() { + return Err(DataFusionError::Internal(format!( + "ExecutionPipeline::push for child {child} but only {} inboxes exist", + self.inboxes.len() + ))); + } + self.inboxes[child].push(partition, input) + } + + fn close(&self, child: usize, partition: usize) { + if let Some(group) = self.inboxes.get(child) { + group.close(partition); + } else { + log::error!( + "ExecutionPipeline::close for child {child} but only {} inboxes exist", + self.inboxes.len() + ); + } + } + + fn output_partitions(&self) -> usize { + self.output_partitions + } + + fn poll_partition( + &self, + cx: &mut Context<'_>, + partition: usize, + ) -> Poll>> { + let mut state = self.streams[partition].lock(); + loop { + match &mut *state { + PartitionState::Fresh => { + match self.plan.execute(partition, Arc::clone(&self.task_context)) { + Ok(stream) => *state = PartitionState::Running(stream), + Err(e) => { + *state = PartitionState::Done; + 
return Poll::Ready(Some(Err(e))); + } + } + } + PartitionState::Running(stream) => { + return match stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))), + Poll::Ready(Some(Err(e))) => { + *state = PartitionState::Done; + Poll::Ready(Some(Err(e))) + } + Poll::Ready(None) => { + *state = PartitionState::Done; + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + }; + } + PartitionState::Done => return Poll::Ready(None), + } + } + } +} diff --git a/datafusion/push-scheduler/src/pipelines/inbox.rs b/datafusion/push-scheduler/src/pipelines/inbox.rs new file mode 100644 index 0000000000000..98894c0593d16 --- /dev/null +++ b/datafusion/push-scheduler/src/pipelines/inbox.rs @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`InboxExec`] — leaf `ExecutionPlan` whose per-partition streams are +//! fed by an upstream breaker's output via [`ExecutionPipeline::push`]. +//! +//! PR apache/datafusion#2226 calls these "inboxes". When +//! [`PipelinePlanner`](crate::plan::PipelinePlanner) cuts at a breaker, +//! the surrounding pull-based subtree is rewritten to replace the breaker +//! position (or each of a multi-child node's children) with an +//! `InboxExec`. 
The breaker's output is then *pushed* into the +//! corresponding inbox; the wrapped subtree's ordinary +//! `plan.execute(partition)` call pulls from the inbox stream as if +//! nothing had changed. +//! +//! The per-partition channel is a plain [`Mutex>`] +//! plus a stored [`Waker`] — not a `tokio::sync::mpsc`. For our exactly +//! single-producer / single-consumer use case this is materially cheaper +//! than tokio mpsc (no atomic ref counting on every send, no linked-list +//! management). + +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{Context, Poll, Waker}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_common::error::DataFusionError; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; +use futures::Stream; +use parking_lot::Mutex; + +/// Per-partition state shared between the push side (scheduler) and the +/// pull side (wrapped `plan.execute(p)` stream). +struct InboxPartition { + queue: Mutex>, + waker: Mutex>, + closed: AtomicBool, + /// Set to true once the stream's `poll_next` sees an EOS so we don't + /// have to keep touching the closed atomic after we've observed it. + drained: AtomicBool, +} + +impl InboxPartition { + fn new() -> Self { + Self { + queue: Mutex::new(VecDeque::new()), + waker: Mutex::new(None), + closed: AtomicBool::new(false), + drained: AtomicBool::new(false), + } + } + + #[inline] + fn wake(&self) { + let waker = self.waker.lock().take(); + if let Some(w) = waker { + w.wake(); + } + } +} + +/// Sender side of an [`InboxExec`]'s per-partition queues. One +/// `InboxSendGroup` corresponds to one child of the surrounding +/// `ExecutionPipeline`'s rewritten plan. 
+#[derive(Debug)] +pub(crate) struct InboxSendGroup { + partitions: Vec>, +} + +impl std::fmt::Debug for InboxPartition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InboxPartition") + .field("queue_len", &self.queue.lock().len()) + .field("closed", &self.closed.load(Ordering::Relaxed)) + .finish() + } +} + +impl InboxSendGroup { + /// Push a batch into `partition`. Wakes the consumer only on the + /// empty→non-empty transition. + pub(crate) fn push(&self, partition: usize, batch: RecordBatch) -> Result<()> { + let p = &self.partitions[partition]; + if p.drained.load(Ordering::Relaxed) { + // Consumer dropped its stream (query cancelled). Nothing to do. + return Ok(()); + } + let was_empty = { + let mut q = p.queue.lock(); + let was_empty = q.is_empty(); + q.push_back(batch); + was_empty + }; + if was_empty { + p.wake(); + } + Ok(()) + } + + /// Close the sender for `partition` — the consumer's stream will + /// yield `Ready(None)` once its queue is drained. + pub(crate) fn close(&self, partition: usize) { + let p = &self.partitions[partition]; + p.closed.store(true, Ordering::SeqCst); + p.wake(); + } +} + +/// Leaf `ExecutionPlan` backed by N push-side queues — one per output +/// partition. `execute(p, _)` claims the stream for partition `p`; a +/// given partition may only be executed once. +pub struct InboxExec { + schema: SchemaRef, + properties: Arc, + partitions: Vec>, + executed: Mutex>, +} + +impl std::fmt::Debug for InboxExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InboxExec") + .field("partitions", &self.partitions.len()) + .finish() + } +} + +impl InboxExec { + /// Build a new inbox matching `properties` (schema, partitioning, + /// ordering, emission, boundedness preserved). Returns the + /// `ExecutionPlan` wrapper and the sender group keyed by partition. 
+ pub(crate) fn new( + schema: SchemaRef, + properties: Arc, + ) -> (Arc, InboxSendGroup) { + let n = properties.partitioning.partition_count(); + let partitions: Vec<_> = + (0..n).map(|_| Arc::new(InboxPartition::new())).collect(); + let sender_handles = partitions.iter().map(Arc::clone).collect(); + let inbox = Arc::new(Self { + schema, + properties, + partitions, + executed: Mutex::new(vec![false; n]), + }); + let group = InboxSendGroup { + partitions: sender_handles, + }; + (inbox, group) + } +} + +impl DisplayAs for InboxExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "InboxExec(partitions={})", self.partitions.len()) + } +} + +impl ExecutionPlan for InboxExec { + fn name(&self) -> &'static str { + "InboxExec" + } + + fn properties(&self) -> &Arc { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn apply_expressions( + &self, + _f: &mut dyn FnMut( + &dyn PhysicalExpr, + ) + -> Result, + ) -> Result { + Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue) + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if !children.is_empty() { + return Err(DataFusionError::Internal( + "InboxExec does not accept children".to_string(), + )); + } + Ok(self) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + { + let mut executed = self.executed.lock(); + if executed[partition] { + return Err(DataFusionError::Internal(format!( + "InboxExec partition {partition} executed twice", + ))); + } + executed[partition] = true; + } + Ok(Box::pin(InboxStream { + schema: Arc::clone(&self.schema), + state: Arc::clone(&self.partitions[partition]), + })) + } +} + +struct InboxStream { + schema: SchemaRef, + state: Arc, +} + +impl Drop for InboxStream { + fn drop(&mut self) { + // Tell the producer it can stop pushing on this partition. 
+ self.state.drained.store(true, Ordering::Relaxed); + } +} + +impl Stream for InboxStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Fast path: take a batch if one is queued. + { + let mut q = self.state.queue.lock(); + if let Some(batch) = q.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + } + // Closed? Drain done. + if self.state.closed.load(Ordering::SeqCst) { + // Race: a push could have raced between our pop attempt and + // this check. Re-check the queue. + let mut q = self.state.queue.lock(); + if let Some(batch) = q.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + return Poll::Ready(None); + } + // Park. Ordering matters: install waker FIRST, then recheck the + // queue + closed flag to avoid losing a wake that slipped in + // between the initial check and waker install. + *self.state.waker.lock() = Some(cx.waker().clone()); + { + let mut q = self.state.queue.lock(); + if let Some(batch) = q.pop_front() { + // Wake-up from our own installed waker is harmless; drop + // it explicitly to avoid a redundant re-poll. + let _ = self.state.waker.lock().take(); + return Poll::Ready(Some(Ok(batch))); + } + } + if self.state.closed.load(Ordering::SeqCst) { + let _ = self.state.waker.lock().take(); + return Poll::Ready(None); + } + Poll::Pending + } +} + +impl RecordBatchStream for InboxStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} diff --git a/datafusion/push-scheduler/src/pipelines/mod.rs b/datafusion/push-scheduler/src/pipelines/mod.rs new file mode 100644 index 0000000000000..e6434444da190 --- /dev/null +++ b/datafusion/push-scheduler/src/pipelines/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Concrete [`Pipeline`](crate::pipeline::Pipeline) implementations. + +pub mod execution; +pub mod inbox; +pub mod repartition; diff --git a/datafusion/push-scheduler/src/pipelines/repartition.rs b/datafusion/push-scheduler/src/pipelines/repartition.rs new file mode 100644 index 0000000000000..3efc4cf36c9db --- /dev/null +++ b/datafusion/push-scheduler/src/pipelines/repartition.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`RepartitionPipeline`] — N-input / M-output breaker implementing +//! hash or round-robin repartitioning. Reused for `CoalescePartitionsExec` +//! 
by constructing with `RoundRobinBatch(1)` output. + +use std::collections::VecDeque; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll, Waker}; + +use arrow::record_batch::RecordBatch; +use datafusion_common::error::DataFusionError; +use datafusion_common::{Result, not_impl_err}; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_plan::metrics::Time; +use datafusion_physical_plan::repartition::BatchPartitioner; +use parking_lot::Mutex; + +use crate::pipeline::Pipeline; + +pub struct RepartitionPipeline { + /// Number of input partitions ( = number of children = 1, with N + /// partitions on that single child). + input_partitions: usize, + /// Number of output partitions. + output_partitions: usize, + + /// One partitioner per input partition. Protected by `Mutex` because + /// the scheduler's Task owns the input and we only need interior + /// mutability for the partitioner's internal buffers. + partitioners: Vec>, + + /// Shared state for each output partition. + outputs: Vec>, + + /// Number of input partitions that have not yet sent `close`. When it + /// drops to zero every output flips to "closed" and emits `None`. 
+ open_inputs: AtomicUsize, +} + +impl std::fmt::Debug for RepartitionPipeline { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RepartitionPipeline") + .field("input_partitions", &self.input_partitions) + .field("output_partitions", &self.output_partitions) + .finish() + } +} + +struct OutputBuffer { + queue: VecDeque, + waker: Option, + closed: bool, +} + +impl OutputBuffer { + fn new() -> Self { + Self { + queue: VecDeque::new(), + waker: None, + closed: false, + } + } +} + +impl RepartitionPipeline { + pub fn try_new(input: &Partitioning, output: &Partitioning) -> Result { + let input_partitions = input.partition_count(); + let output_partitions = output.partition_count(); + if input_partitions == 0 { + return Err(DataFusionError::Internal( + "RepartitionPipeline requires at least one input partition".to_string(), + )); + } + if output_partitions == 0 { + return Err(DataFusionError::Internal( + "RepartitionPipeline requires at least one output partition".to_string(), + )); + } + + // Refuse partitioning schemes BatchPartitioner can't handle. + match output { + Partitioning::Hash(..) 
| Partitioning::RoundRobinBatch(_) => {} + other => { + return not_impl_err!( + "RepartitionPipeline does not support output partitioning {other:?}" + ); + } + } + + let mut partitioners = Vec::with_capacity(input_partitions); + for i in 0..input_partitions { + let p = BatchPartitioner::try_new( + output.clone(), + Time::default(), + i, + input_partitions, + )?; + partitioners.push(Mutex::new(p)); + } + + let outputs = (0..output_partitions) + .map(|_| Mutex::new(OutputBuffer::new())) + .collect(); + + Ok(Self { + input_partitions, + output_partitions, + partitioners, + outputs, + open_inputs: AtomicUsize::new(input_partitions), + }) + } +} + +impl Pipeline for RepartitionPipeline { + fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> { + debug_assert_eq!(child, 0, "RepartitionPipeline has a single input child"); + debug_assert!( + partition < self.input_partitions, + "input partition out of range" + ); + + // Partition the batch — need to be careful with the partitioner + // lock: BatchPartitioner::partition_iter borrows `&mut self`, so + // we collect results into a Vec before pushing into outputs. + let outputs_to_push: Vec<(usize, RecordBatch)> = { + let mut p = self.partitioners[partition].lock(); + let iter = p.partition_iter(input)?; + iter.collect::>>()? + }; + + for (out_p, batch) in outputs_to_push { + // Only wake the downstream task on the empty→non-empty + // transition. The Task's wake_count coalesces redundant wakes + // anyway, but skipping them here saves the atomic op cost on + // the hot push path. 
+ let waker = { + let mut o = self.outputs[out_p].lock(); + let was_empty = o.queue.is_empty(); + o.queue.push_back(batch); + if was_empty { o.waker.take() } else { None } + }; + if let Some(w) = waker { + w.wake(); + } + } + + Ok(()) + } + + fn close(&self, child: usize, partition: usize) { + debug_assert_eq!(child, 0, "RepartitionPipeline has a single input child"); + debug_assert!(partition < self.input_partitions); + + let prev = self.open_inputs.fetch_sub(1, Ordering::SeqCst); + if prev == 1 { + // Last input closed — close every output and wake pending + // pollers. + for out in &self.outputs { + let waker = { + let mut o = out.lock(); + o.closed = true; + o.waker.take() + }; + if let Some(w) = waker { + w.wake(); + } + } + } + } + + fn output_partitions(&self) -> usize { + self.output_partitions + } + + fn poll_partition( + &self, + cx: &mut Context<'_>, + partition: usize, + ) -> Poll>> { + debug_assert!(partition < self.output_partitions); + let mut o = self.outputs[partition].lock(); + if let Some(batch) = o.queue.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + if o.closed { + return Poll::Ready(None); + } + // Stash the waker; replace any prior one since the latest call's + // context represents the current task execution. + o.waker = Some(cx.waker().clone()); + Poll::Pending + } +} diff --git a/datafusion/push-scheduler/src/plan.rs b/datafusion/push-scheduler/src/plan.rs new file mode 100644 index 0000000000000..40fdb00d31bb7 --- /dev/null +++ b/datafusion/push-scheduler/src/plan.rs @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Planner that lowers an [`ExecutionPlan`] into a flat [`PipelinePlan`]. +//! +//! Follows PR apache/datafusion#2226 `plan.rs`: a depth-first walk that +//! groups runs of pull-based operators into a single +//! [`ExecutionPipeline`](crate::pipelines::execution::ExecutionPipeline) +//! and cuts at [`RepartitionExec`] and [`CoalescePartitionsExec`] — +//! replacing them with [`RepartitionPipeline`](crate::pipelines::repartition::RepartitionPipeline). +//! +//! At each cut the surrounding `ExecutionPipeline` is rewritten so the +//! breaker's position in the plan tree becomes an +//! [`InboxExec`](crate::pipelines::inbox::InboxExec). The scheduler +//! pushes the breaker's output into those inboxes; the wrapped subtree +//! pulls from them as usual. +//! +//! `SortExec` is intentionally **not** cut. The in-tree `SortExec` +//! participates in TopK dynamic-filter pushdown (sort bounds get +//! published back to the scan through a shared +//! `Arc`), which a native push +//! implementation would silently drop. Keeping `SortExec` inside an +//! `ExecutionPipeline` preserves that pushdown and the default path's +//! spill support. 
+ +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; + +use crate::pipeline::Pipeline; +use crate::pipelines::execution::ExecutionPipeline; +use crate::pipelines::repartition::RepartitionPipeline; + +/// Points a pipeline's output at a specific input of another pipeline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OutputLink { + /// Index of the consuming pipeline in [`PipelinePlan::pipelines`]. + pub pipeline: usize, + /// Which input (child) of the consuming pipeline to push into. + pub child: usize, +} + +/// A [`Pipeline`] paired with an [`OutputLink`] describing where to send +/// its output. `output == None` means the pipeline feeds the query's final +/// result stream. +pub struct RoutablePipeline { + pub pipeline: Box, + pub output: Option, +} + +impl std::fmt::Debug for RoutablePipeline { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RoutablePipeline") + .field("pipeline", &self.pipeline) + .field("output", &self.output) + .finish() + } +} + +/// Scheduler-facing representation of a compiled query: a flat list of +/// pipelines with routing info. +#[derive(Debug)] +pub struct PipelinePlan { + pub schema: SchemaRef, + pub output_partitions: usize, + pub pipelines: Vec, +} + +/// Accumulator for an `ExecutionPipeline` under construction. +struct OperatorGroup { + /// Where the eventual ExecutionPipeline's output should be routed. + output: Option, + /// Topmost operator in the group (the one that will be wrapped). + root: Arc, + /// Number of levels we have descended past `root` into its + /// single-child chain. 
When flushed, the breaker (or multi-child + /// node) at `depth` levels below `root` has its children replaced + /// with [`InboxExec`](crate::pipelines::inbox::InboxExec). + depth: usize, +} + +/// Lowers an [`ExecutionPlan`] into a [`PipelinePlan`]. +pub struct PipelinePlanner { + task_context: Arc, + schema: SchemaRef, + output_partitions: usize, + completed: Vec, + /// DFS stack of nodes still to visit, paired with the output link + /// their eventual pipeline should route to. + to_visit: Vec<(Arc, Option)>, + /// Current in-progress `ExecutionPipeline` group, if any. + execution_operators: Option, +} + +impl PipelinePlanner { + pub fn new(plan: Arc, task_context: Arc) -> Self { + let schema = plan.schema(); + let output_partitions = plan.output_partitioning().partition_count(); + Self { + completed: vec![], + to_visit: vec![(plan, None)], + task_context, + execution_operators: None, + schema, + output_partitions, + } + } + + /// Flush the in-progress group into a new `ExecutionPipeline` (with + /// Inbox rewiring at `group.depth`) and return its index in + /// `completed`. 
+ fn flush_exec(&mut self) -> Result { + let group = self.execution_operators.take().unwrap(); + let node_idx = self.completed.len(); + let pipeline = ExecutionPipeline::with_depth( + group.root, + Arc::clone(&self.task_context), + Some(group.depth), + )?; + self.completed.push(RoutablePipeline { + pipeline: Box::new(pipeline), + output: group.output, + }); + Ok(node_idx) + } + + fn visit_exec( + &mut self, + plan: &Arc, + parent: Option, + ) -> Result<()> { + let children = plan.children(); + + match self.execution_operators.as_mut() { + Some(group) => { + debug_assert_eq!( + parent, group.output, + "PipelinePlanner: operator group's output link diverged" + ); + group.depth += 1; + } + None => { + self.execution_operators = Some(OperatorGroup { + output: parent, + root: Arc::clone(plan), + depth: 0, + }); + } + } + + match children.len() { + 1 => { + // Continue DFS into the single child, keeping the same + // parent output link — the child joins the current group. + self.to_visit.push((Arc::clone(children[0]), parent)); + } + _ => { + // Leaf (0 children) or multi-child node — flush the + // group here. Multi-child nodes have each child wired + // into its own pipeline feeding this one's inboxes. + let node = self.flush_exec()?; + self.enqueue_children( + children.into_iter().cloned().collect::>(), + node, + ); + } + } + + Ok(()) + } + + fn enqueue_children( + &mut self, + children: Vec>, + parent_node_idx: usize, + ) { + for (child_idx, child) in children.into_iter().enumerate() { + self.to_visit.push(( + child, + Some(OutputLink { + pipeline: parent_node_idx, + child: child_idx, + }), + )); + } + } + + /// Push a new `RoutablePipeline` and enqueue its children. + fn push_pipeline( + &mut self, + node: RoutablePipeline, + children: Vec>, + ) { + let node_idx = self.completed.len(); + self.completed.push(node); + self.enqueue_children(children, node_idx); + } + + /// Flush any in-progress group, then push `pipeline` as a new + /// breaker. 
The breaker's `parent` routing link is rewritten to + /// point at the flushed group's pipeline (child 0 — single-child + /// groups only) when a group was in progress. + fn push_breaker( + &mut self, + pipeline: Box, + parent: Option, + children: Vec>, + ) -> Result<()> { + let parent = match &self.execution_operators { + Some(group) => { + debug_assert_eq!( + group.output, parent, + "PipelinePlanner: breaker's parent diverged from group's output" + ); + Some(OutputLink { + pipeline: self.flush_exec()?, + child: 0, + }) + } + None => parent, + }; + self.push_pipeline( + RoutablePipeline { + pipeline, + output: parent, + }, + children, + ); + Ok(()) + } + + fn visit_operator( + &mut self, + plan: &Arc, + parent: Option, + ) -> Result<()> { + if let Some(repartition) = plan.downcast_ref::() { + let input_partitioning = repartition.input().output_partitioning().clone(); + let output_partitioning = repartition.partitioning().clone(); + let pipeline = Box::new(RepartitionPipeline::try_new( + &input_partitioning, + &output_partitioning, + )?); + self.push_breaker( + pipeline, + parent, + plan.children().into_iter().cloned().collect(), + ) + } else if let Some(coalesce) = plan.downcast_ref::() { + let input_partitioning = coalesce.input().output_partitioning().clone(); + let pipeline = Box::new(RepartitionPipeline::try_new( + &input_partitioning, + &Partitioning::RoundRobinBatch(1), + )?); + self.push_breaker( + pipeline, + parent, + plan.children().into_iter().cloned().collect(), + ) + } else { + self.visit_exec(plan, parent) + } + } + + /// Run the DFS walk and produce the final [`PipelinePlan`]. 
+ pub fn build(mut self) -> Result { + while let Some((plan, parent)) = self.to_visit.pop() { + self.visit_operator(&plan, parent)?; + } + if self.execution_operators.is_some() { + self.flush_exec()?; + } + Ok(PipelinePlan { + schema: self.schema, + output_partitions: self.output_partitions, + pipelines: self.completed, + }) + } +} diff --git a/datafusion/push-scheduler/src/scheduler.rs b/datafusion/push-scheduler/src/scheduler.rs new file mode 100644 index 0000000000000..ec0bee3dbfcc7 --- /dev/null +++ b/datafusion/push-scheduler/src/scheduler.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Public [`Scheduler`] entry point and task-submission primitives used by +//! [`task`](crate::task). 
+ +use std::sync::Arc; + +use datafusion_common::Result; +use datafusion_execution::TaskContext; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::ExecutionPlan; +use tokio::runtime::Handle; + +use crate::plan::{PipelinePlan, PipelinePlanner}; +use crate::task::{ExecutionResults, Task, spawn_plan}; +use crate::worker_pool::{self, WorkerPool}; + +/// Handle for submitting [`Task`]s to the worker pool. Cheap to clone. +#[derive(Clone)] +pub struct Spawner { + pool: Arc, +} + +impl std::fmt::Debug for Spawner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Spawner") + .field("workers", &self.pool.worker_count()) + .finish() + } +} + +impl Spawner { + pub(crate) fn new(pool: Arc) -> Self { + Self { pool } + } + + /// Submit a [`Task`] from outside a worker thread. Goes through the + /// shared injector; any idle worker may pick it up. + pub fn spawn(&self, task: Task) { + self.pool.spawn(Box::new(move || task.do_work())); + } + + /// Pool id — matches the id stored in each worker's thread-local. + pub fn pool_id(&self) -> usize { + self.pool.id() + } +} + +/// Push the task onto the current worker's LIFO deque. Used by the task +/// waker when woken from inside a worker thread. +pub(crate) fn spawn_local(task: Task) { + worker_pool::spawn_local(Box::new(move || task.do_work())); +} + +/// Push the task onto the current worker's FIFO side-queue. Used by +/// [`Task::do_work`] to reschedule itself *after* routing an output batch. +pub(crate) fn spawn_local_fifo(task: Task) { + worker_pool::spawn_local_fifo(Box::new(move || task.do_work())); +} + +/// Re-export for convenience. +pub use crate::worker_pool::is_worker; + +/// Returns `true` if `plan`'s tree contains any operator the planner +/// would cut into a separate pipeline. 
Mirrors the predicates in +/// [`PipelinePlanner::visit_operator`](crate::plan::PipelinePlanner). +fn has_cut(plan: &Arc) -> bool { + if plan.downcast_ref::().is_some() + || plan.downcast_ref::().is_some() + { + return true; + } + plan.children().iter().any(|c| has_cut(c)) +} + +/// Public, high-level scheduler handle. +/// +/// Owns a [`WorkerPool`]. Queries are submitted via [`Scheduler::schedule`] +/// (from a raw `ExecutionPlan`) or [`Scheduler::schedule_plan`] (from a +/// pre-built [`PipelinePlan`]). +pub struct Scheduler { + pool: Arc, +} + +impl Scheduler { + /// Build a scheduler backed by `worker_threads` OS threads, attached + /// to the **current** tokio runtime. MUST be called from within a + /// tokio runtime context — otherwise use [`Scheduler::with_handle`]. + pub fn new(worker_threads: usize) -> Result { + let handle = Handle::current(); + Self::with_handle(worker_threads, handle) + } + + /// Build a scheduler backed by `worker_threads` OS threads, attached + /// to the given tokio runtime handle. + pub fn with_handle(worker_threads: usize, handle: Handle) -> Result { + let pool = WorkerPool::new(worker_threads, handle)?; + Ok(Self { pool }) + } + + /// Number of worker threads backing this scheduler. + pub fn worker_count(&self) -> usize { + self.pool.worker_count() + } + + /// Compile and schedule an [`ExecutionPlan`]. + /// + /// If the plan contains no breakers (`RepartitionExec` or + /// `CoalescePartitionsExec`) the scheduler short-circuits and + /// returns the raw `plan.execute(p)` streams unchanged — no task + /// queue, no channels, no worker dispatch. This avoids pure-overhead + /// runs on simple scan/filter/project queries where cutting buys + /// nothing. 
+ pub fn schedule( + &self, + plan: Arc, + context: Arc, + ) -> Result { + if !has_cut(&plan) { + let schema = plan.schema(); + return Ok(ExecutionResults::direct(schema, plan, context)); + } + let pipeline_plan = PipelinePlanner::new(plan, context).build()?; + Ok(self.schedule_plan(pipeline_plan)) + } + + /// Schedule a pre-built [`PipelinePlan`]. + pub fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults { + let spawner = Spawner::new(Arc::clone(&self.pool)); + spawn_plan(plan, spawner) + } + + /// Handle to the scheduler's spawner — useful for tests that want to + /// submit raw [`Task`]s. + pub fn spawner(&self) -> Spawner { + Spawner::new(Arc::clone(&self.pool)) + } +} diff --git a/datafusion/push-scheduler/src/task.rs b/datafusion/push-scheduler/src/task.rs new file mode 100644 index 0000000000000..9121e9acd8273 --- /dev/null +++ b/datafusion/push-scheduler/src/task.rs @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`Task`], [`TaskWaker`], and [`ExecutionResults`] — the scheduling unit +//! and the query-level harness that owns it. Ported from PR +//! apache/datafusion#2226 `task.rs`. 
+ +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::error::{DataFusionError, Result}; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion_physical_plan::stream::RecordBatchStreamAdapter; +use datafusion_physical_plan::{ExecutionPlan, execute_stream, execute_stream_partitioned}; +use futures::channel::mpsc; +use futures::task::ArcWake; +use futures::{Stream, StreamExt, ready}; +use log::{debug, trace}; + +use crate::plan::{PipelinePlan, RoutablePipeline}; +use crate::scheduler::{Spawner, is_worker, spawn_local, spawn_local_fifo}; + +/// Spawn every output partition of every pipeline in `plan` as an initial +/// [`Task`]. Returns an [`ExecutionResults`] whose output partitions stream +/// the final pipeline's results back to the caller. +pub fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults { + debug!( + "Spawning plan: {} pipelines, {} output partitions", + plan.pipelines.len(), + plan.output_partitions + ); + + let (senders, receivers) = (0..plan.output_partitions) + .map(|_| mpsc::unbounded()) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + let context = Arc::new(ExecutionContext { + spawner, + pipelines: plan.pipelines, + schema: plan.schema, + output: senders, + }); + + for (pipeline_idx, routable) in context.pipelines.iter().enumerate() { + for partition in 0..routable.pipeline.output_partitions() { + let task = Task { + context: Arc::clone(&context), + waker: Arc::new(TaskWaker { + context: Arc::downgrade(&context), + wake_count: AtomicUsize::new(1), + pipeline: pipeline_idx, + partition, + }), + }; + context.spawner.spawn(task); + } + } + + let streams = receivers + .into_iter() + .map(|receiver| ExecutionResultStream { + receiver, + context: Arc::clone(&context), + }) + .collect(); + + ExecutionResults { + schema: 
Arc::clone(&context.schema), + kind: ExecutionResultsKind::Scheduled(streams), + } +} + +/// One schedulable unit — an output partition of a pipeline that *may* be +/// able to make progress. +pub struct Task { + /// Shared query state. Holds the pipeline list, output channels, and + /// spawner used for re-enqueueing. + pub(crate) context: Arc, + pub(crate) waker: Arc, +} + +impl std::fmt::Debug for Task { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let output = &self.context.pipelines[self.waker.pipeline].output; + f.debug_struct("Task") + .field("pipeline", &self.waker.pipeline) + .field("partition", &self.waker.partition) + .field("output", output) + .finish() + } +} + +impl Task { + fn handle_error( + &self, + partition: usize, + routable: &RoutablePipeline, + error: DataFusionError, + ) { + // Surface the error on this partition's output channel. + self.context.send_query_output(partition, Err(error)); + + // Close the downstream link so its task observes EOS for this + // partition. We use a best-effort empty push-error to keep the + // API simple; the important thing is the close. + if let Some(link) = routable.output { + trace!( + "Closing pipeline: {link:?}, partition: {} (error propagation)", + self.waker.partition, + ); + self.context.pipelines[link.pipeline] + .pipeline + .close(link.child, self.waker.partition); + } else { + // No downstream — mark this output partition finished. + self.context.finish(partition); + } + } + + /// Core worker step. Calls + /// [`Pipeline::poll_partition`](crate::pipeline::Pipeline::poll_partition) + /// and routes the result. MUST be called on a worker thread. + pub fn do_work(self) { + debug_assert!(is_worker(), "Task::do_work called outside of worker pool"); + if self.context.is_cancelled() { + return; + } + + // Capture the wake count prior to calling poll_partition; this + // lets us detect concurrent wake-ups and reschedule correctly. 
+ let wake_count = self.waker.wake_count.load(Ordering::SeqCst); + + let node = self.waker.pipeline; + let partition = self.waker.partition; + + let waker = futures::task::waker_ref(&self.waker); + let mut cx = Context::from_waker(&waker); + + let pipelines = &self.context.pipelines; + let routable = &pipelines[node]; + let poll = routable.pipeline.poll_partition(&mut cx, partition); + // Release the waker borrow so `self` can be moved below. + let _ = waker; + match poll { + Poll::Ready(Some(Ok(batch))) => { + trace!("Poll {self:?}: Ok rows={}", batch.num_rows()); + match routable.output { + Some(link) => { + let push_result = pipelines[link.pipeline] + .pipeline + .push(batch, link.child, partition); + if let Err(e) = push_result { + self.handle_error(partition, routable, e); + return; + } + } + None => self.context.send_query_output(partition, Ok(batch)), + } + + // Reschedule ourselves AFTER routing the batch. Using FIFO + // so freshly awoken tasks triggered by the `push` above + // get a chance to run first. + spawn_local_fifo(self); + } + Poll::Ready(Some(Err(e))) => { + trace!("Poll {self:?}: Err {e:?}"); + self.handle_error(partition, routable, e); + } + Poll::Ready(None) => { + trace!("Poll {self:?}: None"); + match routable.output { + Some(link) => { + pipelines[link.pipeline] + .pipeline + .close(link.child, partition); + } + None => self.context.finish(partition), + } + } + Poll::Pending => { + trace!("Poll {self:?}: Pending"); + // Try to reset the wake count to 0. If that fails, a wake + // happened during poll_partition and we must reschedule. + let reset = self.waker.wake_count.compare_exchange( + wake_count, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ); + if reset.is_err() { + trace!("Wakeup during poll_partition: {self:?}"); + spawn_local(self); + } + } + } + } +} + +/// Per-query shared state — pipelines, output channels, and the spawner. 
+pub(crate) struct ExecutionContext { + pub(crate) spawner: Spawner, + pub(crate) pipelines: Vec, + pub schema: SchemaRef, + output: Vec>>>, +} + +impl std::fmt::Debug for ExecutionContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecutionContext") + .field("pipelines", &self.pipelines.len()) + .field("output_partitions", &self.output.len()) + .finish() + } +} + +impl Drop for ExecutionContext { + fn drop(&mut self) { + debug!("ExecutionContext dropped"); + } +} + +impl ExecutionContext { + fn is_cancelled(&self) -> bool { + self.output.iter().all(|x| x.is_closed()) + } + + fn send_query_output(&self, partition: usize, output: Result) { + let _ = self.output[partition].unbounded_send(Some(output)); + } + + fn finish(&self, partition: usize) { + let _ = self.output[partition].unbounded_send(None); + } +} + +/// Waker for a [`Task`]. Implements [`ArcWake`] so it can be turned into a +/// `std::task::Waker` via [`futures::task::waker_ref`]. On wake, atomically +/// bumps `wake_count` — if the value prior to the increment was 0 the task +/// is re-enqueued, otherwise the wakeup is coalesced. +pub(crate) struct TaskWaker { + context: Weak, + wake_count: AtomicUsize, + pipeline: usize, + partition: usize, +} + +impl ArcWake for TaskWaker { + fn wake(self: Arc) { + if self.wake_count.fetch_add(1, Ordering::SeqCst) != 0 { + trace!("Ignoring duplicate wakeup"); + return; + } + + if let Some(context) = self.context.upgrade() { + let task = Task { + context, + waker: Arc::clone(&self), + }; + if is_worker() { + spawn_local(task); + } else { + task.context.spawner.clone().spawn(task); + } + } else { + trace!("Dropped wakeup (context gone)"); + } + } + + fn wake_by_ref(arc_self: &Arc) { + ArcWake::wake(Arc::clone(arc_self)) + } +} + +// --------------------------------------------------------------------------- +// Caller-facing result streams. 
+// --------------------------------------------------------------------------- + +/// Results of scheduling a plan. Drop to cancel. +/// +/// Two shapes: +/// +/// * **Scheduled** — a [`PipelinePlan`] with ≥1 breaker cut is driving +/// execution on the scheduler's worker pool. Each output partition is +/// fed by its own mpsc channel from a `Task`. +/// * **Direct** — the plan had no breakers (no `RepartitionExec`, +/// `CoalescePartitionsExec`, or cuttable `SortExec`), so the scheduler +/// is a no-op: we hand back the raw `ExecutionPlan::execute(p)` streams +/// unchanged. Avoids all scheduler overhead for trivially simple +/// queries. +pub struct ExecutionResults { + schema: SchemaRef, + kind: ExecutionResultsKind, +} + +enum ExecutionResultsKind { + Scheduled(Vec), + /// No breakers — defer execution to the wrapped plan so `stream()` + /// and `stream_partitioned()` match the default path's + /// `execute_stream` / `execute_stream_partitioned` behaviour + /// (including wrapping multi-partition plans in + /// `CoalescePartitionsExec` for concurrent per-partition reads). + Direct { + plan: Arc, + context: Arc, + }, +} + +impl ExecutionResults { + pub(crate) fn direct( + schema: SchemaRef, + plan: Arc, + context: Arc, + ) -> Self { + Self { + schema, + kind: ExecutionResultsKind::Direct { plan, context }, + } + } + + /// Merge all partitions into one [`SendableRecordBatchStream`]. + pub fn stream(self) -> SendableRecordBatchStream { + match self.kind { + ExecutionResultsKind::Scheduled(streams) => { + // Each `ExecutionResultStream` already holds its own + // `Arc`, so the task state stays alive + // until all partitions are drained. 
+ Box::pin(RecordBatchStreamAdapter::new( + self.schema, + futures::stream::select_all(streams), + )) + } + ExecutionResultsKind::Direct { plan, context } => { + let schema = Arc::clone(&self.schema); + execute_stream(plan, context).unwrap_or_else(|e| { + Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { Err(e) }), + )) + }) + } + } + } + + /// Return one [`SendableRecordBatchStream`] per output partition. + pub fn stream_partitioned(self) -> Vec { + match self.kind { + ExecutionResultsKind::Scheduled(streams) => { + streams.into_iter().map(|s| Box::pin(s) as _).collect() + } + ExecutionResultsKind::Direct { plan, context } => { + execute_stream_partitioned(plan, context).unwrap_or_default() + } + } + } +} + +/// One output partition's result stream. +struct ExecutionResultStream { + receiver: mpsc::UnboundedReceiver>>, + context: Arc, +} + +impl Stream for ExecutionResultStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten(); + Poll::Ready(opt) + } +} + +impl RecordBatchStream for ExecutionResultStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.context.schema) + } +} diff --git a/datafusion/push-scheduler/src/worker_pool.rs b/datafusion/push-scheduler/src/worker_pool.rs new file mode 100644 index 0000000000000..9dc6cfead3214 --- /dev/null +++ b/datafusion/push-scheduler/src/worker_pool.rs @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Crossbeam-deque work-stealing pool for push-based scheduler tasks. +//! +//! Each worker is an OS thread that: +//! * enters a **shared** `tokio::runtime::Handle` (so `tokio::spawn` inside +//! wrapped pull-based operators goes to a real multi-thread runtime), and +//! * runs a tight blocking loop that pops [`BoxedJob`]s from the work-stealing +//! queue and executes them synchronously. +//! +//! # Queues +//! +//! * A shared [`Injector`] receives all external submissions. +//! * Each worker owns: +//! * a stealable [`Worker`] (LIFO) for `spawn_local` pushes, +//! * a thread-local `VecDeque` FIFO side-queue (non-stealable) +//! for `spawn_local_fifo` — used by `Task::do_work` to reschedule +//! itself after routing a batch, preserving PR #2226's intent of +//! prioritising freshly woken work. +//! +//! Search order on each worker iteration: local FIFO → own LIFO deque → +//! shared injector (batch-steal) → peer stealers (round-robin). Idle +//! workers park on a `Condvar` keyed by a single atomic wake counter; +//! submissions bump that counter to unblock one parker. 
+ +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex as StdMutex}; +use std::thread::JoinHandle; +use std::time::Duration; + +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use datafusion_common::error::{DataFusionError, Result}; +use tokio::runtime::Handle; + +/// A job queued onto a worker: a `Send` closure that, when invoked on the +/// worker thread, does any work it needs (including synchronously driving +/// one step of a [`Task`](crate::task::Task)). +pub(crate) type BoxedJob = Box; + +/// State shared by the pool handle and every worker thread. +struct Shared { + /// Unique pool id, used by [`current_worker`] to reject matches when + /// multiple pools coexist in the same process. + id: usize, + /// Global submission queue — any thread may push, any worker may steal + /// a batch. Wait-free. + injector: Injector, + /// One stealer per worker, used by peers when their own deque is empty. + stealers: Vec>, + /// Parker state. Workers wait on `wake_cvar` while the pool is idle; + /// submissions bump `wake_count` and notify one waiter. + wake_count: AtomicUsize, + wake_mutex: StdMutex<()>, + wake_cvar: Condvar, + /// Shut down flag — set on drop. Workers exit on the next wake. + shutdown: AtomicBool, + /// Tokio runtime the workers attach to (so `tokio::spawn` inside + /// wrapped pipelines targets a real async runtime). + handle: Handle, +} + +static NEXT_POOL_ID: AtomicUsize = AtomicUsize::new(0); + +thread_local! { + /// `Some((pool_id, worker_id))` on threads that host a worker loop; + /// `None` elsewhere. + static CURRENT_WORKER: Cell> = const { Cell::new(None) }; + + /// FIFO side-queue used by [`spawn_local_fifo`]. Only the owner touches + /// this — no synchronisation needed. + static FIFO_QUEUE: RefCell> = RefCell::new(VecDeque::new()); + + /// Raw pointer to the owning worker's LIFO deque. 
Set by `run_worker` + /// before entering the loop and cleared on exit. Safe to dereference + /// from any code running inside the worker's job execution. + static OWNER_DEQUE: Cell>> = const { Cell::new(None) }; + + /// Handle to the pool that owns the current worker. + static CURRENT_SHARED: RefCell>> = const { RefCell::new(None) }; +} + +/// Pool of OS threads with crossbeam-deque work-stealing, entered into +/// a shared tokio runtime handle. +pub struct WorkerPool { + shared: Arc, + /// Join handles for the worker OS threads. Kept so [`Drop`] can cleanly + /// wait for workers to finish. + handles: StdMutex>>>, +} + +impl WorkerPool { + /// Spawn `workers` OS threads attached to the given tokio runtime + /// handle. `handle` is used by the workers to drive async futures + /// spawned by wrapped pipelines (e.g. `tokio::spawn` inside + /// `ExecutionPlan::execute` internals). + pub fn new(workers: usize, handle: Handle) -> Result> { + if workers == 0 { + return Err(DataFusionError::Configuration( + "WorkerPool requires at least one worker".to_string(), + )); + } + + let mut locals: Vec>> = Vec::with_capacity(workers); + let mut stealers = Vec::with_capacity(workers); + for _ in 0..workers { + let w = Worker::::new_lifo(); + stealers.push(w.stealer()); + locals.push(Some(w)); + } + + let shared = Arc::new(Shared { + id: NEXT_POOL_ID.fetch_add(1, Ordering::Relaxed), + injector: Injector::new(), + stealers, + wake_count: AtomicUsize::new(0), + wake_mutex: StdMutex::new(()), + wake_cvar: Condvar::new(), + shutdown: AtomicBool::new(false), + handle, + }); + + let mut handles = Vec::with_capacity(workers); + for (i, slot) in locals.iter_mut().enumerate() { + let shared_clone = Arc::clone(&shared); + let local = slot.take().expect("local worker deque missing"); + let th = std::thread::Builder::new() + .name(format!("push-sched-worker-{i}")) + .spawn(move || { + run_worker(i, &shared_clone, &local); + // `local` lives until here, dropped after the worker + // loop exits 
so any in-flight stealers see an empty + // deque. + drop(local); + }) + .map_err(|e| { + DataFusionError::External( + format!("failed to spawn push-scheduler worker: {e}").into(), + ) + })?; + handles.push(th); + } + + Ok(Arc::new(Self { + shared, + handles: StdMutex::new(Some(handles)), + })) + } + + pub fn worker_count(&self) -> usize { + self.shared.stealers.len() + } + + pub fn id(&self) -> usize { + self.shared.id + } + + /// Submit a job to the shared injector. + pub fn spawn(&self, job: BoxedJob) { + self.shared.injector.push(job); + self.shared.wake_one(); + } +} + +impl Drop for WorkerPool { + fn drop(&mut self) { + self.shared.shutdown.store(true, Ordering::SeqCst); + self.shared.wake_all(); + if let Some(handles) = self.handles.lock().unwrap().take() { + for h in handles { + let _ = h.join(); + } + } + } +} + +impl Shared { + /// Bump the wake counter and notify one parker. + fn wake_one(&self) { + self.wake_count.fetch_add(1, Ordering::SeqCst); + // Holding the mutex while notifying is the standard Condvar + // pattern — prevents a lost wake-up when a parker is in between + // checking the counter and calling wait(). + let _guard = self.wake_mutex.lock().unwrap(); + self.wake_cvar.notify_one(); + } + + fn wake_all(&self) { + self.wake_count.fetch_add(1, Ordering::SeqCst); + let _guard = self.wake_mutex.lock().unwrap(); + self.wake_cvar.notify_all(); + } +} + +fn run_worker(my_id: usize, shared: &Arc, local: &Worker) { + // Attach this thread to the shared tokio runtime so `tokio::spawn` + // calls inside pipeline futures target a real runtime. 
+ let _enter = shared.handle.enter(); + + CURRENT_WORKER.with(|cell| cell.set(Some((shared.id, my_id)))); + OWNER_DEQUE.with(|cell| cell.set(Some(local as *const _))); + CURRENT_SHARED.with(|cell| *cell.borrow_mut() = Some(Arc::clone(shared))); + + let mut observed_wake = shared.wake_count.load(Ordering::SeqCst); + loop { + if shared.shutdown.load(Ordering::SeqCst) { + break; + } + if let Some(job) = find_job(local, shared, my_id) { + job(); + continue; + } + + // No work. Park on the wake counter. + let current = shared.wake_count.load(Ordering::SeqCst); + if current != observed_wake { + // A wake has happened since we last looked — recheck queues. + observed_wake = current; + continue; + } + let mut guard = shared.wake_mutex.lock().unwrap(); + while shared.wake_count.load(Ordering::SeqCst) == observed_wake + && !shared.shutdown.load(Ordering::SeqCst) + { + // Brief timeout prevents any pathological wake-up loss from + // wedging the pool forever. + let (g, _timeout) = shared + .wake_cvar + .wait_timeout(guard, Duration::from_millis(50)) + .unwrap(); + guard = g; + } + observed_wake = shared.wake_count.load(Ordering::SeqCst); + } + + FIFO_QUEUE.with(|q| q.borrow_mut().clear()); + OWNER_DEQUE.with(|cell| cell.set(None)); + CURRENT_WORKER.with(|cell| cell.set(None)); + CURRENT_SHARED.with(|cell| *cell.borrow_mut() = None); +} + +#[inline] +fn find_job(local: &Worker, shared: &Shared, my_id: usize) -> Option { + if let Some(job) = FIFO_QUEUE.with(|q| q.borrow_mut().pop_front()) { + return Some(job); + } + if let Some(job) = local.pop() { + return Some(job); + } + loop { + match shared.injector.steal_batch_and_pop(local) { + Steal::Success(job) => return Some(job), + Steal::Empty => break, + Steal::Retry => continue, + } + } + let n = shared.stealers.len(); + for offset in 1..n { + let victim = (my_id + offset) % n; + loop { + match shared.stealers[victim].steal_batch_and_pop(local) { + Steal::Success(job) => return Some(job), + Steal::Empty => break, + Steal::Retry 
=> continue, + } + } + } + None +} + +// --------------------------------------------------------------------------- +// Thread-local helpers used by `scheduler::spawn_local{_fifo}`. +// --------------------------------------------------------------------------- + +pub fn is_worker() -> bool { + CURRENT_WORKER.with(|cell| cell.get().is_some()) +} + +pub fn current_worker() -> Option<(usize, usize)> { + CURRENT_WORKER.with(|cell| cell.get()) +} + +/// Push a job onto the current worker's LIFO deque. Falls back to the +/// shared injector if called off a worker thread or if no pool context +/// is attached to this thread. +pub(crate) fn spawn_local(job: BoxedJob) { + let ptr = OWNER_DEQUE.with(|cell| cell.get()); + let shared = CURRENT_SHARED.with(|cell| cell.borrow().clone()); + + match ptr { + Some(ptr) => { + // SAFETY: the worker thread sets this pointer to its own local + // deque before entering the loop and clears it on exit; any + // code running on this thread sees a valid pointer. + let worker: &Worker = unsafe { &*ptr }; + worker.push(job); + // Don't wake peers — preserves cache locality. Owner picks + // this up on its next iteration (LIFO). + let _ = shared; + } + None => match shared { + Some(shared) => { + shared.injector.push(job); + shared.wake_one(); + } + None => { + drop(job); + log::error!("spawn_local called outside a pool worker; job dropped"); + } + }, + } +} + +/// Push a job onto the current worker's FIFO side-queue (non-stealable). +/// Falls back to [`spawn_local`] when called off a worker thread. 
+pub(crate) fn spawn_local_fifo(job: BoxedJob) { + let on_worker = OWNER_DEQUE.with(|c| c.get().is_some()); + if on_worker { + FIFO_QUEUE.with(|q| q.borrow_mut().push_back(job)); + } else { + spawn_local(job); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_common::instant::Instant; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + use tokio::sync::oneshot; + + #[test] + fn spawn_runs_on_a_worker() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let pool = WorkerPool::new(3, Handle::current()).unwrap(); + let (tx, rx) = oneshot::channel(); + pool.spawn(Box::new(move || { + let _ = tx.send(is_worker()); + })); + assert!(rx.await.unwrap()); + }); + } + + #[test] + fn many_jobs_run_in_parallel() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let pool = WorkerPool::new(4, Handle::current()).unwrap(); + let counter = Arc::new(AtomicUsize::new(0)); + let start = Instant::now(); + let mut rxs = Vec::new(); + for _ in 0..16 { + let (tx, rx) = oneshot::channel(); + let c = Arc::clone(&counter); + pool.spawn(Box::new(move || { + std::thread::sleep(Duration::from_millis(50)); + c.fetch_add(1, Ordering::Relaxed); + let _ = tx.send(()); + })); + rxs.push(rx); + } + for rx in rxs { + rx.await.unwrap(); + } + let elapsed = start.elapsed(); + assert_eq!(counter.load(Ordering::Relaxed), 16); + assert!( + elapsed < Duration::from_millis(400), + "expected 16 x 50ms jobs to fan across 4 workers; got {elapsed:?}" + ); + }); + } + + #[test] + fn spawn_local_fifo_inside_worker_uses_side_queue() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let pool = WorkerPool::new(1, Handle::current()).unwrap(); + let (tx, rx) = oneshot::channel(); + pool.spawn(Box::new(move || { + 
spawn_local_fifo(Box::new(move || { + let _ = tx.send(is_worker()); + })); + })); + assert!(rx.await.unwrap()); + }); + } +} diff --git a/datafusion/push-scheduler/tests/end_to_end.rs b/datafusion/push-scheduler/tests/end_to_end.rs new file mode 100644 index 0000000000000..7dc746c14ba44 --- /dev/null +++ b/datafusion/push-scheduler/tests/end_to_end.rs @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end tests driving SQL through `datafusion_push_scheduler::Scheduler` +//! and comparing results against the default pull-based path. + +use std::sync::Arc; + +use arrow::array::{Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use datafusion::datasource::MemTable; +use datafusion::execution::context::SessionContext; +use datafusion::physical_plan::collect; +use datafusion::prelude::SessionConfig; +use datafusion_common::Result; +use datafusion_push_scheduler::{PipelinePlanner, Scheduler}; +use futures::StreamExt; + +/// Build a small test dataset with multiple partitions so repartition / +/// sort paths get exercised. 
+fn sample_context() -> Result { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("grp", DataType::Utf8, false), + Field::new("v", DataType::Int32, false), + ])); + + let make_batch = |ids: &[i32], grps: &[&str], vs: &[i32]| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(ids.to_vec())), + Arc::new(StringArray::from(grps.to_vec())), + Arc::new(Int32Array::from(vs.to_vec())), + ], + ) + .unwrap() + }; + + let b1 = make_batch(&[1, 3, 5, 7], &["a", "b", "a", "b"], &[10, 20, 30, 40]); + let b2 = make_batch(&[2, 4, 6, 8], &["a", "b", "a", "b"], &[11, 21, 31, 41]); + let b3 = make_batch(&[9, 11, 13, 15], &["a", "b", "a", "b"], &[12, 22, 32, 42]); + + let config = SessionConfig::new() + .with_target_partitions(4) + .with_batch_size(2); + let ctx = SessionContext::new_with_config(config); + + // Register the table with 2 pre-existing partitions so the pipeline + // sees >1 input partition at the source. + let table = MemTable::try_new(Arc::clone(&schema), vec![vec![b1, b2], vec![b3]])?; + ctx.register_table("t", Arc::new(table))?; + Ok(ctx) +} + +async fn collect_default(ctx: &SessionContext, sql: &str) -> Result> { + let df = ctx.sql(sql).await?; + let plan = df.create_physical_plan().await?; + collect(plan, ctx.task_ctx()).await +} + +async fn collect_scheduler(ctx: &SessionContext, sql: &str) -> Result> { + let df = ctx.sql(sql).await?; + let plan = df.create_physical_plan().await?; + let scheduler = Scheduler::new(num_cpus())?; + let mut stream = scheduler.schedule(plan, ctx.task_ctx())?.stream(); + + let mut out = Vec::new(); + while let Some(batch) = stream.next().await { + out.push(batch?); + } + Ok(out) +} + +fn num_cpus() -> usize { + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4) +} + +fn normalise(batches: &[RecordBatch]) -> String { + // Compare as sorted text to paper over partition / batch-boundary + // differences between the two execution paths. 
+ let formatted = pretty_format_batches(batches).unwrap().to_string(); + let mut lines: Vec<&str> = formatted.lines().collect(); + lines.sort(); + lines.join("\n") +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn projection_and_filter_matches_default() -> Result<()> { + let ctx = sample_context()?; + let sql = "SELECT id, v FROM t WHERE id > 4"; + let a = collect_default(&ctx, sql).await?; + let b = collect_scheduler(&ctx, sql).await?; + assert_eq!(normalise(&a), normalise(&b)); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn repartition_hash_matches_default() -> Result<()> { + let ctx = sample_context()?; + // A GROUP BY forces a RepartitionExec in the plan. We wrap the + // aggregate with an ExecutionPipeline, so the repartition is the + // interesting push-based breaker exercised here. + let sql = "SELECT grp, COUNT(*) AS c, SUM(v) AS s FROM t GROUP BY grp"; + let a = collect_default(&ctx, sql).await?; + let b = collect_scheduler(&ctx, sql).await?; + assert_eq!(normalise(&a), normalise(&b)); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn sort_matches_default() -> Result<()> { + let ctx = sample_context()?; + let sql = "SELECT id, v FROM t ORDER BY v DESC, id ASC"; + let a = collect_default(&ctx, sql).await?; + let b = collect_scheduler(&ctx, sql).await?; + assert_eq!(normalise(&a), normalise(&b)); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn limit_with_sort_matches_default() -> Result<()> { + let ctx = sample_context()?; + let sql = "SELECT id, v FROM t ORDER BY v DESC LIMIT 3"; + let a = collect_default(&ctx, sql).await?; + let b = collect_scheduler(&ctx, sql).await?; + assert_eq!(normalise(&a), normalise(&b)); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn aggregate_with_where_and_order_by_matches_default() -> Result<()> { + let ctx = sample_context()?; + let sql = "\ + SELECT grp, 
SUM(v) AS s FROM t \ + WHERE id <= 10 \ + GROUP BY grp \ + ORDER BY s DESC"; + let a = collect_default(&ctx, sql).await?; + let b = collect_scheduler(&ctx, sql).await?; + assert_eq!(normalise(&a), normalise(&b)); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn planner_cuts_at_repartition_for_group_by() -> Result<()> { + // Confirm that the planner actually produces multiple pipelines for + // a query with a RepartitionExec — i.e. that Inbox rewiring is on. + let ctx = sample_context()?; + let sql = "SELECT grp, COUNT(*) FROM t GROUP BY grp"; + let df = ctx.sql(sql).await?; + let plan = df.create_physical_plan().await?; + let pipeline_plan = PipelinePlanner::new(plan, ctx.task_ctx()).build()?; + assert!( + pipeline_plan.pipelines.len() >= 2, + "expected the group-by plan to be split into >=2 pipelines \ + (breaker cut at RepartitionExec), got {}", + pipeline_plan.pipelines.len(), + ); + Ok(()) +}