From d4b564a1a2550fecc5300b8af4a7f94ac1d18dcf Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 3 May 2026 08:26:28 -0500 Subject: [PATCH 1/4] feat(parquet): row-group and row-range sampling on ParquetSource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two opt-in sampling primitives to parquet scans, both built on the existing `ParquetAccessPlan` infrastructure: * `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction` of row groups in each scanned file. Selection is deferred until the opener has loaded the parquet footer (so we sample by real row-group index, not guess) and is deterministic per `(file_index, row_group_count, fraction)` via a seeded `SmallRng`, where `file_index` is the stable execution partition index (not the file name). * `ParquetSource::with_row_fraction(fraction)` — within each kept row group, keep `fraction` of rows by translating to a `RowSelection` of K small contiguous windows (size controlled by `with_row_cluster_size`, default 32 768 rows). The parquet reader uses the page index to read only the data pages covering the selected rows, so this gives "page-level" IO savings without requiring per-column page alignment. Falls back gracefully (no IO win, still correct) when the page index is missing. The two layers compose: scanning with both `row_group_fraction=0.1` and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group. Selection within a row group is deterministic-but-random per `(file_index, row_group_index, fraction, cluster_size)` — same inputs yield the same windows, so re-runs are repeatable. ## Why this lives on `ParquetSource` The natural entry-point for "I want a sample" is at config time, before any metadata IO. 
The actual *which* row groups / *which* rows selection still has to be deferred to the opener (after the footer is parsed) — that's why `ParquetSampling` carries fractions plus a cluster size, and the opener pulls them through to its lazy decision points. This is intentionally orthogonal to file-level sampling: `ParquetSource` doesn't own the file list (`FileScanConfig.file_groups` does), so a file-fraction setter here would have been a confusing no-op. Callers that want to drop files should rebuild the `FileScanConfig` directly. ## Use cases * `TABLESAMPLE` SQL syntax (any future implementation can lower to these primitives). * Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample. * Mini-query-style stats sampling (a layered helper can call these to bound the cost of computing approximate min/max/NDV/histograms for the optimizer — out of scope here, see the linked POC in the PR description). * `EXPLAIN ANALYZE`-driven debug runs against a representative slice. ## Tests 6 unit tests on `apply_row_group_sampling` (target count, determinism, file-index dependence, no-op at fraction=1.0, no-op when unset, target floor of 1) plus 2 end-to-end tests that build a real parquet file in `InMemory` object store and confirm the row counts emitted are what the sampling implies. `cargo build --workspace`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings` are clean. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + datafusion/datasource-parquet/Cargo.toml | 1 + datafusion/datasource-parquet/src/mod.rs | 2 + datafusion/datasource-parquet/src/opener.rs | 148 ++++- datafusion/datasource-parquet/src/sampling.rs | 540 ++++++++++++++++++ datafusion/datasource-parquet/src/source.rs | 55 ++ 6 files changed, 745 insertions(+), 2 deletions(-) create mode 100644 datafusion/datasource-parquet/src/sampling.rs diff --git a/Cargo.lock b/Cargo.lock index af52588e5338e..89f891bc81e15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2025,6 +2025,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", + "rand 0.9.4", "tempfile", "tokio", ] diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5855af17a536..60527ab5aa643 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -53,6 +53,7 @@ log = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } +rand = { workspace = true, features = ["small_rng"] } tokio = { workspace = true } [dev-dependencies] diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 9a907f4118a86..25098577093d1 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -33,6 +33,7 @@ mod page_filter; mod reader; mod row_filter; mod row_group_filter; +mod sampling; mod sort; pub mod source; mod supported_predicates; @@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it pub use row_filter::build_row_filter; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; +pub use sampling::ParquetSampling; pub use writer::plan_to_parquet; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..2f0f9b37cc314 100644 
--- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -136,6 +136,9 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Sampling config carried from `ParquetSource`. Applied lazily + /// inside the opener once the parquet metadata is available. + pub sampling: crate::sampling::ParquetSampling, } impl fmt::Debug for ParquetMorselizer { @@ -287,6 +290,7 @@ struct PreparedParquetOpen { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + sampling: crate::sampling::ParquetSampling, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, } @@ -656,6 +660,7 @@ impl ParquetMorselizer { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, preserve_order: self.preserve_order, + sampling: self.sampling.clone(), #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, }) @@ -882,11 +887,33 @@ impl FiltersPreparedParquetOpen { // Determine which row groups to actually read. The idea is to skip // as many row groups as possible based on the metadata and query - let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan( + let mut initial_plan = create_initial_plan( &prepared.file_name, prepared.extensions.clone(), rg_metadata.len(), - )?); + )?; + + // Apply optional row-group and row-range sampling now that we + // know the actual row-group count. Both calls are no-ops when + // their respective fraction is `None`. Selection is + // deterministic per `(partition_index, row_group_index, + // fraction, cluster_size)` so re-runs match. The execution + // `partition_index` is the stable per-file id we plumb in: + // it makes sampling reproducible across environments without + // depending on object-store paths, and decorrelates files + // assigned to different partitions. 
+ prepared.sampling.apply_row_group_sampling( + &mut initial_plan, + rg_metadata.len(), + prepared.partition_index, + ); + prepared.sampling.apply_row_fraction_sampling( + &mut initial_plan, + rg_metadata, + prepared.partition_index, + ); + + let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan); // If there is a range restricting what parts of the file to read if let Some(range) = prepared.file_range.as_ref() { @@ -1676,6 +1703,7 @@ mod test { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + sampling: crate::sampling::ParquetSampling, } impl ParquetMorselizerBuilder { @@ -1702,9 +1730,16 @@ mod test { max_predicate_cache_size: None, reverse_row_groups: false, preserve_order: false, + sampling: crate::sampling::ParquetSampling::default(), } } + /// Set the sampling config. + fn with_sampling(mut self, sampling: crate::sampling::ParquetSampling) -> Self { + self.sampling = sampling; + self + } + /// Set the object store (required for building). fn with_store(mut self, store: Arc) -> Self { self.store = Some(store); @@ -1816,6 +1851,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sampling: self.sampling, } } } @@ -2720,4 +2756,112 @@ mod test { "without page index all rows are returned" ); } + + /// End-to-end: a parquet file with 4 row groups, scanned with + /// `row_group_fraction = 0.5`, should return rows from exactly 2 + /// of the 4 row groups. + #[tokio::test] + async fn row_group_sampling_end_to_end() { + let store = Arc::new(InMemory::new()) as Arc; + + // 4 row groups of 3 rows each = 12 rows total. 
+ let batches = (0..4) + .map(|g| { + record_batch!(( + "a", + Int32, + vec![Some(g * 10 + 1), Some(g * 10 + 2), Some(g * 10 + 3),] + )) + .unwrap() + }) + .collect::>(); + let schema = batches[0].schema(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + + let data_len = write_parquet_batches( + Arc::clone(&store), + "rg_sampled.parquet", + batches, + Some(props), + ) + .await; + + let file = PartitionedFile::new( + "rg_sampled.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let sampling = crate::sampling::ParquetSampling { + row_group_fraction: Some(0.5), + ..Default::default() + }; + + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_sampling(sampling) + .build(); + + let stream = open_file(&opener, file).await.unwrap(); + let (_num_batches, num_rows) = count_batches_and_rows(stream).await; + + // ceil(4 * 0.5) = 2 row groups kept, each with 3 rows. + assert_eq!( + num_rows, 6, + "row_group_fraction=0.5 over 4 row groups should yield 2 row groups × 3 rows" + ); + } + + /// End-to-end: a single row group of 100 rows scanned with + /// `row_fraction = 0.1` and the default cluster size should yield + /// roughly 10 rows. The exact count depends on `ceil(100 * 0.1) = + /// 10` plus how the windows pack — we assert the count is in the + /// expected range and significantly less than 100. + #[tokio::test] + async fn row_fraction_end_to_end() { + let store = Arc::new(InMemory::new()) as Arc; + + // One row group of 100 rows so we exercise the per-row-group + // RowSelection, not the row-group-level skip. 
+ let values: Vec> = (0..100).map(Some).collect(); + let batch = record_batch!(("a", Int32, values)).unwrap(); + let schema = batch.schema(); + let data_len = + write_parquet(Arc::clone(&store), "rf.parquet", batch.clone()).await; + let file = PartitionedFile::new( + "rf.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let sampling = crate::sampling::ParquetSampling { + row_fraction: Some(0.1), + row_cluster_size: 4, // small cluster -> several windows + ..Default::default() + }; + + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_sampling(sampling) + .build(); + + let stream = open_file(&opener, file).await.unwrap(); + let (_num_batches, num_rows) = count_batches_and_rows(stream).await; + + // We asked for ~10% of 100 rows. ceil(10 / cluster=4) = 3 + // windows of ceil(10/3)=4 rows each, capped at the total -> + // up to 12 rows in practice. Assert the bounds. + assert!( + (1..100).contains(&num_rows), + "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}" + ); + assert!( + num_rows <= 16, + "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}" + ); + } } diff --git a/datafusion/datasource-parquet/src/sampling.rs b/datafusion/datasource-parquet/src/sampling.rs new file mode 100644 index 0000000000000..c89a9c3e5b8ae --- /dev/null +++ b/datafusion/datasource-parquet/src/sampling.rs @@ -0,0 +1,540 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Random sampling primitives for parquet scans. +//! +//! [`ParquetSampling`] holds the sampling configuration carried on +//! [`crate::source::ParquetSource`]. The two `apply_*_sampling` +//! methods mutate a [`ParquetAccessPlan`] in place — they are invoked +//! by the parquet [`crate::opener`] once the file footer is loaded. +//! +//! Selection within a row group is deterministic-but-random per +//! `(file_index, row_group_index, fraction, cluster_size)`: the methods +//! seed an `SmallRng` from a hash of those inputs so re-runs match. +//! The caller supplies a stable `file_index` (for the parquet opener, +//! that is the execution `partition_index`) so sampling is independent +//! of the on-disk path string and is reproducible across environments. + +use crate::access_plan::ParquetAccessPlan; +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +use rand::Rng; + +/// Hierarchical sampling config for parquet scans. +/// +/// All fractions are in `(0.0, 1.0]`. `None` (or `1.0`) means "no +/// sampling". +/// +/// * `row_group_fraction` — within each scanned file, keep this +/// fraction of row groups. Decision made inside the opener after +/// the footer is loaded so we sample by actual row-group index. +/// * `row_fraction` — within each kept row group, keep this fraction +/// of rows by translating to a `RowSelection` of K small contiguous +/// windows spread across the row group. 
The parquet reader uses +/// the page index to read only the data pages covering the +/// selected rows, so this gives "page-level" IO savings without +/// requiring per-column page alignment. Falls back to scanning +/// whole pages if the page index is missing. +/// * `row_cluster_size` — controls how the per-row-group target is +/// split into contiguous windows. Smaller = more diversity, more +/// page-index lookups; larger = cheaper, fewer regions covered. +/// +/// **Why this lives here, not as a one-shot `ParquetAccessPlan`:** the +/// natural entry-point for "I want a sample" is at config time, before +/// any metadata IO has happened. The actual *which row groups* / +/// *which rows* selection still needs to be deferred until the opener +/// has the footer — that's why these fractions get carried through and +/// applied lazily. +/// +/// **Why no file-level fraction:** [`crate::source::ParquetSource`] +/// doesn't own the file list — that lives on `FileScanConfig.file_groups`. +/// Callers that want to drop files should rebuild the `FileScanConfig` +/// with a reduced `file_groups`. Adding a file-fraction setter here +/// would have been a no-op and confusing. +/// +/// Selection within a row group is deterministic-but-random per +/// `(file_index, row_group_index, fraction, cluster_size)`: we seed +/// an `SmallRng` from a hash of those inputs so re-runs match exactly. +/// The caller-supplied `file_index` is a stable per-file identifier +/// (the parquet opener uses the execution `partition_index`), keeping +/// sampling reproducible across environments without the keying +/// depending on object-store paths. +#[derive(Debug, Clone)] +pub struct ParquetSampling { + /// Fraction of row groups to keep in each scanned file. + pub row_group_fraction: Option, + /// Fraction of rows to keep within each scanned row group. + pub row_fraction: Option, + /// Maximum size (in rows) of each contiguous window emitted by + /// row-fraction sampling. 
The total target rows are split into + /// `ceil(target / row_cluster_size)` windows distributed across + /// the row group with a random offset within each stride. + pub row_cluster_size: usize, +} + +impl Default for ParquetSampling { + fn default() -> Self { + Self { + row_group_fraction: None, + row_fraction: None, + row_cluster_size: 32_768, + } + } +} + +impl ParquetSampling { + /// Mutate `plan` in-place to keep only a random subset of its + /// currently-scannable row groups, sized to `self.row_group_fraction` + /// of the total. No-op if `row_group_fraction` is `None`, `>= 1.0`, + /// or out of range. + /// + /// Selection is deterministic given `(file_index, row_group_count, + /// fraction)`: we seed an `SmallRng` from a hash of those inputs + /// and use a partial Fisher-Yates shuffle. Same inputs → same + /// sample on re-runs. Different `file_index` values produce + /// uncorrelated samples even when row-group counts and fractions + /// match, so files in the same scan don't all keep the same + /// indices. 
+ pub(crate) fn apply_row_group_sampling( + &self, + plan: &mut ParquetAccessPlan, + row_group_count: usize, + file_index: usize, + ) { + use rand::SeedableRng; + use rand::seq::SliceRandom; + + let Some(fraction) = self.row_group_fraction else { + return; + }; + if row_group_count == 0 || !(0.0..1.0).contains(&fraction) { + return; // no-op for fraction >= 1.0 (keep all) or invalid input + } + let target = ((row_group_count as f64) * fraction).ceil().max(1.0) as usize; + if target >= row_group_count { + return; + } + + let seed = derive_seed( + b"row-group", + file_index, + row_group_count, + fraction, + self.row_cluster_size, + ); + + let mut indices: Vec = (0..row_group_count).collect(); + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + indices.shuffle(&mut rng); + let kept: std::collections::HashSet = + indices.into_iter().take(target).collect(); + + for i in 0..row_group_count { + if !kept.contains(&i) { + plan.skip(i); + } + } + } + + /// For each row group still marked `Scan`, replace it with a + /// `Selection` that covers `self.row_fraction` of the row group's + /// rows in `ceil(target / self.row_cluster_size)` spread-out + /// contiguous windows. No-op if `row_fraction` is `None`, `>= 1.0`, + /// or out of range. + /// + /// Selection is deterministic per `(file_index, row_group_index, + /// fraction, cluster_size)`. + /// + /// If the parquet file has page indexes, the reader only reads the + /// data pages covering the selected rows. If page indexes are + /// missing, the reader still has to read whole pages and discard + /// rows; the IO win disappears in that case but correctness is + /// unaffected. 
+ pub(crate) fn apply_row_fraction_sampling( + &self, + plan: &mut ParquetAccessPlan, + rg_metadata: &[parquet::file::metadata::RowGroupMetaData], + file_index: usize, + ) { + use rand::SeedableRng; + + let Some(fraction) = self.row_fraction else { + return; + }; + if !(0.0..1.0).contains(&fraction) { + return; // no-op for fraction >= 1.0 (keep all) or invalid input + } + let cluster_size = self.row_cluster_size.max(1); + + for (idx, rg) in rg_metadata.iter().enumerate() { + if !plan.should_scan(idx) { + continue; + } + let total_rows = rg.num_rows() as usize; + if total_rows == 0 { + continue; + } + + let seed = + derive_seed(b"row-fraction", file_index, idx, fraction, cluster_size); + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + + let target_rows = ((total_rows as f64) * fraction).ceil().max(1.0) as usize; + let target_rows = target_rows.min(total_rows); + + let Some(selectors) = build_row_window_selectors( + total_rows, + target_rows, + cluster_size, + &mut rng, + ) else { + continue; + }; + plan.scan_selection(idx, RowSelection::from(selectors)); + } + } +} + +/// Hash the given inputs into a stable `u64` seed. +/// +/// Uses Rust's `DefaultHasher`. Within a single binary the output is +/// deterministic; we deliberately don't promise stability across Rust +/// versions because sampling is a probabilistic optimization, not a +/// data-integrity boundary. +fn derive_seed( + domain: &[u8], + file_index: usize, + secondary_index: usize, + fraction: f64, + cluster_size: usize, +) -> u64 { + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + domain.hash(&mut hasher); + file_index.hash(&mut hasher); + secondary_index.hash(&mut hasher); + fraction.to_bits().hash(&mut hasher); + cluster_size.hash(&mut hasher); + hasher.finish() +} + +/// Build a `[skip, select, ..., skip]` selector layout that picks +/// `~target_rows` rows out of `total_rows` as several spread-out +/// contiguous windows. 
+/// +/// Strategy: +/// +/// 1. Choose `n_windows = ceil(target_rows / cluster_size)`. The +/// `cluster_size` cap keeps individual selected ranges small enough +/// to align with parquet data-page granularity. +/// 2. Choose `window_size = min(ceil(target_rows / n_windows), stride)` +/// where `stride = total_rows / n_windows`. Capping at `stride` is +/// what guarantees windows in adjacent strides cannot overlap even +/// when `target_rows` is close to `total_rows`. The per-window cap +/// means total selected rows can be slightly below `target_rows` +/// near `fraction → 1.0`; that is the price for correctness. +/// 3. For each stride `[w * stride, (w + 1) * stride)`, place a window +/// of size `window_size` at a random offset in `[0, stride - +/// window_size]`. This jitter makes the sample uniform within each +/// stride rather than always-from-the-start. +/// +/// Returns `None` if no valid layout is possible (e.g. zero-row group, +/// zero `cluster_size`, or `target_rows == 0`). The caller should +/// treat that as "nothing to select for this row group" and leave its +/// `Scan` entry untouched. +/// +/// ## Invariants (verified by `row_window_selection_*` tests) +/// +/// * Selectors cover exactly `total_rows` rows in total — sum of +/// skip + select equals the row group size. +/// * No two `select` selectors overlap; rows in the layout are +/// strictly monotonic. +/// * Every selected position is in `0..total_rows`. +/// * Total selected rows is positive and within +/// `[1, n_windows * window_size]`. It can sit slightly above +/// `target_rows` when `target_rows / n_windows` is not an integer +/// (each window rounds up); the cap at `n_windows * window_size` +/// keeps the over-shoot bounded by `n_windows`. 
+fn build_row_window_selectors( + total_rows: usize, + target_rows: usize, + cluster_size: usize, + rng: &mut impl Rng, +) -> Option> { + if total_rows == 0 || target_rows == 0 || cluster_size == 0 { + return None; + } + let target_rows = target_rows.min(total_rows); + + let n_windows = target_rows.div_ceil(cluster_size).max(1); + let stride = total_rows / n_windows; + if stride == 0 { + return None; + } + // Capping `window_size` at `stride` is what guarantees adjacent + // strides cannot produce overlapping windows: each window lives + // entirely inside its own stride. + let window_size = target_rows.div_ceil(n_windows).min(stride); + if window_size == 0 { + return None; + } + + let mut selectors: Vec = Vec::with_capacity(2 * n_windows + 1); + let mut cursor = 0usize; + for w in 0..n_windows { + let stride_start = w * stride; + let max_offset = stride - window_size; // safe: window_size <= stride + let offset = if max_offset == 0 { + 0 + } else { + rng.random_range(0..=max_offset) + }; + let win_start = stride_start + offset; + let win_end = win_start + window_size; // <= stride_start + stride <= total_rows + debug_assert!(win_start >= cursor, "windows must be monotonic"); + debug_assert!(win_end <= total_rows, "window must stay in-bounds"); + if win_start > cursor { + selectors.push(RowSelector::skip(win_start - cursor)); + } + selectors.push(RowSelector::select(window_size)); + cursor = win_end; + } + if cursor < total_rows { + selectors.push(RowSelector::skip(total_rows - cursor)); + } + Some(selectors) +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::SeedableRng; + use rand::rngs::SmallRng; + + fn count_scanned(plan: &ParquetAccessPlan) -> usize { + (0..plan.len()).filter(|i| plan.should_scan(*i)).count() + } + + fn cfg(row_group_fraction: Option) -> ParquetSampling { + ParquetSampling { + row_group_fraction, + ..Default::default() + } + } + + #[test] + fn row_group_sampling_keeps_target_count() { + let mut plan = ParquetAccessPlan::new_all(20); + 
cfg(Some(0.25)).apply_row_group_sampling(&mut plan, 20, 0); + assert_eq!(count_scanned(&plan), 5, "ceil(20 * 0.25) = 5"); + } + + #[test] + fn row_group_sampling_is_deterministic() { + let mut a = ParquetAccessPlan::new_all(50); + let mut b = ParquetAccessPlan::new_all(50); + cfg(Some(0.10)).apply_row_group_sampling(&mut a, 50, 0); + cfg(Some(0.10)).apply_row_group_sampling(&mut b, 50, 0); + let kept_a: Vec = (0..50).filter(|i| a.should_scan(*i)).collect(); + let kept_b: Vec = (0..50).filter(|i| b.should_scan(*i)).collect(); + assert_eq!(kept_a, kept_b, "same seed inputs -> same selection"); + } + + #[test] + fn row_group_sampling_differs_per_file_index() { + let mut a = ParquetAccessPlan::new_all(50); + let mut b = ParquetAccessPlan::new_all(50); + cfg(Some(0.10)).apply_row_group_sampling(&mut a, 50, 0); + cfg(Some(0.10)).apply_row_group_sampling(&mut b, 50, 1); + let kept_a: Vec = (0..50).filter(|i| a.should_scan(*i)).collect(); + let kept_b: Vec = (0..50).filter(|i| b.should_scan(*i)).collect(); + // Hash differs by file_index -> overwhelmingly likely the + // sampled subsets differ. (Probability of identical 5-of-50 + // pick by chance is ~5e-7.) + assert_ne!(kept_a, kept_b); + } + + #[test] + fn row_group_sampling_no_op_when_fraction_is_one() { + let mut plan = ParquetAccessPlan::new_all(8); + cfg(Some(1.0)).apply_row_group_sampling(&mut plan, 8, 0); + assert_eq!(count_scanned(&plan), 8, "fraction>=1.0 keeps all"); + } + + #[test] + fn row_group_sampling_target_at_least_one() { + let mut plan = ParquetAccessPlan::new_all(100); + cfg(Some(0.001)).apply_row_group_sampling(&mut plan, 100, 0); + // ceil(100 * 0.001) == 1, so we keep at least one row group. 
+ assert_eq!(count_scanned(&plan), 1); + } + + #[test] + fn row_group_sampling_no_op_when_unset() { + let mut plan = ParquetAccessPlan::new_all(8); + cfg(None).apply_row_group_sampling(&mut plan, 8, 0); + assert_eq!(count_scanned(&plan), 8, "no fraction set -> no-op"); + } + + /// Sum of skip + select selectors equals total_rows; positions + /// are monotonic and in-bounds; selected count is close to (but + /// can slightly exceed) target_rows because of ceil rounding when + /// splitting target across windows. Hand-checked single case to + /// anchor the fuzz tests. + #[test] + fn row_window_selection_basic_layout() { + let mut rng = SmallRng::seed_from_u64(1); + // total=100, target=10, cluster=4 → n_windows=3, window_size=4, + // so total selected = 12 (slightly above target due to ceil). + let selectors = build_row_window_selectors(100, 10, 4, &mut rng).unwrap(); + let (skip, select) = sum_selectors(&selectors); + assert_eq!( + skip + select, + 100, + "selectors must cover the full row group" + ); + assert!(select > 0, "must select something"); + assert!( + select <= 12, + "selected {select} exceeds n_windows * window_size = 12" + ); + verify_no_overlap(&selectors, 100); + } + + #[test] + fn row_window_selection_returns_none_on_invalid_input() { + let mut rng = SmallRng::seed_from_u64(0); + assert!(build_row_window_selectors(0, 5, 4, &mut rng).is_none()); + assert!(build_row_window_selectors(100, 0, 4, &mut rng).is_none()); + assert!(build_row_window_selectors(100, 10, 0, &mut rng).is_none()); + } + + #[test] + fn row_window_selection_full_target_no_overlap() { + // target_rows == total_rows is the worst case for overlap: the + // earlier ceil-based window_size could exceed stride. Verify + // the cap at stride keeps things disjoint. 
+ let mut rng = SmallRng::seed_from_u64(0); + let selectors = build_row_window_selectors(100, 100, 7, &mut rng).unwrap(); + verify_no_overlap(&selectors, 100); + } + + /// Fuzz: across 5_000 randomized configurations of (total_rows, + /// target_rows, cluster_size, seed), every layout must satisfy the + /// invariants documented on `build_row_window_selectors`. + /// + /// This is the regression test the reviewer asked for in + /// https://github.com/apache/datafusion/pull/22000#discussion_r3187392811. + #[test] + fn row_window_selection_fuzz_invariants() { + let mut driver = SmallRng::seed_from_u64(0xD474_F051_0000_0001); + for _ in 0..5_000 { + let total_rows = driver.random_range(1usize..=4_096); + let target_rows = driver.random_range(1usize..=total_rows); + // Mix small and large cluster sizes — the small ones force + // many windows and stress the per-stride math. + let cluster_size = driver.random_range(1usize..=2_048); + let seed = driver.random::(); + let mut rng = SmallRng::seed_from_u64(seed); + let Some(selectors) = build_row_window_selectors( + total_rows, + target_rows, + cluster_size, + &mut rng, + ) else { + continue; + }; + let (skip, select) = sum_selectors(&selectors); + assert_eq!( + skip + select, + total_rows, + "selectors must cover the row group exactly: total_rows={total_rows}, target={target_rows}, cluster={cluster_size}, seed={seed}" + ); + // `select` can sit slightly above `target_rows` because + // `window_size = ceil(target / n_windows)` rounds up. The + // bound below is the constructive maximum: each of the + // `n_windows` windows holds at most `window_size` rows. 
+ let n_windows = target_rows.div_ceil(cluster_size).max(1); + let stride = total_rows / n_windows; + let window_size = target_rows.div_ceil(n_windows).min(stride); + let max_select = n_windows.saturating_mul(window_size); + assert!( + select > 0 && select <= max_select && select <= total_rows, + "selected count {select} not in (0, min({max_select}, {total_rows})] for total_rows={total_rows}, target={target_rows}, cluster={cluster_size}, seed={seed}" + ); + verify_no_overlap(&selectors, total_rows); + } + } + + /// Fuzz: identical inputs (including the rng seed) must produce + /// the exact same selectors. Catches accidental reliance on + /// non-deterministic sources inside the windowing function. + #[test] + fn row_window_selection_fuzz_determinism() { + let mut driver = SmallRng::seed_from_u64(0xD474_F051_0000_0002); + for _ in 0..1_000 { + let total_rows = driver.random_range(1usize..=4_096); + let target_rows = driver.random_range(1usize..=total_rows); + let cluster_size = driver.random_range(1usize..=2_048); + let seed = driver.random::(); + let a = build_row_window_selectors( + total_rows, + target_rows, + cluster_size, + &mut SmallRng::seed_from_u64(seed), + ); + let b = build_row_window_selectors( + total_rows, + target_rows, + cluster_size, + &mut SmallRng::seed_from_u64(seed), + ); + assert_eq!(format!("{a:?}"), format!("{b:?}")); + } + } + + fn sum_selectors(selectors: &[RowSelector]) -> (usize, usize) { + let mut skip = 0usize; + let mut select = 0usize; + for s in selectors { + if s.skip { + skip += s.row_count; + } else { + select += s.row_count; + } + } + (skip, select) + } + + /// Walks selectors as a virtual cursor and asserts every row index + /// stays within `[0, total_rows]`. Because the layout is + /// `[skip, select, skip, select, ...]` over a single forward + /// cursor, no-overlap is equivalent to the cursor never going + /// backwards or past `total_rows` — both are checked here. 
+ fn verify_no_overlap(selectors: &[RowSelector], total_rows: usize) { + let mut cursor: usize = 0; + for s in selectors { + let next = cursor.checked_add(s.row_count).expect("overflow"); + assert!( + next <= total_rows, + "selector runs past total_rows={total_rows}: cursor={cursor}, len={}", + s.row_count + ); + cursor = next; + } + assert_eq!(cursor, total_rows, "selectors must end at total_rows"); + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..4a318bf45f17f 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -25,6 +25,7 @@ use crate::ParquetFileReaderFactory; use crate::opener::ParquetMorselizer; use crate::opener::build_pruning_predicates; use crate::row_filter::can_expr_be_pushed_down_with_schemas; +use crate::sampling::ParquetSampling; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; @@ -294,8 +295,17 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. reverse_row_groups: bool, + /// Optional sampling config. The fractions are deferred — the actual + /// "which row groups" / "which rows" decision is made inside the + /// opener once the parquet metadata is available. + pub(crate) sampling: ParquetSampling, } +// `ParquetSampling` lives in `crate::sampling` so the helpers that +// consume it (`apply_row_group_sampling`, `apply_row_fraction_sampling`) +// can be defined alongside the struct as methods rather than free +// functions in `opener.rs`. + impl ParquetSource { /// Create a new ParquetSource to read the data specified in the file scan /// configuration with the provided schema. 
@@ -319,9 +329,53 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + sampling: ParquetSampling::default(), } } + /// Sample only this fraction of row groups in each scanned file. + /// + /// `fraction` is in `(0.0, 1.0]`. The actual *which* row groups are + /// chosen is deferred until the opener has loaded the parquet footer + /// (so we can sample by real row-group index). Selection is + /// deterministic per `(file_name, row_group_count, fraction)`. + /// + /// All collected values from a sampled scan should be treated as + /// `Precision::Inexact` since the sample is not the full data. + pub fn with_row_group_sampling(mut self, fraction: f64) -> Self { + self.sampling.row_group_fraction = Some(fraction); + self + } + + /// Sample only this fraction of rows in each scanned row group, via + /// a `RowSelection` of K contiguous windows spread across the row + /// group. + /// + /// Pages aren't aligned across columns in parquet, so true "page + /// sampling" doesn't have a coherent table-level meaning. This + /// row-range form gives the same IO savings (the parquet reader + /// uses the page index to read only the data pages covering the + /// selected rows) but remains aligned across columns. + /// + /// Selection is deterministic-but-random per + /// `(file_name, row_group_index, fraction, cluster_size)`. + pub fn with_row_fraction(mut self, fraction: f64) -> Self { + self.sampling.row_fraction = Some(fraction); + self + } + + /// Override the per-row-group cluster size used by + /// [`Self::with_row_fraction`]. See [`ParquetSampling::row_cluster_size`]. + pub fn with_row_cluster_size(mut self, rows: usize) -> Self { + self.sampling.row_cluster_size = rows; + self + } + + /// Returns the current sampling config (mostly for introspection). + pub fn sampling(&self) -> &ParquetSampling { + &self.sampling + } + /// Set the `TableParquetOptions` for this ParquetSource. 
pub fn with_table_parquet_options( mut self, @@ -580,6 +634,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + sampling: self.sampling.clone(), })) } From f9182fbb42e72b980d13272b17cc151f5ba1c237 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 16:58:38 -0500 Subject: [PATCH 2/4] feat: SamplePushdown rule + Sample logical/physical nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the cross-cutting infrastructure for pushing TABLESAMPLE-shaped sampling into file sources, with parquet as the first absorbing source. There is no SQL surface yet; this commit only ships the primitives. Wiring a RelationPlanner / ExtensionPlanner so it works out of the box from SQL is the next commit in this stack. - `Sample` `UserDefinedLogicalNodeCore` extension node in `datafusion-expr` (`logical_plan/sample.rs`). Schema-preserving; validates `fraction ∈ (0, 1]`. Currently encodes `SampleMethod::System` only. - `SampleExec` placeholder in `datafusion-physical-plan`. Errors at `execute` (it's a marker — the `SamplePushdown` rule is expected to remove it). Implements filter / sort pushdown passthrough so unrelated optimizer rules see straight through it. - New `try_push_sample` method on `ExecutionPlan` and `FileSource`, returning `Absorbed { inner }` / `Passthrough` / `Unsupported { reason }`. Default is `Unsupported`; per-node `Passthrough` overrides on filter, projection, coalesce_batches, coalesce_partitions, repartition, and non-fetch sort. - `ParquetSource::try_push_sample` runs the (intentionally private) hierarchical block-level reduction across files / row groups / rows, with adaptive collapse when an axis can't reduce. 
Coordinates with the opener via `pub(crate)` `system_target_remaining` and `seed` fields on `ParquetSampling`. Single-file, single-row-group inputs hit ~p × N rows instead of undershooting at p^(1/3) × N. - `REPEATABLE(seed)` is plumbed all the way through: when set, `ParquetSampling::apply_row_group_sampling` and `apply_row_fraction_sampling` key only on `(seed, ...)` and ignore the file path, so the same query is reproducible across environments. - `SamplePushdown` optimizer rule (between `PushdownSort` and `EnsureCooperative`) walks top-down. On `Absorbed` it replaces `SampleExec` with the rebuilt source; on `Passthrough` it pushes through the single-child node and recurses; on `Unsupported` it errors at planning time with `"TABLESAMPLE is not supported for this source"`. There is intentionally no generic post-scan `SampleExec` yet. - EXPLAIN visibility: `ParquetSource::fmt_extra` surfaces `sample_system_target_remaining` when set. - `optimizer_rule_reference.md` updated to list `SamplePushdown` in the documented rule order. - `explain.slt` updated with `physical_plan after SamplePushdown SAME TEXT AS ABOVE` lines under each verbose-explain test. Tests: 7 unit tests on `ParquetSource::try_push_sample` covering the pushdown contract (full / single-file / multi-file / target clamping / REPEATABLE determinism / multi-file rounding compensation), and 3 opener end-to-end tests covering the adaptive split for single vs multi row group inputs and REPEATABLE-seed reproducibility across file paths. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/src/optimizer_rule_reference.md | 7 +- datafusion/datasource-parquet/src/opener.rs | 299 +++++++++++++++++- datafusion/datasource-parquet/src/sampling.rs | 49 ++- datafusion/datasource-parquet/src/source.rs | 258 +++++++++++++++ datafusion/datasource/src/file.rs | 49 +++ datafusion/datasource/src/source.rs | 60 +++- datafusion/expr/src/logical_plan/mod.rs | 2 + datafusion/expr/src/logical_plan/sample.rs | 269 ++++++++++++++++ datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 8 + .../physical-optimizer/src/sample_pushdown.rs | 128 ++++++++ .../physical-plan/src/coalesce_batches.rs | 8 + .../physical-plan/src/coalesce_partitions.rs | 8 + .../physical-plan/src/execution_plan.rs | 31 ++ datafusion/physical-plan/src/filter.rs | 11 + datafusion/physical-plan/src/lib.rs | 3 + datafusion/physical-plan/src/projection.rs | 9 + .../physical-plan/src/repartition/mod.rs | 9 + datafusion/physical-plan/src/sample.rs | 268 ++++++++++++++++ .../physical-plan/src/sample_pushdown.rs | 96 ++++++ datafusion/physical-plan/src/sorts/sort.rs | 19 ++ .../sqllogictest/test_files/explain.slt | 4 + 22 files changed, 1566 insertions(+), 30 deletions(-) create mode 100644 datafusion/expr/src/logical_plan/sample.rs create mode 100644 datafusion/physical-optimizer/src/sample_pushdown.rs create mode 100644 datafusion/physical-plan/src/sample.rs create mode 100644 datafusion/physical-plan/src/sample_pushdown.rs diff --git a/datafusion/core/src/optimizer_rule_reference.md b/datafusion/core/src/optimizer_rule_reference.md index fcbb200c71624..86acb9051395c 100644 --- a/datafusion/core/src/optimizer_rule_reference.md +++ b/datafusion/core/src/optimizer_rule_reference.md @@ -88,6 +88,7 @@ in multiple phases. | 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. 
| | 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. | | 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. | -| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. | -| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. | -| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. | +| 20 | `SamplePushdown` | - | Pushes `TABLESAMPLE` into the source; errors at planning time if the sample can't be absorbed. | +| 21 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. | +| 22 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. | +| 23 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. | diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 2f0f9b37cc314..886d075f7e49a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -894,24 +894,57 @@ impl FiltersPreparedParquetOpen { )?; // Apply optional row-group and row-range sampling now that we - // know the actual row-group count. Both calls are no-ops when - // their respective fraction is `None`. Selection is - // deterministic per `(partition_index, row_group_index, - // fraction, cluster_size)` so re-runs match. The execution - // `partition_index` is the stable per-file id we plumb in: - // it makes sampling reproducible across environments without + // know the actual row-group count. 
Selection is deterministic + // per `(partition_index, row_group_index, fraction, + // cluster_size)` so re-runs match. The execution + // `partition_index` is the stable per-file id we plumb in: it + // makes sampling reproducible across environments without // depending on object-store paths, and decorrelates files // assigned to different partitions. - prepared.sampling.apply_row_group_sampling( - &mut initial_plan, - rg_metadata.len(), - prepared.partition_index, - ); - prepared.sampling.apply_row_fraction_sampling( - &mut initial_plan, - rg_metadata, - prepared.partition_index, - ); + if let Some(remaining) = prepared.sampling.system_target_remaining { + // SYSTEM-mode adaptive split: when the SamplePushdown rule + // hands us a residual fraction `remaining`, choose the + // row-group / row split based on the row-group count we + // just observed. With ≥ 2 row groups split as + // `sqrt(remaining)` at both axes; with 1 row group skip + // row-group sampling and apply the full residual at the + // row level. Without this adaptation a single-file / + // single-row-group scan would only reach `cbrt(remaining)` + // of the rows (~46% for SYSTEM(10)). + let n_rg = rg_metadata.len(); + let mut adapted = prepared.sampling.clone(); + if n_rg >= 2 { + let q = remaining.sqrt(); + adapted.row_group_fraction = Some(q); + adapted.row_fraction = Some(q); + } else { + adapted.row_group_fraction = None; + adapted.row_fraction = Some(remaining); + } + adapted.apply_row_group_sampling( + &mut initial_plan, + n_rg, + prepared.partition_index, + ); + adapted.apply_row_fraction_sampling( + &mut initial_plan, + rg_metadata, + prepared.partition_index, + ); + } else { + // Legacy direct-builder path: each method is a no-op when + // its corresponding fraction is `None`. 
+ prepared.sampling.apply_row_group_sampling( + &mut initial_plan, + rg_metadata.len(), + prepared.partition_index, + ); + prepared.sampling.apply_row_fraction_sampling( + &mut initial_plan, + rg_metadata, + prepared.partition_index, + ); + } let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan); @@ -1590,6 +1623,10 @@ fn create_initial_plan( Ok(ParquetAccessPlan::new_all(row_group_count)) } +// `apply_row_group_sampling` and `apply_row_fraction_sampling` live +// in `crate::sampling` so this file stays focused on the opener +// pipeline. + /// Build a page pruning predicate from an optional predicate expression. /// If the predicate is None or the predicate cannot be converted to a page pruning /// predicate, return None. @@ -1740,6 +1777,13 @@ mod test { self } + /// Set the partition_index plumbed through to the opener. + /// Sampling keys on this so different partitions decorrelate. + fn with_partition_index(mut self, partition_index: usize) -> Self { + self.partition_index = partition_index; + self + } + /// Set the object store (required for building). fn with_store(mut self, store: Arc) -> Self { self.store = Some(store); @@ -2757,6 +2801,9 @@ mod test { ); } + // -- Sampling end-to-end (unit tests of the helpers themselves + // live in `crate::sampling`) ----------------------------------- + /// End-to-end: a parquet file with 4 row groups, scanned with /// `row_group_fraction = 0.5`, should return rows from exactly 2 /// of the 4 row groups. @@ -2864,4 +2911,224 @@ mod test { "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}" ); } + + /// End-to-end for the SYSTEM-mode adaptive split on a *single* + /// row group: the file axis is fixed (handled at try_push_sample), + /// the row-group axis can't reduce, so the opener should apply the + /// full residual fraction at the row level. A naïve implementation + /// that always splits as `sqrt(remaining)` between row-group and + /// row would only achieve `sqrt(0.1) ≈ 32%` here. 
+ #[tokio::test] + async fn system_target_remaining_single_row_group() { + let store = Arc::new(InMemory::new()) as Arc; + + let values: Vec> = (0..1000).map(Some).collect(); + let batch = record_batch!(("a", Int32, values)).unwrap(); + let schema = batch.schema(); + let data_len = + write_parquet(Arc::clone(&store), "tr_1rg.parquet", batch.clone()).await; + let file = PartitionedFile::new( + "tr_1rg.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let sampling = crate::sampling::ParquetSampling { + system_target_remaining: Some(0.1), + row_cluster_size: 4, + ..Default::default() + }; + + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_sampling(sampling) + .build(); + + let stream = open_file(&opener, file).await.unwrap(); + let (_num_batches, num_rows) = count_batches_and_rows(stream).await; + + // 10% of 1000 with small clusters = ~100 rows (window padding + // can push it slightly higher). The key invariant: NOT 32% + // (sqrt) and NOT 46% (cbrt). + assert!( + (50..=150).contains(&num_rows), + "single-RG SYSTEM(0.1) should hit ~100 rows; got {num_rows} \ + (would be ~316 if split as sqrt, ~464 if split as cbrt)" + ); + } + + /// End-to-end for the SYSTEM-mode adaptive split on multiple row + /// groups: the residual is split as `sqrt` between the row-group + /// and row axes, so the result is `sqrt(p) × sqrt(p) = p` of the + /// rows in expectation. + #[tokio::test] + async fn system_target_remaining_multi_row_group() { + let store = Arc::new(InMemory::new()) as Arc; + + // 4 row groups × 250 rows = 1000 rows. 
+ let batches = (0..4) + .map(|g| { + let vals: Vec> = + ((g * 250)..(g * 250 + 250)).map(Some).collect(); + record_batch!(("a", Int32, vals)).unwrap() + }) + .collect::>(); + let schema = batches[0].schema(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(250)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "tr_4rg.parquet", + batches, + Some(props), + ) + .await; + let file = PartitionedFile::new( + "tr_4rg.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let sampling = crate::sampling::ParquetSampling { + system_target_remaining: Some(0.25), + row_cluster_size: 16, + ..Default::default() + }; + + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_sampling(sampling) + .build(); + + let stream = open_file(&opener, file).await.unwrap(); + let (_num_batches, num_rows) = count_batches_and_rows(stream).await; + + // sqrt(0.25) = 0.5 at each axis: keep 2 of 4 row groups + // (= 500 rows under scrutiny), then 50% of those rows + // = ~250 rows. Window padding can push it higher; assert the + // upper bound is well below the unsplit 1000. + assert!( + (100..=400).contains(&num_rows), + "multi-RG SYSTEM remaining=0.25 should hit ~250 rows; got {num_rows}" + ); + } + + /// Sampling must key on the execution `partition_index`, not the + /// on-disk file path: two parquet files with different names but + /// identical content opened at the same partition_index pick the + /// same rows. Different partition_index values must decorrelate. + /// This is what makes `TABLESAMPLE ... REPEATABLE(n)` reproducible + /// across environments without leaking object-store paths into the + /// sample seed. 
+ #[tokio::test] + async fn system_target_remaining_sample_keys_on_partition_index() { + let store = Arc::new(InMemory::new()) as Arc; + + let values: Vec> = (0..100).map(Some).collect(); + let batch = record_batch!(("a", Int32, values)).unwrap(); + let schema = batch.schema(); + + // Two files with different names but identical content. + let len_a = + write_parquet(Arc::clone(&store), "first.parquet", batch.clone()).await; + let len_b = + write_parquet(Arc::clone(&store), "second_path.parquet", batch.clone()).await; + + let sampling = crate::sampling::ParquetSampling { + system_target_remaining: Some(0.5), + row_cluster_size: 4, + seed: Some(42), + ..Default::default() + }; + + // Same partition_index for both files -> same selection, + // regardless of file name. + let opener_p0 = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_partition_index(0) + .with_sampling(sampling.clone()) + .build(); + + let stream_a = open_file( + &opener_p0, + PartitionedFile::new( + "first.parquet".to_string(), + u64::try_from(len_a).unwrap(), + ), + ) + .await + .unwrap(); + let stream_b = open_file( + &opener_p0, + PartitionedFile::new( + "second_path.parquet".to_string(), + u64::try_from(len_b).unwrap(), + ), + ) + .await + .unwrap(); + + let rows_a = collect_values(stream_a).await; + let rows_b = collect_values(stream_b).await; + + assert_eq!( + rows_a, rows_b, + "sampling must key on partition_index, not file name" + ); + assert!( + !rows_a.is_empty() && rows_a.len() < 100, + "expected a strict subset; got {} rows", + rows_a.len() + ); + + // Different partition_index -> uncorrelated samples even with + // the same seed. Otherwise every file in a parallel scan would + // pick the same row indices, defeating file-axis randomisation. 
+ let opener_p1 = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_partition_index(1) + .with_sampling(sampling) + .build(); + let stream_a_p1 = open_file( + &opener_p1, + PartitionedFile::new( + "first.parquet".to_string(), + u64::try_from(len_a).unwrap(), + ), + ) + .await + .unwrap(); + let rows_a_p1 = collect_values(stream_a_p1).await; + assert_ne!( + rows_a, rows_a_p1, + "different partition_index must produce different samples" + ); + } + + /// Helper: pull an `i32` column out of a sampled stream. + async fn collect_values( + mut stream: BoxStream<'static, Result>, + ) -> Vec { + use futures::StreamExt; + let mut out = Vec::new(); + while let Some(batch) = stream.next().await { + let batch = batch.unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..col.len() { + out.push(col.value(i)); + } + } + out + } } diff --git a/datafusion/datasource-parquet/src/sampling.rs b/datafusion/datasource-parquet/src/sampling.rs index c89a9c3e5b8ae..552405b9d71cf 100644 --- a/datafusion/datasource-parquet/src/sampling.rs +++ b/datafusion/datasource-parquet/src/sampling.rs @@ -83,6 +83,23 @@ pub struct ParquetSampling { /// `ceil(target / row_cluster_size)` windows distributed across /// the row group with a random offset within each stride. pub row_cluster_size: usize, + /// Internal coordination channel between + /// `ParquetSource::try_push_sample` (the `FileSource` trait + /// method on [`crate::source::ParquetSource`]) and the parquet + /// opener. Not part of the public sampling API — direct callers + /// configure sampling via the per-axis fields above. See the + /// [`TABLESAMPLE clause`] section of the SQL reference for the + /// pushdown strategy this implements. 
+ /// + /// [`TABLESAMPLE clause`]: https://datafusion.apache.org/user-guide/sql/select.html#tablesample-clause + pub(crate) system_target_remaining: Option, + /// Optional `REPEATABLE(seed)` value plumbed through from + /// `TABLESAMPLE`. When set, it is mixed into the per-row-group + /// seed so a user-supplied seed selects a different sample without + /// changing the keying on `file_index`. When unset, the same + /// `(file_index, row_group_index, fraction, cluster_size)` always + /// selects the same rows. + pub(crate) seed: Option, } impl Default for ParquetSampling { @@ -91,6 +108,8 @@ impl Default for ParquetSampling { row_group_fraction: None, row_fraction: None, row_cluster_size: 32_768, + system_target_remaining: None, + seed: None, } } } @@ -101,13 +120,16 @@ impl ParquetSampling { /// of the total. No-op if `row_group_fraction` is `None`, `>= 1.0`, /// or out of range. /// - /// Selection is deterministic given `(file_index, row_group_count, - /// fraction)`: we seed an `SmallRng` from a hash of those inputs - /// and use a partial Fisher-Yates shuffle. Same inputs → same - /// sample on re-runs. Different `file_index` values produce - /// uncorrelated samples even when row-group counts and fractions - /// match, so files in the same scan don't all keep the same - /// indices. + /// Selection is deterministic given `(self.seed, file_index, + /// row_group_count, fraction)`: we seed an `SmallRng` from a hash + /// of those inputs and use a partial Fisher-Yates shuffle. Same + /// inputs → same sample on re-runs. Different `file_index` values + /// produce uncorrelated samples even when row-group counts and + /// fractions match, so files in the same scan don't all keep the + /// same indices. `self.seed` (the `REPEATABLE(n)` value plumbed + /// from `TABLESAMPLE`) is mixed in so a user-supplied seed picks a + /// different sample without the planner-vs.-direct-builder paths + /// having to differ. 
pub(crate) fn apply_row_group_sampling( &self, plan: &mut ParquetAccessPlan, @@ -130,6 +152,7 @@ impl ParquetSampling { let seed = derive_seed( b"row-group", + self.seed, file_index, row_group_count, fraction, @@ -188,8 +211,14 @@ impl ParquetSampling { continue; } - let seed = - derive_seed(b"row-fraction", file_index, idx, fraction, cluster_size); + let seed = derive_seed( + b"row-fraction", + self.seed, + file_index, + idx, + fraction, + cluster_size, + ); let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); let target_rows = ((total_rows as f64) * fraction).ceil().max(1.0) as usize; @@ -216,6 +245,7 @@ impl ParquetSampling { /// data-integrity boundary. fn derive_seed( domain: &[u8], + repeatable_seed: Option, file_index: usize, secondary_index: usize, fraction: f64, @@ -224,6 +254,7 @@ fn derive_seed( use std::hash::{Hash, Hasher}; let mut hasher = std::collections::hash_map::DefaultHasher::new(); domain.hash(&mut hasher); + repeatable_seed.hash(&mut hasher); file_index.hash(&mut hasher); secondary_index.hash(&mut hasher); fraction.to_bits().hash(&mut hasher); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 4a318bf45f17f..e5890ad3ca60a 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -688,6 +688,18 @@ impl FileSource for ParquetSource { write!(f, ", reverse_row_groups=true")?; } + // Surface the sampling config when set so that EXPLAIN + // reflects a TABLESAMPLE pushdown. + if let Some(target) = self.sampling.system_target_remaining { + write!(f, ", sample_system_target_remaining={target:.4}")?; + } + if let Some(rgf) = self.sampling.row_group_fraction { + write!(f, ", sample_row_group_fraction={rgf:.4}")?; + } + if let Some(rf) = self.sampling.row_fraction { + write!(f, ", sample_row_fraction={rf:.4}")?; + } + // Try to build the pruning predicates. 
// These are only generated here because it's useful to have *some* // idea of what pushdown is happening when viewing plans. @@ -901,6 +913,110 @@ impl FileSource for ParquetSource { Ok(tnr) } + + /// Absorb a `TABLESAMPLE`-shaped sample request into the parquet + /// scan: drop files, row groups, and rows in proportion to the + /// requested fraction, with no `SampleExec` left in the plan. + /// + /// `SYSTEM` sampling is the only supported method. The sampling + /// strategy (a hierarchical block-level reduction across files, + /// row groups, and rows) is described in the [`TABLESAMPLE clause`] + /// section of the SQL reference; it is intentionally not part of + /// the public sampling API and may evolve. + /// + /// [`TABLESAMPLE clause`]: https://datafusion.apache.org/user-guide/sql/select.html#tablesample-clause + fn try_push_sample( + &self, + spec: &datafusion_physical_plan::sample_pushdown::SampleSpec, + num_files: usize, + ) -> datafusion_common::Result + { + use datafusion_datasource::file::FileSourceSampleResult; + use datafusion_physical_plan::sample_pushdown::SampleMethod; + + // Implementation detail (not promised by the public API): the + // file axis uses ~p^(1/3) so the cube-root product across + // file × row-group × row stays at p; the opener handles the + // remaining row-group × row split adaptively based on the + // actual row-group count it sees per file. + + match spec.method { + SampleMethod::System => { + let p = spec.fraction.clamp(0.0, 1.0); + if p >= 1.0 { + // Trivial: no sampling needed. 
+ return Ok(FileSourceSampleResult::Absorbed { + new_source: Arc::new(self.clone()), + keep_files: None, + }); + } + + let (keep_files, remaining_p) = if num_files <= 1 { + (None, p) + } else { + let q_file = p.cbrt(); + let target = ((num_files as f64) * q_file).ceil().max(1.0) as usize; + let target = target.min(num_files); + let actual_keep = (target as f64) / (num_files as f64); + let remaining = (p / actual_keep).clamp(0.0, 1.0); + let keep = if target == num_files { + None + } else { + Some(seeded_file_subset(num_files, target, spec.seed)) + }; + (keep, remaining) + }; + + // Hand off the residual fraction to the opener — it + // splits row-group × row adaptively once it knows how + // many row groups each file actually has. + let mut sampling = self.sampling.clone(); + sampling.system_target_remaining = Some(remaining_p); + // Plumb the user's REPEATABLE seed through to the + // opener so the row-group and row-fraction samplers + // can produce results that depend only on the seed, + // not on the file path. Without this the seed only + // governed the file-axis selection. + sampling.seed = spec.seed; + // Clear any explicit per-axis fractions so the opener's + // SYSTEM-mode adaptive split is the source of truth. + sampling.row_group_fraction = None; + sampling.row_fraction = None; + let new_source = ParquetSource { + sampling, + ..self.clone() + }; + + Ok(FileSourceSampleResult::Absorbed { + new_source: Arc::new(new_source), + keep_files, + }) + } + } + } +} + +/// Deterministic-random subset of `[0, num_files)` of size `target`. +/// `seed` (when set) is mixed in so the user's `REPEATABLE(seed)` +/// produces a reproducible selection across runs. 
+fn seeded_file_subset(num_files: usize, target: usize, seed: Option) -> Vec { + use rand::SeedableRng; + use rand::seq::SliceRandom; + use std::hash::{Hash, Hasher}; + + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + "parquet-sample-files".hash(&mut hasher); + num_files.hash(&mut hasher); + target.hash(&mut hasher); + seed.unwrap_or(0xC0FFEE_F11ECAFE).hash(&mut hasher); + let rng_seed = hasher.finish(); + + let mut rng = rand::rngs::SmallRng::seed_from_u64(rng_seed); + let mut indices: Vec = (0..num_files).collect(); + indices.shuffle(&mut rng); + indices.truncate(target); + indices.sort_unstable(); + indices } #[cfg(test)] @@ -1001,4 +1117,146 @@ mod tests { assert!(source.reverse_row_groups()); assert!(source.filter().is_some()); } + + /// Helper: invoke `try_push_sample` on a fresh `ParquetSource` and + /// return the absorbed source plus `keep_files`. Panics if the + /// source returns `Unsupported` (it never should for SYSTEM). + fn push_system_sample( + num_files: usize, + fraction: f64, + seed: Option, + ) -> (Arc, Option>) { + use datafusion_datasource::file::FileSourceSampleResult; + use datafusion_physical_plan::sample_pushdown::{SampleMethod, SampleSpec}; + + let source = ParquetSource::new(Arc::new(Schema::empty())); + let spec = SampleSpec { + method: SampleMethod::System, + fraction, + seed, + }; + match source.try_push_sample(&spec, num_files).unwrap() { + FileSourceSampleResult::Absorbed { + new_source, + keep_files, + } => { + // Downcast back to ParquetSource for assertions. 
+ let arc_any: Arc = new_source; + let parquet = arc_any + .downcast::() + .expect("absorbed source should be a ParquetSource"); + (parquet, keep_files) + } + FileSourceSampleResult::Unsupported { reason } => { + panic!("SYSTEM should always be supported; got Unsupported: {reason}") + } + } + } + + #[test] + fn try_push_sample_system_full_is_noop() { + let (source, keep) = push_system_sample(4, 1.0, None); + // Fraction >= 1.0: nothing to drop, sampling config stays default. + assert!(keep.is_none(), "no files should be dropped at fraction=1.0"); + assert!(source.sampling().system_target_remaining.is_none()); + assert!(source.sampling().row_group_fraction.is_none()); + assert!(source.sampling().row_fraction.is_none()); + } + + /// With one file the file axis can't reduce, so the *entire* + /// requested fraction must flow through to the opener as + /// `system_target_remaining`. Otherwise the cube-root math would + /// undershoot — `SYSTEM(10)` over a single file would only keep + /// `cbrt(0.1) ≈ 46%` of the rows. + #[test] + fn try_push_sample_system_single_file_passes_full_fraction() { + let (source, keep) = push_system_sample(1, 0.1, Some(42)); + assert!(keep.is_none(), "single-file scans should never drop files"); + let target = source + .sampling() + .system_target_remaining + .expect("system_target_remaining must be set"); + assert!( + (target - 0.1).abs() < 1e-12, + "single-file remaining must equal p, got {target}" + ); + // The legacy explicit fractions must NOT be set — they would + // confuse the opener's adaptive split. + assert!(source.sampling().row_group_fraction.is_none()); + assert!(source.sampling().row_fraction.is_none()); + } + + /// With multiple files we drop ~cbrt(p) of them and pass the + /// residual `p × num_files / target_files` to the opener so the + /// expected output stays at `p × N_total`. 
+ #[test] + fn try_push_sample_system_multi_file_residual_compensates_rounding() { + // 8 files, p=0.125 → q_file=cbrt(0.125)=0.5 → keep ⌈8 × 0.5⌉ = 4. + // remaining = 0.125 × 8 / 4 = 0.25 = p^(2/3). + let (source, keep) = push_system_sample(8, 0.125, Some(7)); + let kept = keep.expect("multi-file scan should produce keep_files"); + assert_eq!(kept.len(), 4, "expected 4 files kept, got {kept:?}"); + let target = source + .sampling() + .system_target_remaining + .expect("system_target_remaining must be set"); + let expected = 0.125_f64 * 8.0 / 4.0; + assert!( + (target - expected).abs() < 1e-12, + "remaining must compensate for rounding: expected {expected}, got {target}" + ); + } + + #[test] + fn try_push_sample_system_drops_files_for_multi_file_scan() { + // 8 files, p=0.125 → q=cbrt(0.125)=0.5 → keep ⌈8 * 0.5⌉ = 4 files. + let (_, keep) = push_system_sample(8, 0.125, Some(42)); + let kept = keep.expect("multi-file scan should produce keep_files"); + assert_eq!(kept.len(), 4, "expected 4 files kept, got {kept:?}"); + for w in kept.windows(2) { + assert!(w[0] < w[1], "kept indices must be strictly sorted"); + } + assert!(kept.iter().all(|&i| i < 8), "indices must be < num_files"); + } + + #[test] + fn try_push_sample_system_keeps_at_least_one_file() { + // 100 files, p=1e-9 → q ≈ 0.001, target = ⌈100 * 0.001⌉ = 1. 
+ let (_, keep) = push_system_sample(100, 1e-9, None); + let kept = keep.expect("multi-file scan should produce keep_files"); + assert_eq!(kept.len(), 1, "must always keep at least one file"); + } + + #[test] + fn try_push_sample_system_repeatable_seed_is_deterministic() { + let (_, a) = push_system_sample(16, 0.3, Some(99)); + let (_, b) = push_system_sample(16, 0.3, Some(99)); + assert_eq!( + a, b, + "REPEATABLE seed must produce identical file selection" + ); + + let (_, c) = push_system_sample(16, 0.3, Some(100)); + assert_ne!(a, c, "different seeds should produce different selections"); + } + + #[test] + fn try_push_sample_system_target_clamped_to_num_files() { + // p=0.99 → q=cbrt(0.99)≈0.9967 → ⌈4 * 0.9967⌉ = 4 (= num_files). + // When target == num_files we still set system_target_remaining + // (the opener still needs to sample within files), but + // keep_files is None so the caller skips the file_groups + // rebuild. + let (source, keep) = push_system_sample(4, 0.99, None); + assert!( + keep.is_none(), + "target == num_files must short-circuit to None" + ); + let target = source + .sampling() + .system_target_remaining + .expect("system_target_remaining must be set"); + // 0.99 × 4 / 4 = 0.99 + assert!((target - 0.99).abs() < 1e-12); + } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 9b4ae5827ae8b..e42151b3cb36f 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -29,6 +29,29 @@ use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; use datafusion_common::config::ConfigOptions; + +/// Result of a [`FileSource::try_push_sample`] call. +pub enum FileSourceSampleResult { + /// The source has absorbed the sample. The + /// [`crate::source::DataSourceExec`] caller should: + /// + /// 1. Replace the file source on its [`FileScanConfig`] with + /// [`Self::Absorbed::new_source`]. + /// 2. 
Drop any files whose flat-list index is *not* in + /// [`Self::Absorbed::keep_files`] (None = keep all files). + Absorbed { + new_source: Arc, + /// Optional file-level subset (flat-list indices of + /// `PartitionedFile`s to keep). `None` means keep every file. + keep_files: Option>, + }, + /// The source can't sample. The pushdown rule will surface the + /// reason as a planning error today. + Unsupported { + /// Human-readable reason; included in the error message. + reason: String, + }, +} use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, not_impl_err}; use datafusion_physical_expr::projection::ProjectionExprs; @@ -340,6 +363,32 @@ pub trait FileSource: Any + Send + Sync { None } + /// Try to push a TABLESAMPLE-shaped sample request into this file + /// source. + /// + /// Sources that can configure native sampling (e.g. parquet, via + /// row-group / row-window selection in the opener) return + /// [`FileSourceSampleResult::Absorbed`] with the new source and an + /// optional list of `PartitionedFile` indices the rule should keep + /// (drop the rest at the `FileScanConfig` level). For SYSTEM + /// sampling parquet uses a hierarchical "cube-root" split across + /// files, row groups, and rows, so the rule has to drop files at + /// the outer layer where this trait can't reach. + /// + /// Default implementation returns `Unsupported`. + fn try_push_sample( + &self, + _spec: &datafusion_physical_plan::sample_pushdown::SampleSpec, + _num_files: usize, + ) -> Result { + Ok(FileSourceSampleResult::Unsupported { + reason: format!( + "{}: file source sample pushdown is not implemented", + self.file_type() + ), + }) + } + /// Apply a function to all physical expressions used by this file source. 
/// /// This includes: diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 420c6b508ce4f..7c5061aa80a7a 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -37,8 +37,9 @@ use datafusion_physical_plan::{ }; use itertools::Itertools; -use crate::file::FileSource; -use crate::file_scan_config::FileScanConfig; +use crate::file::{FileSource, FileSourceSampleResult}; +use crate::file_groups::FileGroup; +use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Constraints, Result, Statistics}; @@ -563,6 +564,61 @@ impl ExecutionPlan for DataSourceExec { new_exec.execution_state = Arc::new(OnceLock::new()); Ok(Arc::new(new_exec)) } + + fn try_push_sample( + self: Arc, + spec: &datafusion_physical_plan::sample_pushdown::SampleSpec, + ) -> Result { + use datafusion_physical_plan::sample_pushdown::SamplePushdownResult; + + // Only file-scan sources have a place to plug sampling in. + let Some(file_cfg) = self.data_source.downcast_ref::() else { + return Ok(SamplePushdownResult::Unsupported { + reason: "DataSourceExec: non-file source has no native sampling support" + .to_string(), + }); + }; + + let num_files: usize = file_cfg.file_groups.iter().map(|g| g.len()).sum(); + match file_cfg.file_source.try_push_sample(spec, num_files)? 
{ + FileSourceSampleResult::Unsupported { reason } => { + Ok(SamplePushdownResult::Unsupported { reason }) + } + FileSourceSampleResult::Absorbed { + new_source, + keep_files, + } => { + let mut builder = + FileScanConfigBuilder::from(file_cfg.clone()).with_source(new_source); + if let Some(keep) = keep_files { + let kept: std::collections::HashSet = + keep.into_iter().collect(); + let mut new_groups: Vec = Vec::new(); + let mut idx = 0usize; + for group in &file_cfg.file_groups { + let kept_files: Vec<_> = group + .iter() + .filter(|_| { + let here = idx; + idx += 1; + kept.contains(&here) + }) + .cloned() + .collect(); + if !kept_files.is_empty() { + new_groups.push(FileGroup::new(kept_files)); + } + } + builder = builder.with_file_groups(new_groups); + } + let new_cfg = builder.build(); + let new_exec = Self::new(Arc::new(new_cfg)); + Ok(SamplePushdownResult::Absorbed { + inner: Arc::new(new_exec) as Arc, + }) + } + } + } } impl DataSourceExec { diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index c2b01868c97f3..d69db90522edb 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -23,6 +23,7 @@ mod extension; pub(crate) mod invariants; pub use invariants::{InvariantLevel, assert_expected_schema, check_subquery_expr}; mod plan; +pub mod sample; mod statement; pub mod tree_node; @@ -56,3 +57,4 @@ pub use datafusion_common::format::ExplainFormat; pub use display::display_schema; pub use extension::{UserDefinedLogicalNode, UserDefinedLogicalNodeCore}; +pub use sample::{Sample, SampleMethod, sample_plan}; diff --git a/datafusion/expr/src/logical_plan/sample.rs b/datafusion/expr/src/logical_plan/sample.rs new file mode 100644 index 0000000000000..30059dae0b13f --- /dev/null +++ b/datafusion/expr/src/logical_plan/sample.rs @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Logical [`Sample`] extension node, the lowering target for +//! `TABLESAMPLE` (SQL) and friends. This is intentionally an +//! [`UserDefinedLogicalNodeCore`] rather than a first-class +//! [`LogicalPlan`] variant — promoting it to a built-in variant +//! is a follow-up touching every visitor / serializer / optimizer +//! that match-arms `LogicalPlan`. +//! +//! Issue: + +use std::cmp::Ordering; +use std::fmt::{self, Debug, Display}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use datafusion_common::{DFSchemaRef, plan_err}; + +use crate::logical_plan::extension::UserDefinedLogicalNodeCore; +use crate::{Expr, LogicalPlan}; + +/// SQL TABLESAMPLE method. +/// +/// Currently only [`SampleMethod::System`] is supported end-to-end. +/// `BERNOULLI` parses but is rejected at planning time until a +/// generic post-scan filter implementation lands. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum SampleMethod { + /// Block-level sampling (Postgres `SYSTEM`, Hive `BLOCK`). + /// Implemented as a hierarchical hybrid across files, row + /// groups, and rows so the IO win at small fractions doesn't + /// concentrate at a single granularity. 
+ System, +} + +impl Display for SampleMethod { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SampleMethod::System => write!(f, "SYSTEM"), + } + } +} + +/// Logical extension node representing a TABLESAMPLE clause. +/// +/// `fraction` is in `(0.0, 1.0]`. `seed` is the optional `REPEATABLE` +/// seed; selection is deterministic when seeded. +/// +/// Schema is the same as the input — sampling drops rows but never +/// changes the column shape. +#[derive(Debug, Clone)] +pub struct Sample { + pub input: Arc, + pub method: SampleMethod, + pub fraction: f64, + pub seed: Option, + schema: DFSchemaRef, +} + +impl Sample { + /// Construct a [`Sample`] over `input`. Validates that + /// `fraction` is in `(0.0, 1.0]`. + pub fn try_new( + input: Arc, + method: SampleMethod, + fraction: f64, + seed: Option, + ) -> datafusion_common::Result { + if !fraction.is_finite() || fraction <= 0.0 || fraction > 1.0 { + return plan_err!( + "TABLESAMPLE fraction must be in (0.0, 1.0]; got {fraction}" + ); + } + let schema = Arc::clone(input.schema()); + Ok(Self { + input, + method, + fraction, + seed, + schema, + }) + } +} + +// `UserDefinedLogicalNodeCore` requires PartialEq + Eq + Hash + PartialOrd. +// Floats don't implement Eq/Hash/Ord directly; we hash + compare on the +// bit pattern of `fraction`, which is consistent with the way DataFusion +// hashes float ScalarValues elsewhere. + +impl PartialEq for Sample { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.input, &other.input) + && self.method == other.method + && self.fraction.to_bits() == other.fraction.to_bits() + && self.seed == other.seed + } +} + +impl Eq for Sample {} + +impl Hash for Sample { + fn hash(&self, state: &mut H) { + // Use Arc address for input; downstream is opaque. 
+ (Arc::as_ptr(&self.input) as usize).hash(state); + self.method.hash(state); + self.fraction.to_bits().hash(state); + self.seed.hash(state); + } +} + +impl PartialOrd for Sample { + fn partial_cmp(&self, other: &Self) -> Option { + // Order by (method, fraction-bits, seed). Inputs aren't + // ordered (LogicalPlan doesn't impl Ord); ignore for ordering. + Some( + self.method + .cmp(&other.method) + .then(self.fraction.to_bits().cmp(&other.fraction.to_bits())) + .then(self.seed.cmp(&other.seed)), + ) + } +} + +impl UserDefinedLogicalNodeCore for Sample { + fn name(&self) -> &str { + "Sample" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![self.input.as_ref()] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + Vec::new() + } + + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Sample: {method}({pct:.4}%)", + method = self.method, + pct = self.fraction * 100.0, + )?; + if let Some(seed) = self.seed { + write!(f, " REPEATABLE({seed})")?; + } + Ok(()) + } + + fn with_exprs_and_inputs( + &self, + exprs: Vec, + mut inputs: Vec, + ) -> datafusion_common::Result { + if !exprs.is_empty() { + return plan_err!("Sample takes no expressions; got {}", exprs.len()); + } + if inputs.len() != 1 { + return plan_err!("Sample takes exactly one input; got {}", inputs.len()); + } + let input = Arc::new(inputs.swap_remove(0)); + Self::try_new(input, self.method, self.fraction, self.seed) + } + + fn supports_limit_pushdown(&self) -> bool { + // LIMIT(SAMPLE(x)) ≠ SAMPLE(LIMIT(x)) — pushing limit below + // a sample changes the population we sample from. + false + } +} + +/// Convenience: wrap a `LogicalPlan` in a `Sample` extension node. 
+pub fn sample_plan( + input: Arc, + method: SampleMethod, + fraction: f64, + seed: Option, +) -> datafusion_common::Result { + let sample = Sample::try_new(input, method, fraction, seed)?; + Ok(LogicalPlan::Extension(crate::logical_plan::Extension { + node: Arc::new(sample), + })) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::logical_plan::{EmptyRelation, LogicalPlan}; + use datafusion_common::DFSchema; + + fn empty_plan() -> Arc { + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + })) + } + + /// `fmt::Formatter::new` is nightly-only; route through Display. + struct Explain<'a>(&'a Sample); + impl Display for Explain<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt_for_explain(f) + } + } + fn explain(s: &Sample) -> String { + format!("{}", Explain(s)) + } + + #[test] + fn sample_validates_fraction_range() { + let p = empty_plan(); + assert!( + Sample::try_new(Arc::clone(&p), SampleMethod::System, 0.0, None).is_err() + ); + assert!( + Sample::try_new(Arc::clone(&p), SampleMethod::System, 1.5, None).is_err() + ); + assert!( + Sample::try_new(Arc::clone(&p), SampleMethod::System, f64::NAN, None) + .is_err() + ); + assert!(Sample::try_new(Arc::clone(&p), SampleMethod::System, 0.5, None).is_ok()); + assert!(Sample::try_new(Arc::clone(&p), SampleMethod::System, 1.0, None).is_ok()); + } + + #[test] + fn sample_explain_format() { + let p = empty_plan(); + let s = Sample::try_new(p, SampleMethod::System, 0.1, Some(42)).unwrap(); + assert_eq!(explain(&s), "Sample: SYSTEM(10.0000%) REPEATABLE(42)"); + } + + #[test] + fn sample_explain_format_no_seed() { + let p = empty_plan(); + let s = Sample::try_new(p, SampleMethod::System, 0.05, None).unwrap(); + assert_eq!(explain(&s), "Sample: SYSTEM(5.0000%)"); + } + + #[test] + fn sample_with_exprs_and_inputs_rebuilds() { + let p = empty_plan(); + let s = + Sample::try_new(Arc::clone(&p), SampleMethod::System, 0.1, 
Some(7)).unwrap(); + let rebuilt = s.with_exprs_and_inputs(vec![], vec![(*p).clone()]).unwrap(); + assert_eq!(rebuilt.method, SampleMethod::System); + assert_eq!(rebuilt.fraction, 0.1); + assert_eq!(rebuilt.seed, Some(7)); + } +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 5fac8948b7f04..b6524c43392cd 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -41,6 +41,7 @@ pub mod projection_pushdown; pub use datafusion_pruning as pruning; pub mod hash_join_buffering; pub mod pushdown_sort; +pub mod sample_pushdown; pub mod sanity_checker; pub mod topk_aggregation; pub mod topk_repartition; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 05df642f8446b..8cee6100045d7 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -39,6 +39,7 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::hash_join_buffering::HashJoinBuffering; use crate::limit_pushdown_past_window::LimitPushPastWindows; use crate::pushdown_sort::PushdownSort; +use crate::sample_pushdown::SamplePushdown; use crate::window_topn::WindowTopN; use datafusion_common::Result; use datafusion_common::config::ConfigOptions; @@ -225,6 +226,13 @@ impl PhysicalOptimizer { Arc::new(ProjectionPushdown::new()), // PushdownSort: Detect sorts that can be pushed down to data sources. Arc::new(PushdownSort::new()), + // SamplePushdown: Push TABLESAMPLE into the source. Must run + // after FilterPushdown / ProjectionPushdown so that scan nodes + // are in their final shape, and after PushdownSort so a + // fetched SortExec — which blocks sample passthrough — has + // already absorbed its limit. Errors at planning time if + // pushdown is blocked. 
+ Arc::new(SamplePushdown), Arc::new(EnsureCooperative::new()), // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan. // Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references. diff --git a/datafusion/physical-optimizer/src/sample_pushdown.rs b/datafusion/physical-optimizer/src/sample_pushdown.rs new file mode 100644 index 0000000000000..e7a6ef37c607f --- /dev/null +++ b/datafusion/physical-optimizer/src/sample_pushdown.rs @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `SamplePushdown` — push a [`SampleExec`] into the source it sits +//! above. +//! +//! The rule walks the physical plan top-down. For each `SampleExec` +//! it finds, it asks the immediate child via +//! [`ExecutionPlan::try_push_sample`] what to do: +//! +//! * [`SamplePushdownResult::Absorbed`]: the child has incorporated +//! the sample. Replace the `SampleExec` with the new child. +//! * [`SamplePushdownResult::Passthrough`]: the child is row-preserving +//! for sampling (filter, projection, coalesce, repartition, +//! non-fetch sort). Recurse into the child's single input and, if +//! 
that recursion succeeds, rebuild the child with the new +//! grandchild and drop the original `SampleExec`. +//! * [`SamplePushdownResult::Unsupported`]: pushdown stops here. +//! Today this is a planning error — a generic post-scan filter +//! exec is a follow-up. +//! +//! Run order: this rule must come *after* the rules that may +//! introduce or rewrite scan nodes (filter pushdown, projection +//! pushdown). It runs before `SanityCheckPlan` so any leftover +//! `SampleExec` produces a clean error. +//! +//! [`SampleExec`]: datafusion_physical_plan::sample::SampleExec +//! [`SamplePushdownResult`]: datafusion_physical_plan::sample_pushdown::SamplePushdownResult + +use std::sync::Arc; + +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::sample::SampleExec; +use datafusion_physical_plan::sample_pushdown::{SamplePushdownResult, SampleSpec}; + +use crate::PhysicalOptimizerRule; + +/// Optimizer rule that attempts to push every `SampleExec` into the +/// source below it. +#[derive(Default, Debug)] +pub struct SamplePushdown; + +impl PhysicalOptimizerRule for SamplePushdown { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + plan.transform_down(|node| { + let Some(sample) = node.as_ref().downcast_ref::() else { + return Ok(Transformed::no(node)); + }; + let spec = sample.spec(); + let child = Arc::clone(sample.input()); + match push_into(child, &spec)? { + Pushdown::Pushed(new_child) => Ok(Transformed::yes(new_child)), + Pushdown::Failed(reason) => { + datafusion_common::plan_err!( + "TABLESAMPLE could not be pushed down: {reason}. 
\ + A generic post-scan SampleExec is not yet implemented; \ + see https://github.com/apache/datafusion/issues/16533" + ) + } + } + }) + .data() + } + + fn name(&self) -> &str { + "SamplePushdown" + } + + fn schema_check(&self) -> bool { + true + } +} + +enum Pushdown { + /// Sample fully absorbed; the surrounding `SampleExec` should be + /// replaced with this node. + Pushed(Arc), + /// Pushdown blocked at some descendant; planning should error. + Failed(String), +} + +fn push_into(node: Arc, spec: &SampleSpec) -> Result { + match Arc::clone(&node).try_push_sample(spec)? { + SamplePushdownResult::Absorbed { inner } => Ok(Pushdown::Pushed(inner)), + SamplePushdownResult::Passthrough => { + // Single-child commute. Multi-child nodes shouldn't return + // Passthrough — guard defensively. + let children = node.children(); + if children.len() != 1 { + return Ok(Pushdown::Failed(format!( + "{}: Passthrough returned but node has {} children", + node.name(), + children.len() + ))); + } + let child = Arc::clone(children[0]); + match push_into(child, spec)? { + Pushdown::Pushed(new_grandchild) => { + let rebuilt = node.with_new_children(vec![new_grandchild])?; + Ok(Pushdown::Pushed(rebuilt)) + } + Pushdown::Failed(reason) => Ok(Pushdown::Failed(reason)), + } + } + SamplePushdownResult::Unsupported { reason } => Ok(Pushdown::Failed(reason)), + } +} diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 2bf046f03b6cf..15ccd32f03ef4 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -288,6 +288,14 @@ impl ExecutionPlan for CoalesceBatchesExec { ) as Arc) }) } + + fn try_push_sample( + self: Arc, + _spec: &crate::sample_pushdown::SampleSpec, + ) -> Result { + // Coalescing batches is row-preserving and commutes with sampling. + Ok(crate::sample_pushdown::SamplePushdownResult::Passthrough) + } } /// Stream for [`CoalesceBatchesExec`]. 
See [`CoalesceBatchesExec`] for more details. diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 9290d725165e9..54891e80e720e 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -345,6 +345,14 @@ impl ExecutionPlan for CoalescePartitionsExec { } }) } + + fn try_push_sample( + self: Arc, + _spec: &crate::sample_pushdown::SampleSpec, + ) -> Result { + // Merging partitions is row-preserving; sampling commutes. + Ok(crate::sample_pushdown::SamplePushdownResult::Passthrough) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1a67ea0ded11b..a7028f956cad8 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -22,6 +22,7 @@ use crate::filter_pushdown::{ }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; +use crate::sample_pushdown::{SamplePushdownResult, SampleSpec}; use crate::sort_pushdown::SortOrderPushdownResult; pub use crate::stream::EmptyRecordBatchStream; @@ -789,6 +790,36 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { ) -> Option> { None } + + /// Try to push a TABLESAMPLE-shaped sample request through this + /// node. Used by the `SamplePushdown` physical optimizer rule to + /// turn a `SampleExec(child)` into `child'` with sampling + /// configured on (or absorbed by) the source. + /// + /// Per-node responsibilities: + /// * Source-like nodes that can natively sample (e.g. + /// `DataSourceExec(ParquetSource)`) return + /// [`SamplePushdownResult::Absorbed`] with a new node configured + /// to produce the requested sample. 
+ /// * Row-preserving nodes (filter, projection, coalesce, + /// repartition, non-fetch sort) that commute with row-level + /// sampling return [`SamplePushdownResult::Passthrough`]; the + /// rule recurses into the child and rebuilds this node. + /// * All other nodes return [`SamplePushdownResult::Unsupported`]. + /// + /// The default implementation returns `Unsupported` so adding new + /// node types stays a compile-time-safe opt-in. + fn try_push_sample( + self: Arc, + _spec: &SampleSpec, + ) -> Result { + Ok(SamplePushdownResult::Unsupported { + reason: format!( + "{}: sample pushdown is not implemented for this node", + self.name(), + ), + }) + } } impl dyn ExecutionPlan { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 50efe8f5092e8..5cfcc73914e66 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -787,6 +787,17 @@ impl ExecutionPlan for FilterExec { .ok() }) } + + fn try_push_sample( + self: Arc, + _spec: &crate::sample_pushdown::SampleSpec, + ) -> Result { + // `sample(filter(x))` ≡ `filter(sample(x))` distributionally + // (sampling and filtering commute), and pushing the sample + // below the filter is also strictly better for IO — rows are + // dropped before the predicate sees them. 
+ Ok(crate::sample_pushdown::SamplePushdownResult::Passthrough) + } } impl EmbeddedProjection for FilterExec { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 3005e975424b4..94a7ab3064ba0 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -48,6 +48,7 @@ pub use crate::execution_plan::{ }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; +pub use crate::sample_pushdown::{SampleMethod, SamplePushdownResult, SampleSpec}; pub use crate::sort_pushdown::SortOrderPushdownResult; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; @@ -85,6 +86,8 @@ pub mod placeholder_row; pub mod projection; pub mod recursive_query; pub mod repartition; +pub mod sample; +pub mod sample_pushdown; pub mod scalar_subquery; pub mod sort_pushdown; pub mod sorts; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e5b91fbb1c5d4..328576fbb5291 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -506,6 +506,15 @@ impl ExecutionPlan for ProjectionExec { .ok() }) } + + fn try_push_sample( + self: Arc, + _spec: &crate::sample_pushdown::SampleSpec, + ) -> Result { + // Column projection commutes with row sampling — selecting + // which columns survive is independent of which rows do. 
+ Ok(crate::sample_pushdown::SamplePushdownResult::Passthrough) + } } impl ProjectionStream { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b4af6e2c09a5c..7eca29131d6de 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1328,6 +1328,15 @@ impl ExecutionPlan for RepartitionExec { cache: new_properties.into(), }))) } + + fn try_push_sample( + self: Arc, + _spec: &crate::sample_pushdown::SampleSpec, + ) -> Result { + // Repartitioning preserves the row set (just redistributes + // across partitions); sampling commutes. + Ok(crate::sample_pushdown::SamplePushdownResult::Passthrough) + } } impl RepartitionExec { diff --git a/datafusion/physical-plan/src/sample.rs b/datafusion/physical-plan/src/sample.rs new file mode 100644 index 0000000000000..748f04e6356f4 --- /dev/null +++ b/datafusion/physical-plan/src/sample.rs @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical [`SampleExec`] node — the lowering target of a logical +//! `Sample` extension. The optimizer rule `SamplePushdown` walks the +//! 
plan and pushes the sampling into the source (or through transparent +//! intermediate nodes) where possible. +//! +//! Today, executing a `SampleExec` directly returns a `not implemented` +//! error: the rule must successfully push it down for the query to run. +//! When a generic post-scan filter implementation lands, this will +//! become the default fallback for plan shapes pushdown can't handle. + +use std::any::Any; +use std::fmt::{self, Formatter}; +use std::sync::Arc; + +use crate::execution_plan::ExecutionPlanProperties; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; +use crate::sample_pushdown::{SampleMethod, SampleSpec}; +use crate::sort_pushdown::SortOrderPushdownResult; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::not_impl_err; +use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; + +/// Passthrough physical node carrying TABLESAMPLE intent. +/// +/// Output schema, partitioning, and ordering are inherited unchanged +/// from the input — sampling drops rows but nothing else. +#[derive(Debug, Clone)] +pub struct SampleExec { + input: Arc, + method: SampleMethod, + fraction: f64, + seed: Option, + cache: Arc, +} + +impl SampleExec { + /// Construct a [`SampleExec`] over `input`. Caller is expected to + /// have validated `fraction` upstream (the logical `Sample` node + /// does this in `try_new`). + pub fn new( + input: Arc, + method: SampleMethod, + fraction: f64, + seed: Option, + ) -> Self { + let cache = Arc::new(Self::compute_properties(&input)); + Self { + input, + method, + fraction, + seed, + cache, + } + } + + /// Convenience: construct from a [`SampleSpec`]. 
+ pub fn from_spec(input: Arc, spec: &SampleSpec) -> Self { + Self::new(input, spec.method, spec.fraction, spec.seed) + } + + /// Sampling method. + pub fn method(&self) -> SampleMethod { + self.method + } + /// Target fraction in `(0.0, 1.0]`. + pub fn fraction(&self) -> f64 { + self.fraction + } + /// Optional `REPEATABLE(seed)`. + pub fn seed(&self) -> Option { + self.seed + } + /// Input plan. + pub fn input(&self) -> &Arc { + &self.input + } + /// The [`SampleSpec`] this node represents. + pub fn spec(&self) -> SampleSpec { + SampleSpec { + method: self.method, + fraction: self.fraction, + seed: self.seed, + } + } + + fn compute_properties(input: &Arc) -> PlanProperties { + // Sampling preserves equivalence properties (subset of rows + // satisfies the same equivalences) and partitioning/ordering. + // Bounded if the input is bounded; otherwise sampling + // semantics over an unbounded stream aren't well-defined here + // and pushdown will reject it before we reach execution. + PlanProperties::new( + input.equivalence_properties().clone(), + input.output_partitioning().clone(), + input.pipeline_behavior(), + input.boundedness(), + ) + } +} + +impl DisplayAs for SampleExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let method = match self.method { + SampleMethod::System => "SYSTEM", + }; + write!(f, "SampleExec: {method}({:.4}%)", self.fraction * 100.0)?; + if let Some(seed) = self.seed { + write!(f, " REPEATABLE({seed})")?; + } + Ok(()) + } + DisplayFormatType::TreeRender => write!(f, "SampleExec"), + } + } +} + +impl ExecutionPlan for SampleExec { + fn name(&self) -> &'static str { + "SampleExec" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + 
+ fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.len() != 1 { + return Err(datafusion_common::DataFusionError::Internal(format!( + "SampleExec wrong number of children: {}", + children.len() + ))); + } + let mut child = children; + Ok(Arc::new(SampleExec::new( + child.swap_remove(0), + self.method, + self.fraction, + self.seed, + ))) + } + + // ---- Make SampleExec transparent to other pushdown rules so they ---- + // ---- aren't blocked when a Sample sits between them and the source. ---- + // + // Filter pushdown: `sample(filter(x))` ≡ `filter(sample(x))`, so any + // filters the parent wants to push past us can pass through. + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + } + + // Sort pushdown: sampling preserves ordering, so a sort requirement + // can pass through us. We'd never get a `SortExec(SampleExec(..))` + // shape in practice (sort + sample is unusual), but providing this + // keeps the rule from giving up unnecessarily. + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + self.input.try_pushdown_sort(order)?.try_map(|new_input| { + Ok(Arc::new(SampleExec::new( + new_input, + self.method, + self.fraction, + self.seed, + )) as Arc) + }) + } + + // Limit pushdown: deliberately NOT supported. `LIMIT(SAMPLE(x))` + // (sample first, then take first N) is not the same as + // `SAMPLE(LIMIT(x))` (take first N, then sample). 
Default impl + // returns false; we leave it explicit here for documentation. + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + // The pushdown rule should have either (a) absorbed this node + // into the source or (b) emitted a planning error. If we + // reach `execute()`, neither happened — that's a planner bug + // until a generic SampleExec filter lands. + not_impl_err!( + "SampleExec could not be pushed into the source and a generic \ + post-scan sample filter is not yet implemented; see \ + https://github.com/apache/datafusion/issues/16533" + ) + } +} + +impl SampleExec { + /// Allow downstream code (mostly the pushdown rule) to treat + /// `&dyn ExecutionPlan` as `Self` ergonomically. + pub fn downcast_arc(plan: &Arc) -> Option<&SampleExec> { + plan.as_ref().downcast_ref::() + } + + /// `as_any` companion for `dyn ExecutionPlan`. + pub fn as_any_ref(&self) -> &dyn Any { + self + } +} diff --git a/datafusion/physical-plan/src/sample_pushdown.rs b/datafusion/physical-plan/src/sample_pushdown.rs new file mode 100644 index 0000000000000..cce37fc73175a --- /dev/null +++ b/datafusion/physical-plan/src/sample_pushdown.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +//! Sample (TABLESAMPLE) pushdown types. +//! +//! Used by [`ExecutionPlan::try_push_sample`] to communicate per-node +//! pushdown intent. The optimizer rule walking the tree decides what +//! to do with the answer: +//! +//! * [`SamplePushdownResult::Absorbed`] — the node has absorbed the +//! sample and the rule should drop the surrounding `SampleExec`. +//! * [`SamplePushdownResult::Passthrough`] — the node is row-preserving +//! (or otherwise commutes with the requested sampling); the rule +//! should recurse into its single child and rebuild the node from +//! the result. +//! * [`SamplePushdownResult::Unsupported`] — pushdown stops here. Today +//! the rule errors; once a generic `SampleExec` filter exists, this +//! becomes "leave the SampleExec in place." +//! +//! [`ExecutionPlan::try_push_sample`]: crate::ExecutionPlan::try_push_sample + +use std::sync::Arc; + +use crate::ExecutionPlan; + +/// SQL TABLESAMPLE method, mirrored from +/// [`datafusion_expr::SampleMethod`] so the physical layer doesn't +/// have to depend on `datafusion-expr`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SampleMethod { + /// Block-level sampling (Postgres `SYSTEM`, Hive `BLOCK`). + System, +} + +/// Specification of a sample request. Crosses the trait boundary in +/// `try_push_sample`. +#[derive(Debug, Clone)] +pub struct SampleSpec { + /// Sampling method. + pub method: SampleMethod, + /// Target fraction of rows to keep, in `(0.0, 1.0]`. + pub fraction: f64, + /// Optional `REPEATABLE(seed)` for deterministic selection. + pub seed: Option, +} + +/// Outcome of a `try_push_sample` call on a single node. +#[derive(Debug, Clone)] +pub enum SamplePushdownResult { + /// The node has absorbed the sample. The rule should treat + /// `inner` as the replacement for the original node *and* drop + /// the surrounding `SampleExec`. + Absorbed { + /// The new node, configured with sampling. 
+ inner: Arc, + }, + /// The node is row-preserving for the requested sampling. The + /// rule should: + /// + /// 1. Recurse into the node's single child with the same + /// [`SampleSpec`]. + /// 2. If the child returns `Absorbed`/`Passthrough`, rebuild + /// this node with the new child via `with_new_children` and + /// drop the surrounding `SampleExec`. + Passthrough, + /// Pushdown stops here. Today the rule treats this as a + /// planning error; in the future it will leave a generic + /// `SampleExec` in place. + Unsupported { + /// Human-readable reason; included in the error message. + reason: String, + }, +} + +impl SamplePushdownResult { + /// Convenience: an `Unsupported` with a static reason. + pub fn unsupported_static(reason: &'static str) -> Self { + Self::Unsupported { + reason: reason.to_string(), + } + } +} diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d3..a640d6556702c 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1410,6 +1410,25 @@ impl ExecutionPlan for SortExec { updated_node: Some(new_sort), }) } + + fn try_push_sample( + self: Arc, + _spec: &crate::sample_pushdown::SampleSpec, + ) -> Result { + // A pure SortExec is row-preserving and commutes with row + // sampling. A SortExec WITH a fetch (TopK) is row-LIMITING — + // pushing the sample below would change which rows survive + // the topk and the result would no longer be a sample of the + // top-N. In that case we stop pushdown. 
+ if self.fetch().is_some() { + Ok(crate::sample_pushdown::SamplePushdownResult::Unsupported { + reason: "SortExec with fetch (TopK): SAMPLE(TopK(x)) ≠ TopK(SAMPLE(x))" + .to_string(), + }) + } else { + Ok(crate::sample_pushdown::SamplePushdownResult::Passthrough) + } + } } #[cfg(test)] diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 2e8a65385541e..541cbfecffd45 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -249,6 +249,7 @@ physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after SamplePushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -332,6 +333,7 @@ physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSP physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after SamplePushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -379,6 +381,7 @@ physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSP physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after SamplePushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ 
-624,6 +627,7 @@ physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after SamplePushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE From 0929f1db575785e25de35503a028ff78bda6ee02 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 09:27:46 -0500 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20TableSampleSystemPlanner=20?= =?UTF-8?q?=E2=80=94=20TABLESAMPLE=20SYSTEM=20works=20out=20of=20the=20box?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires SQL `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the infrastructure from the previous commit so it works on a default `SessionContext` (and therefore in `datafusion-cli` and the sqllogictest harness) without any extra registration. - New `datafusion_sql::sample::TableSampleSystemPlanner` (`RelationPlanner`). Lifts `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` to the core `Sample` extension node. Other forms (`BERNOULLI`, `ROW` count, `BUCKET ... OUT OF ...`, `OFFSET`) are rejected at planning time with errors that point at registering a custom `RelationPlanner` ahead of this one. - New public `SamplePhysicalPlanner` (`ExtensionPlanner`) in `datafusion::physical_planner`. Lowers `Sample` to `SampleExec`. Pre-registered in `DefaultPhysicalPlanner::default()` so the default query planner handles it. - `SessionStateDefaults::default_relation_planners()` returns the built-in planner; `SessionStateBuilder::with_default_features()` installs it. Both gated behind the `sql` feature so `--no-default-features` builds keep working. 
- `register_relation_planner` already prepends to the chain, so any user-supplied planner runs first and can return `Original` to fall through to the built-in for SYSTEM. That composition is the intended override mechanism. End-to-end coverage: - New `datafusion/sqllogictest/test_files/tablesample.slt` exercises the path a user gets out of the box: `SYSTEM(100)`, `SYSTEM(50) REPEATABLE(42)` deterministic count, EXPLAIN absorbed into ParquetSource, every rejected form, and the planning-time error for sources that don't implement `try_push_sample` (CSV). Docs: - `docs/source/user-guide/sql/select.md` gains a `TABLESAMPLE clause` section explaining what it is, the SYSTEM vs BERNOULLI tradeoff, the parquet implementation strategy, deterministic seeds, the EXPLAIN format, and the list of rejected forms. - `docs/source/library-user-guide/extending-sql.md` reframes the existing TABLESAMPLE example as the way to add additional flavours on top of the built-in SYSTEM planner. - `datafusion-examples/examples/relation_planner/main.rs` carries a matching note in its module docs. - `datafusion-examples/README.md` regenerated. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion-examples/README.md | 10 +- .../examples/relation_planner/main.rs | 9 +- .../core/src/execution/session_state.rs | 5 + .../src/execution/session_state_defaults.rs | 15 ++ datafusion/core/src/physical_planner.rs | 85 +++++++- datafusion/sql/src/lib.rs | 1 + datafusion/sql/src/sample.rs | 181 ++++++++++++++++++ .../sqllogictest/test_files/tablesample.slt | 115 +++++++++++ .../library-user-guide/extending-sql.md | 19 +- docs/source/user-guide/sql/select.md | 122 +++++++++++- 10 files changed, 552 insertions(+), 10 deletions(-) create mode 100644 datafusion/sql/src/sample.rs create mode 100644 datafusion/sqllogictest/test_files/tablesample.slt diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 073f269d4a35d..03d0f5270518e 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -193,11 +193,11 @@ cargo run --example dataframe -- dataframe #### Category: Single Process -| Subcommand | File Path | Description | -| --------------- | ------------------------------------------------------------------------------------- | ------------------------------------------ | -| match_recognize | [`relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs) | Implement MATCH_RECOGNIZE pattern matching | -| pivot_unpivot | [`relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs) | Implement PIVOT / UNPIVOT | -| table_sample | [`relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs) | Implement TABLESAMPLE | +| Subcommand | File Path | Description | +| --------------- | ------------------------------------------------------------------------------------- | --------------------------------------------------------------------- | +| match_recognize | [`relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs) | Implement MATCH_RECOGNIZE pattern matching | +| pivot_unpivot | 
[`relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs) | Implement PIVOT / UNPIVOT | +| table_sample | [`relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs) | Implement TABLESAMPLE BERNOULLI / ROW / BUCKET via per-batch sampling | ## SQL Ops Examples diff --git a/datafusion-examples/examples/relation_planner/main.rs b/datafusion-examples/examples/relation_planner/main.rs index babc0d3714f72..f509566da80ea 100644 --- a/datafusion-examples/examples/relation_planner/main.rs +++ b/datafusion-examples/examples/relation_planner/main.rs @@ -35,7 +35,14 @@ //! (file: pivot_unpivot.rs, desc: Implement PIVOT / UNPIVOT) //! //! - `table_sample` -//! (file: table_sample.rs, desc: Implement TABLESAMPLE) +//! (file: table_sample.rs, desc: Implement TABLESAMPLE BERNOULLI / ROW / BUCKET via per-batch sampling) +//! +//! Note: `TABLESAMPLE SYSTEM(p%)` is supported out of the box by the +//! built-in `datafusion_sql::sample::TableSampleSystemPlanner`, which is +//! auto-registered on a default `SessionContext`. The `table_sample` +//! example below shows how to register a *different* planner for the +//! row-level forms (`BERNOULLI`, `ROW`, `BUCKET`) that the built-in +//! intentionally does not handle. //! //! ## Snapshot Testing //! 
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index de5e6b97c1af9..7f271b38f06c5 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1175,6 +1175,11 @@ impl SessionStateBuilder { .get_or_insert_with(Vec::new) .extend(SessionStateDefaults::default_expr_planners()); + + #[cfg(feature = "sql")] + self.relation_planners + .get_or_insert_with(Vec::new) + .extend(SessionStateDefaults::default_relation_planners()); + self.scalar_functions .get_or_insert_with(Vec::new) .extend(SessionStateDefaults::default_scalar_functions()); diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 5e85c1bbc5e9e..1cec617624b37 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -36,6 +36,8 @@ use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::planner::ExprPlanner; +#[cfg(feature = "sql")] +use datafusion_expr::planner::RelationPlanner; use datafusion_expr::registry::ExtensionTypeRegistrationRef; use datafusion_expr::{AggregateUDF, HigherOrderUDF, ScalarUDF, WindowUDF}; use std::collections::HashMap; @@ -82,6 +84,19 @@ impl SessionStateDefaults { default_catalog } + /// Returns the list of default [`RelationPlanner`]s installed by + /// `SessionStateBuilder::with_default_features`. Currently this is the + /// built-in `TableSampleSystemPlanner`, which lifts + /// `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the core `Sample` + /// extension node so the `SamplePushdown` rule can absorb it into + /// the scan. Other `TABLESAMPLE` flavors are rejected at planning + /// time — register a `RelationPlanner` ahead of this one to add + /// custom semantics. 
+ #[cfg(feature = "sql")] + pub fn default_relation_planners() -> Vec> { + vec![Arc::new(datafusion_sql::sample::TableSampleSystemPlanner)] + } + /// returns the list of default [`ExprPlanner`]s pub fn default_expr_planners() -> Vec> { let expr_planners: Vec> = vec![ diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3b2c7a78e898e..399b8acf6abee 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -237,6 +237,79 @@ pub trait ExtensionPlanner { } } +/// [`ExtensionPlanner`] that lowers the [`Sample`] logical extension +/// node into a [`SampleExec`] physical node. +/// +/// The pushdown machinery (cube-root absorption into `ParquetSource`, +/// the `SamplePushdown` optimizer rule, per-node `Passthrough` +/// overrides) is wired into the default optimizer pipeline, so once +/// a `Sample` reaches the physical planner it will be pushed into +/// the source — but it has to *get* there first. Register this +/// planner on a [`SessionStateBuilder`] / [`DefaultPhysicalPlanner`] +/// alongside whichever [`RelationPlanner`] (or other front-end) you +/// use to emit the `Sample` logical node: +/// +/// ```rust,ignore +/// use std::sync::Arc; +/// use datafusion::physical_planner::{DefaultPhysicalPlanner, SamplePhysicalPlanner}; +/// +/// let planner = DefaultPhysicalPlanner::with_extension_planners(vec![ +/// Arc::new(SamplePhysicalPlanner), +/// ]); +/// ``` +/// +/// `SamplePhysicalPlanner` is registered automatically in +/// [`DefaultPhysicalPlanner::default`] so that `TABLESAMPLE SYSTEM` +/// works out of the box on a default `SessionContext`. Callers who +/// supply their own list via [`DefaultPhysicalPlanner::with_extension_planners`] +/// **replace** the defaults — re-add `SamplePhysicalPlanner` to the +/// front of their list if they want sampling support. 
+/// +/// [`Sample`]: datafusion_expr::logical_plan::sample::Sample +/// [`SampleExec`]: datafusion_physical_plan::sample::SampleExec +/// [`RelationPlanner`]: datafusion_expr::planner::RelationPlanner +/// [`SessionStateBuilder`]: crate::execution::session_state::SessionStateBuilder +#[derive(Debug, Default)] +pub struct SamplePhysicalPlanner; + +#[async_trait] +impl ExtensionPlanner for SamplePhysicalPlanner { + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + _logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> Result>> { + let Some(sample) = node + .as_any() + .downcast_ref::() + else { + return Ok(None); + }; + if physical_inputs.len() != 1 { + return plan_err!( + "Sample expects exactly one input; got {}", + physical_inputs.len() + ); + } + let method = match sample.method { + datafusion_expr::logical_plan::sample::SampleMethod::System => { + datafusion_physical_plan::sample_pushdown::SampleMethod::System + } + }; + Ok(Some(Arc::new( + datafusion_physical_plan::sample::SampleExec::new( + Arc::clone(&physical_inputs[0]), + method, + sample.fraction, + sample.seed, + ), + ))) + } +} + /// Default single node physical query planner that converts a /// `LogicalPlan` to an `ExecutionPlan` suitable for execution. /// @@ -255,11 +328,21 @@ pub trait ExtensionPlanner { /// execute concurrently. /// /// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency -#[derive(Default)] pub struct DefaultPhysicalPlanner { extension_planners: Vec>, } +impl Default for DefaultPhysicalPlanner { + /// Constructs a planner with [`SamplePhysicalPlanner`] pre-registered + /// so the core `Sample` extension node lowers without any extra + /// wiring on a default `SessionContext`. 
+ fn default() -> Self { + Self { + extension_planners: vec![Arc::new(SamplePhysicalPlanner)], + } + } +} + #[async_trait] impl PhysicalPlanner for DefaultPhysicalPlanner { /// Create a physical plan from a logical plan diff --git a/datafusion/sql/src/lib.rs b/datafusion/sql/src/lib.rs index 7fef670933f9a..496d1ded30bc1 100644 --- a/datafusion/sql/src/lib.rs +++ b/datafusion/sql/src/lib.rs @@ -49,6 +49,7 @@ pub mod planner; mod query; mod relation; pub mod resolve; +pub mod sample; mod select; mod set_expr; mod stack; diff --git a/datafusion/sql/src/sample.rs b/datafusion/sql/src/sample.rs new file mode 100644 index 0000000000000..cd5502e669403 --- /dev/null +++ b/datafusion/sql/src/sample.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Built-in [`RelationPlanner`] for `TABLESAMPLE SYSTEM(p%)`. +//! +//! Auto-registered via [`SessionStateDefaults::default_relation_planners`] +//! so SQL `TABLESAMPLE SYSTEM (10) [REPEATABLE (n)]` works out of the +//! box on any default `SessionContext`. Other `TABLESAMPLE` flavours +//! (`BERNOULLI`, `ROW`, `BUCKET ... OUT OF ...`, `OFFSET`) are rejected +//! at planning time — implementing those is left to a downstream +//! 
`RelationPlanner` (see `datafusion-examples/examples/relation_planner/`). +//! +//! `SessionStateBuilder::register_relation_planner` inserts new planners +//! at the front of the chain, so a downstream planner that returns +//! `Planned` for the same `TABLESAMPLE` syntax wins. Returning +//! `Original` falls through to this default. +//! +//! [`SessionStateDefaults::default_relation_planners`]: ../../datafusion/execution/session_state/struct.SessionStateDefaults.html + +use std::sync::Arc; + +use datafusion_common::{Result, not_impl_err, plan_datafusion_err, plan_err}; +use datafusion_expr::logical_plan::sample::{SampleMethod, sample_plan}; +use datafusion_expr::planner::{ + PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning, +}; +use sqlparser::ast::{ + self, TableFactor, TableSampleKind, TableSampleMethod, TableSampleUnit, +}; + +/// Built-in `RelationPlanner` that lifts `TABLESAMPLE SYSTEM(p%)` +/// (with optional `REPEATABLE(seed)`) into the core +/// [`Sample`](datafusion_expr::logical_plan::sample::Sample) extension +/// node so the `SamplePushdown` optimizer rule can absorb the sample +/// into the scan. +/// +/// Rejects every other form of `TABLESAMPLE` with a `not_impl_err`. To +/// support `BERNOULLI`, row counts, or `BUCKET`, register your own +/// `RelationPlanner` ahead of this one — `register_relation_planner` +/// pushes to the front and the first `Planned` wins. +#[derive(Debug, Default)] +pub struct TableSampleSystemPlanner; + +impl RelationPlanner for TableSampleSystemPlanner { + fn plan_relation( + &self, + relation: TableFactor, + context: &mut dyn RelationPlannerContext, + ) -> Result { + // Only act on Table relations carrying a `TABLESAMPLE` clause. + // Everything else (derived, function, unnest, join) falls + // through to the next planner / DataFusion's default logic. 
+ let TableFactor::Table { + sample: Some(sample), + alias, + name, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + index_hints, + } = relation + else { + return Ok(RelationPlanning::Original(Box::new(relation))); + }; + + let ts = match sample { + TableSampleKind::BeforeTableAlias(s) + | TableSampleKind::AfterTableAlias(s) => *s, + }; + + if ts.bucket.is_some() { + return not_impl_err!( + "TABLESAMPLE BUCKET is not supported (only SYSTEM PERCENT). \ + Register a custom RelationPlanner before the built-in \ + TableSampleSystemPlanner to handle other forms." + ); + } + if ts.offset.is_some() { + return not_impl_err!( + "TABLESAMPLE OFFSET is not supported (only SYSTEM PERCENT)" + ); + } + match ts.name { + // The built-in planner only handles SYSTEM (and BLOCK as an + // alias for SYSTEM, matching Hive). An unspecified method + // is rejected rather than silently picking SYSTEM, since + // the right default differs by engine (PostgreSQL requires + // an explicit method; Spark defaults to block-level). + // Anything else is a semantics commitment we don't want to + // make in core. + Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) => {} + None => { + return not_impl_err!( + "TABLESAMPLE without an explicit method is not supported; \ + write TABLESAMPLE SYSTEM (...) (or register a custom \ + RelationPlanner before the built-in TableSampleSystemPlanner \ + to define a default)." + ); + } + Some(other) => { + return not_impl_err!( + "TABLESAMPLE method {other} is not supported (only SYSTEM). \ + Register a custom RelationPlanner before the built-in \ + TableSampleSystemPlanner to handle other methods." 
+ ); + } + } + + let quantity = ts.quantity.ok_or_else(|| { + plan_datafusion_err!("TABLESAMPLE without a quantity is not supported") + })?; + let raw = match &quantity.value { + ast::Expr::Value(vs) => match &vs.value { + ast::Value::Number(n, _) => n.parse::().map_err(|_| { + plan_datafusion_err!("invalid TABLESAMPLE quantity: {n}") + })?, + v => return plan_err!("TABLESAMPLE quantity must be numeric; got {v:?}"), + }, + other => { + return plan_err!("TABLESAMPLE quantity must be a literal; got {other}"); + } + }; + let fraction = match quantity.unit { + Some(TableSampleUnit::Percent) | None => raw / 100.0, + Some(TableSampleUnit::Rows) => { + return not_impl_err!( + "TABLESAMPLE with ROWS count is not supported (only SYSTEM PERCENT)" + ); + } + }; + + let seed = ts + .seed + .map(|s| match s.value { + ast::Value::Number(n, _) => n + .parse::() + .map_err(|_| plan_datafusion_err!("invalid REPEATABLE seed: {n}")), + v => Err(plan_datafusion_err!( + "REPEATABLE seed must be an integer; got {v:?}" + )), + }) + .transpose()?; + + // Replan the bare table without the sample clause, then wrap + // the resulting plan in a `Sample` extension node. + let bare = TableFactor::Table { + sample: None, + alias: alias.clone(), + name, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + index_hints, + }; + let input = context.plan(bare)?; + let plan = sample_plan(Arc::new(input), SampleMethod::System, fraction, seed)?; + Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new( + plan, alias, + )))) + } +} diff --git a/datafusion/sqllogictest/test_files/tablesample.slt b/datafusion/sqllogictest/test_files/tablesample.slt new file mode 100644 index 0000000000000..bf3ded960f71c --- /dev/null +++ b/datafusion/sqllogictest/test_files/tablesample.slt @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# End-to-end coverage for the built-in TABLESAMPLE SYSTEM planner + +# adaptive cube-root pushdown into ParquetSource. The planner +# (`datafusion_sql::sample::TableSampleSystemPlanner`) is auto- +# registered by `SessionStateBuilder::with_default_features`, so this +# fixture exercises the path a user gets out of the box from +# `datafusion-cli`. + +# Single-file / single-row-group parquet. The adaptive logic detects +# both axes can't reduce and pushes the full requested fraction down +# to the row level, so we get ~p*N rows out instead of ~cbrt(p)*N. +statement ok +COPY (SELECT i AS id FROM generate_series(1, 1024) t(i)) +TO 'test_files/scratch/tablesample/sample_table.parquet' +STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE sample_table +STORED AS PARQUET +LOCATION 'test_files/scratch/tablesample/sample_table.parquet'; + +# 100% returns every row. +query I +SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (100); +---- +1024 + +# REPEATABLE seed produces the same count on re-run. With one file and +# one row group the adaptive split skips both file and row-group axes +# and applies the full 0.5 at the row level, so the count is ~512 (the +# exact number depends on cluster-window packing). 
+query I +SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (50) REPEATABLE (42); +---- +512 + +query I +SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (50) REPEATABLE (42); +---- +512 + +# Different seed → still bounded. +query B +SELECT count(*) > 0 AND count(*) <= 1024 FROM sample_table TABLESAMPLE SYSTEM (10) REPEATABLE (7); +---- +true + +# EXPLAIN — the SamplePushdown rule should have absorbed the sample +# into ParquetSource; no SampleExec must remain in the physical plan. +# `sample_system_target_remaining` is the residual fraction the opener +# applies adaptively based on the actual row-group count it observes. +query TT +EXPLAIN SELECT id FROM sample_table TABLESAMPLE SYSTEM (50) REPEATABLE (42); +---- +logical_plan +01)Sample: SYSTEM(50.0000%) REPEATABLE(42) +02)--TableScan: sample_table projection=[id] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/tablesample/sample_table.parquet]]}, projection=[id], file_type=parquet, sample_system_target_remaining=0.5000 + +# --- Rejected forms (the built-in planner only supports SYSTEM PERCENT). --- + +statement error TABLESAMPLE method BERNOULLI is not supported +SELECT count(*) FROM sample_table TABLESAMPLE BERNOULLI (10); + +statement error TABLESAMPLE without an explicit method is not supported +SELECT count(*) FROM sample_table TABLESAMPLE (10); + +statement error TABLESAMPLE with ROWS count is not supported +SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (100 ROWS); + +statement error TABLESAMPLE fraction must be in +SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (150); + +statement error TABLESAMPLE fraction must be in +SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (0); + +# Sources that don't implement `try_push_sample` (e.g. 
CSV) cause the +# `SamplePushdown` rule to error at planning time, since there is no +# generic post-scan SampleExec yet — the rule is the only thing that +# currently turns a `Sample` into a runnable plan. +statement ok +COPY (SELECT i AS id FROM generate_series(1, 16) t(i)) +TO 'test_files/scratch/tablesample/sample_table.csv' +STORED AS CSV; + +statement ok +CREATE EXTERNAL TABLE sample_csv +STORED AS CSV +LOCATION 'test_files/scratch/tablesample/sample_table.csv' +OPTIONS ('format.has_header' 'true'); + +statement error TABLESAMPLE could not be pushed down +SELECT count(*) FROM sample_csv TABLESAMPLE SYSTEM (50); + +statement ok +DROP TABLE sample_csv; + +statement ok +DROP TABLE sample_table; diff --git a/docs/source/library-user-guide/extending-sql.md b/docs/source/library-user-guide/extending-sql.md index eea5b3b1acfc9..d896dc1e47b0a 100644 --- a/docs/source/library-user-guide/extending-sql.md +++ b/docs/source/library-user-guide/extending-sql.md @@ -341,13 +341,28 @@ approach: ### TABLESAMPLE (Custom Logical and Physical Nodes) -The [table_sample.rs] example shows a complete end-to-end implementation of how to -support queries such as: +DataFusion ships with a built-in `RelationPlanner` for +`TABLESAMPLE SYSTEM(p%)` (block-level sampling), auto-registered on +any default `SessionContext` and pushed into `ParquetSource` by the +`SamplePushdown` optimizer rule. See the [`TABLESAMPLE` section] of +the SQL reference for a full description of the semantics and the +parquet implementation. + +The [table_sample.rs] example shows how to register a _custom_ +`RelationPlanner` ahead of the built-in one to add other forms — row- +level `BERNOULLI`, `ROW` count limits, Hive-style `BUCKET`, etc. — that +DataFusion intentionally doesn't ship in core because the semantics +vary across systems. Because `register_relation_planner` prepends +to the chain, the custom planner runs first; returning +`RelationPlanning::Original` falls through to the built-in +`SYSTEM` planner. 
```sql SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT) REPEATABLE(42) ``` +[`tablesample` section]: ../user-guide/sql/select.md#tablesample-clause + ### PIVOT/UNPIVOT (Rewrite Strategy) The [pivot_unpivot.rs] example demonstrates rewriting custom syntax to standard SQL diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md index 3564884b041ad..afd1875a97c40 100644 --- a/docs/source/user-guide/sql/select.md +++ b/docs/source/user-guide/sql/select.md @@ -30,7 +30,7 @@ DataFusion supports the following syntax for queries: [ [WITH](#with-clause) with_query [, ...] ]
[SELECT](#select-clause) [ ALL | DISTINCT ] select_expr [, ...]
-[ [FROM](#from-clause) from_item [, ...] ]
+[ [FROM](#from-clause) from_item [ [TABLESAMPLE](#tablesample-clause) ... ] [, ...] ]
[ [JOIN](#join-clause) join_item [, ...] ]
[ [WHERE](#where-clause) condition ]
[ [GROUP BY](#group-by-clause) grouping_element [, ...] ]
@@ -76,6 +76,126 @@ Example: SELECT t.a FROM table AS t ``` +## TABLESAMPLE clause + +`TABLESAMPLE` returns a random subset of rows from a table. It's +useful for ad-hoc data exploration ("give me roughly 1% of this +table"), bounded `EXPLAIN ANALYZE` runs against representative data, +and any analytics workload where an approximate answer is acceptable +in exchange for reading less data. + +```sql +SELECT * FROM table TABLESAMPLE SYSTEM (10); -- ~10% of the table +SELECT * FROM table TABLESAMPLE SYSTEM (5) REPEATABLE (42); -- deterministic +``` + +The percentage is in the range `(0, 100]`. `REPEATABLE(seed)` makes +the sample deterministic — the same seed against the same data always +returns the same rows. + +### What `SYSTEM` means + +`SYSTEM` is **block-level** sampling: instead of evaluating a +per-row coin flip, the scan keeps or drops whole blocks of rows +chosen at random. This is the same behaviour PostgreSQL documents +for `TABLESAMPLE SYSTEM` and what Hive calls `BLOCK` sampling +(DataFusion accepts `BLOCK` as an alias for `SYSTEM`). + +The trade-off vs. row-level sampling (`BERNOULLI`): + +- **`SYSTEM`** is fast — the scan can skip blocks entirely, so it + reads less I/O proportional to the requested fraction. Rows + inside each kept block are correlated, so it's statistically + lossier than per-row sampling. +- **`BERNOULLI`** evaluates `random() < p` per row, so every row is + read but only some are kept. Statistically tighter, but no I/O + saving. + +DataFusion only ships `SYSTEM` out of the box. To add `BERNOULLI` or +other forms, register a [`RelationPlanner`] extension; see the +[extending SQL] guide and the [`relation_planner` example]. 
+ +[`relationplanner`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/planner/trait.RelationPlanner.html +[extending sql]: ../../library-user-guide/extending-sql.md +[`relation_planner` example]: https://github.com/apache/datafusion/tree/main/datafusion-examples/examples/relation_planner + +### How `SYSTEM` is implemented for Parquet + +For a `ParquetSource`, `TABLESAMPLE SYSTEM(p%)` is pushed all the way +into the scan rather than evaluated as a post-scan filter. The plan +contains no `SampleExec` — instead, `ParquetSource` itself drops +files, row groups, and row-clusters in proportion to `p`. + +The selection uses an **adaptive cube-root hybrid** that splits the +budget across three independent levels — file, row-group, and row +— and collapses the split when an axis can't reduce: + +1. **File level** (in the `SamplePushdown` rule, where the file count + is known): with `n_files ≥ 2`, keep `⌈n_files * cbrt(p)⌉` files + chosen by a seeded shuffle. With `n_files = 1`, the file axis + can't reduce, so the full budget flows to the opener. +2. **Row-group level** (in the parquet opener, after the footer is + loaded so the row-group count is known): with multiple row groups + in a file, the residual fraction is split as `sqrt(remaining)` at + the row-group and row axes. With a single row group, the row-group + axis is skipped and the full residual is applied at the row level. +3. **Row level** — within each kept row group, the kept fraction is + materialised as a small number of contiguous `RowSelection` + windows so the parquet reader can use the page index to skip data + pages entirely. + +The product across all axes is always `p`, so the expected result +size is `p × N` rows regardless of how many files or row groups +the scan happens to have. 
Spreading the reduction across all three +axes means the I/O win at small fractions does not concentrate at +a single granularity: dropping 90% of files (1/0.1 ≈ 10× fewer files) +produces a coarser sample than dropping 90% across all axes evenly. +Small inputs degenerate gracefully — a single-file scan still hits +the requested `p`; a single-file / single-row-group scan reduces to +pure row-level sampling. + +`REPEATABLE(seed)` mixes the seed into every random draw, so all +levels produce the same selection across runs. The selection also +depends on the execution `partition_index` of each file (a stable +per-file id assigned by the scan, independent of the on-disk path), +the row-group index within the file, and the cluster size, so +different files don't accidentally see correlated samples and the +sample is reproducible across environments. + +The sampling is visible in `EXPLAIN`: + +```text +DataSourceExec: file_groups={...}, projection=[...], + file_type=parquet, + sample_system_target_remaining=0.5000 +``` + +`sample_system_target_remaining` is the residual fraction handed off +to the opener after the file axis has been applied. The opener then +splits adaptively across the row-group / row axes based on what it +sees in each file's footer — that decision isn't visible in the plan +text since it varies per file at scan time. + +There is no `SampleExec` in the physical plan — the `SamplePushdown` +optimizer rule absorbed the sample into the source. If pushdown is +not possible (for example, against a non-Parquet source that does +not implement `try_push_sample`), the rule errors at planning time +with `TABLESAMPLE is not supported for this source`. + +### Limitations + +The built-in planner accepts only `TABLESAMPLE SYSTEM(p%)` with an +optional `REPEATABLE(seed)`. The following forms error at planning +time: + +- `TABLESAMPLE BERNOULLI(...)` — register a custom `RelationPlanner`. +- `TABLESAMPLE (N ROWS)` — use `LIMIT N` instead, or a custom planner. 
+- `TABLESAMPLE BUCKET m OUT OF n` — Hive bucket sampling is not + supported. +- `TABLESAMPLE ... OFFSET ...` — ClickHouse-style offset sampling is + not supported. +- Fractions outside `(0, 100]`. + ## WHERE clause Example: From d1def87f6933c1fc8c10fb69b736dda0c8ba7cca Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 09:36:01 -0500 Subject: [PATCH 4/4] example: rework TABLESAMPLE example to compose with built-in SYSTEM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The relation_planner/table_sample.rs example previously errored on TABLESAMPLE methods other than BERNOULLI / ROW. After commit 3 of this stack registers a built-in TableSampleSystemPlanner by default, the example's planner runs *first* in the chain, so its error short-circuits SYSTEM and prevents the built-in from handling it. Reshape the example to use the chain composition pattern instead: - The planner now returns `RelationPlanning::Original` for methods it doesn't implement (SYSTEM, BLOCK, anything else), so the built-in picks up the slack. **No SYSTEM reimplementation in the example.** - The example's `TableSampleQueryPlanner` registers both `TableSampleExtensionPlanner` (this example's `TableSamplePlanNode` → its own `SampleExec`) **and** `SamplePhysicalPlanner` (core `Sample` → core `SampleExec`), since `with_extension_planners(...)` replaces the defaults. - Adds a parquet-backed `sample_data_parquet` table so SYSTEM has a source it can push into (the existing in-memory `sample_data` doesn't implement `try_push_sample`). - New Example 7 demonstrates `TABLESAMPLE SYSTEM (50) REPEATABLE (42)` against the parquet table; the row count is asserted (the exact rows shift with the tempdir path that gets hashed into the per-file seed, but the count from the cube-root math is stable). - Module docstring rewritten to lead with the composition story. 
The 3 existing relation_planner example tests (match_recognize, pivot_unpivot, table_sample) still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../examples/relation_planner/table_sample.rs | 181 +++++++++++++++--- 1 file changed, 153 insertions(+), 28 deletions(-) diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 42342e5f1a641..6ccfdfd385c84 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -15,35 +15,48 @@ // specific language governing permissions and limitations // under the License. -//! # TABLESAMPLE Example +//! # TABLESAMPLE Example — adding row-level sampling on top of the built-in //! -//! This example demonstrates implementing SQL `TABLESAMPLE` support using -//! DataFusion's extensibility APIs. +//! `TABLESAMPLE SYSTEM(p%)` is supported out of the box: it's lifted to the +//! core `Sample` extension node by the auto-registered +//! `TableSampleSystemPlanner` and pushed into `ParquetSource` by the +//! `SamplePushdown` rule. Adding *other* methods — `BERNOULLI`, `ROW` +//! counts, Hive `BUCKET` — is the job of a `RelationPlanner` extension, +//! which is what this example demonstrates. //! -//! This is a working `TABLESAMPLE` implementation that can serve as a starting -//! point for your own projects. It also works as a template for adding other -//! custom SQL operators, covering the full pipeline from parsing to execution. +//! The key composition pattern: when our planner sees `TABLESAMPLE`, it +//! handles only the methods it implements (`BERNOULLI`, `ROW`, `BUCKET`) +//! and returns [`RelationPlanning::Original`] for `SYSTEM`. Because +//! `register_relation_planner` prepends to the chain, our planner runs +//! first and the built-in handles whatever we don't. **No SYSTEM +//! reimplementation required.** //! //! It shows how to: //! //! 1. 
**Parse** TABLESAMPLE syntax via a custom [`RelationPlanner`] //! 2. **Plan** sampling as a custom logical node ([`TableSamplePlanNode`]) //! 3. **Execute** sampling via a custom physical operator ([`SampleExec`]) +//! 4. **Compose** with the built-in SYSTEM planner by returning +//! `RelationPlanning::Original` for methods we don't implement //! //! ## Supported Syntax //! //! ```sql -//! -- Bernoulli sampling (each row has N% chance of selection) +//! -- Bernoulli sampling (each row has N% chance of selection) — this example //! SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT) //! -//! -- Fractional sampling (0.0 to 1.0) +//! -- Fractional sampling (0.0 to 1.0) — this example //! SELECT * FROM table TABLESAMPLE (0.1) //! -//! -- Row count limit +//! -- Row count limit — this example //! SELECT * FROM table TABLESAMPLE (100 ROWS) //! //! -- Reproducible sampling with a seed //! SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42) +//! +//! -- SYSTEM (block-level) sampling — handled by the built-in planner +//! -- WITHOUT any code in this example +//! SELECT * FROM table TABLESAMPLE SYSTEM (10) REPEATABLE(42) //! ``` //! //! 
## Architecture @@ -81,6 +94,7 @@ use std::{ fmt::{self, Debug, Formatter}, + fs::File, hash::{Hash, Hasher}, pin::Pin, sync::Arc, @@ -98,9 +112,12 @@ use futures::{ stream::{Stream, StreamExt}, }; use rand::{Rng, SeedableRng, rngs::StdRng}; +use tempfile::TempDir; use tonic::async_trait; use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::file::properties::WriterProperties; use datafusion::{ execution::{ RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder, @@ -111,7 +128,9 @@ use datafusion::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput}, }, - physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, + physical_planner::{ + DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, SamplePhysicalPlanner, + }, prelude::*, }; use datafusion_common::{ @@ -144,10 +163,18 @@ pub async fn table_sample() -> Result<()> { let ctx = SessionContext::new_with_state(state); - // Register custom relation planner for logical planning + // Register custom relation planner for logical planning. It's prepended + // to the chain (so it runs *before* the auto-registered built-in + // `TableSampleSystemPlanner`); when our planner returns + // `RelationPlanning::Original`, the built-in handles it. ctx.register_relation_planner(Arc::new(TableSamplePlanner))?; register_sample_data(&ctx)?; + // Register a parquet-backed copy of the same data so we can demonstrate + // SYSTEM end-to-end. SYSTEM only ships pushdown for parquet sources; + // the in-memory `sample_data` table can't absorb a `Sample` node. 
+ let _parquet_dir = register_sample_data_parquet(&ctx).await?; + println!("TABLESAMPLE Example"); println!("===================\n"); @@ -273,6 +300,34 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> { +---------+---------+---------+---------+ "); + // Example 7: SYSTEM sampling — handled by the built-in + // `TableSampleSystemPlanner`, **not** by this example's planner. + // Our planner returns `Original` for SYSTEM, so the chain falls + // through. Routed against the parquet-backed copy of the table so + // the `SamplePushdown` rule can absorb the sample into the scan. + // `REPEATABLE(42)` makes the rows deterministic across runs and + // across machines: the parquet sampler keys on the seed plus the + // execution `partition_index` (a stable per-file id), never on the + // on-disk path, so the same query against the same data picks the + // same rows everywhere. + let results = run_example( + ctx, + "Example 7: SYSTEM (handled by the built-in, not this example)", + "SELECT * FROM sample_data_parquet TABLESAMPLE SYSTEM (50) REPEATABLE (42)", + ) + .await?; + assert_snapshot!(results, @r" + +---------+---------+ + | column1 | column2 | + +---------+---------+ + | 6 | row_6 | + | 7 | row_7 | + | 8 | row_8 | + | 9 | row_9 | + | 10 | row_10 | + +---------+---------+ + "); + Ok(()) } @@ -301,6 +356,43 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> { Ok(()) } +/// Register the same data as `sample_data_parquet`, backed by a tempfile +/// parquet so `TABLESAMPLE SYSTEM` (handled by the built-in planner) has +/// a `ParquetSource` to push into. Returns the `TempDir` so the caller +/// can keep it alive for the duration of the queries. 
+async fn register_sample_data_parquet(ctx: &SessionContext) -> Result<TempDir> { + let dir = TempDir::new().map_err(|e| plan_datafusion_err!("tempdir: {e}"))?; + let path = dir.path().join("sample_data.parquet"); + + let column1: ArrayRef = Arc::new(Int32Array::from((1..=10).collect::<Vec<_>>())); + let column2: ArrayRef = Arc::new(StringArray::from( + (1..=10).map(|i| format!("row_{i}")).collect::<Vec<_>>(), + )); + let batch = + RecordBatch::try_from_iter(vec![("column1", column1), ("column2", column2)])?; + let file = File::create(&path).map_err(|e| plan_datafusion_err!("create: {e}"))?; + let mut writer = ArrowWriter::try_new( + file, + batch.schema(), + Some(WriterProperties::builder().build()), + ) + .map_err(|e| plan_datafusion_err!("ArrowWriter: {e}"))?; + writer + .write(&batch) + .map_err(|e| plan_datafusion_err!("write: {e}"))?; + writer + .close() + .map_err(|e| plan_datafusion_err!("close: {e}"))?; + + ctx.register_parquet( + "sample_data_parquet", + path.to_str().unwrap(), + Default::default(), + ) + .await?; + Ok(dir) +} + +// ============================================================================ // Logical Planning: TableSamplePlanner + TableSamplePlanNode // ============================================================================ @@ -318,7 +410,7 @@ impl RelationPlanner for TableSamplePlanner { ) -> Result<RelationPlanning> { // Only handle Table relations with TABLESAMPLE clause let TableFactor::Table { - sample: Some(sample), + sample: Some(kind), alias, name, args, @@ -333,23 +425,48 @@ return Ok(RelationPlanning::Original(Box::new(relation))); }; - // Extract sample spec (handles both before/after alias positions) - let sample = match sample { + // Inspect the sample without consuming `kind` yet — we may need to + // hand the relation back unchanged for methods this example doesn't + // implement.
+ let inspect = match &kind { ast::TableSampleKind::BeforeTableAlias(s) - | ast::TableSampleKind::AfterTableAlias(s) => s, + | ast::TableSampleKind::AfterTableAlias(s) => s.as_ref(), }; - // Validate sampling method - if let Some(method) = &sample.name - && *method != TableSampleMethod::Bernoulli - && *method != TableSampleMethod::Row - { - return not_impl_err!( - "Sampling method {} is not supported (only BERNOULLI and ROW)", - method + // This example handles BERNOULLI / ROW (row-level coin flip) and + // BUCKET (Hive-style modulo). Anything else — most importantly + // SYSTEM / BLOCK — falls through to the next planner so the + // built-in `TableSampleSystemPlanner` (auto-registered via + // `SessionStateBuilder::with_default_features`) can handle it + // with parquet pushdown. **No SYSTEM reimplementation here.** + let we_handle = inspect.bucket.is_some() + || matches!( + inspect.name, + Some(TableSampleMethod::Bernoulli) | Some(TableSampleMethod::Row) | None ); + if !we_handle { + // Reconstruct the original relation and pass it on. + let original = TableFactor::Table { + sample: Some(kind), + alias, + name, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + index_hints, + }; + return Ok(RelationPlanning::Original(Box::new(original))); } + // Extract sample spec (handles both before/after alias positions) + let sample = match kind { + ast::TableSampleKind::BeforeTableAlias(s) + | ast::TableSampleKind::AfterTableAlias(s) => s, + }; + // Offset sampling (ClickHouse-style) not supported if sample.offset.is_some() { return not_impl_err!( @@ -553,8 +670,15 @@ impl Hash for HashableF64 { // Physical Planning: TableSampleQueryPlanner + TableSampleExtensionPlanner // ============================================================================ -/// Custom query planner that registers [`TableSampleExtensionPlanner`] to -/// convert [`TableSamplePlanNode`] into [`SampleExec`]. 
+/// Custom query planner that registers [`TableSampleExtensionPlanner`] +/// (lowering this example's [`TableSamplePlanNode`] to its own [`SampleExec`]) +/// alongside the built-in [`SamplePhysicalPlanner`] (lowering the core +/// `Sample` extension node to its `SampleExec`). Both extension planners +/// coexist: each only handles its own logical node type and returns +/// `Ok(None)` otherwise. Without `SamplePhysicalPlanner` here, +/// `TABLESAMPLE SYSTEM` queries that fall through to the built-in +/// `TableSampleSystemPlanner` would fail to plan because +/// `with_extension_planners(...)` *replaces* the defaults. #[derive(Debug)] struct TableSampleQueryPlanner; @@ -565,9 +689,10 @@ impl QueryPlanner for TableSampleQueryPlanner { fn create_physical_plan( &self, logical_plan: &LogicalPlan, session_state: &SessionState, ) -> Result<Arc<dyn ExecutionPlan>> { - let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( - TableSampleExtensionPlanner, - )]); + let planner = DefaultPhysicalPlanner::with_extension_planners(vec![ + Arc::new(TableSampleExtensionPlanner), + Arc::new(SamplePhysicalPlanner), + ]); planner .create_physical_plan(logical_plan, session_state) .await