diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 04b74528d5ac1..19262a8dab0c8 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -210,6 +210,18 @@ pub struct FileScanConfig { /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. pub partitioned_by_file_group: bool, + /// Desired-but-not-required ordering for the shared work queue that + /// feeds [`SharedWorkSource`]. + /// + /// Populated by Inexact sort pushdown so that workers pull files in + /// globally best-first order (e.g., highest-max for DESC TopK), letting + /// dynamic filters tighten fastest across the whole scan. + /// + /// Distinct from [`Self::output_ordering`]: `output_ordering` is a + /// contract the scan guarantees, while this is only a scheduling hint. + /// Has no effect when `SharedWorkSource` is not used (i.e. when + /// `preserve_order` or `partitioned_by_file_group` is true). + pub(crate) work_order_hint: Option, } /// A builder for [`FileScanConfig`]'s. @@ -280,6 +292,7 @@ pub struct FileScanConfigBuilder { batch_size: Option, expr_adapter_factory: Option>, partitioned_by_file_group: bool, + work_order_hint: Option, } impl FileScanConfigBuilder { @@ -306,6 +319,7 @@ impl FileScanConfigBuilder { batch_size: None, expr_adapter_factory: None, partitioned_by_file_group: false, + work_order_hint: None, } } @@ -527,6 +541,7 @@ impl FileScanConfigBuilder { batch_size, expr_adapter_factory: expr_adapter, partitioned_by_file_group, + work_order_hint, } = self; let constraints = constraints.unwrap_or_default(); @@ -552,6 +567,7 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + work_order_hint, } } } @@ -571,6 +587,7 @@ impl From for FileScanConfigBuilder { batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, partitioned_by_file_group: config.partitioned_by_file_group, + work_order_hint: config.work_order_hint, } } } @@ -2667,6 +2684,97 @@ mod tests { Ok(()) } + #[test] + fn sort_pushdown_inexact_populates_work_order_hint() -> Result<()> { + // Inexact pushdown (via Unsupported → try_sort_file_groups_by_statistics) + // must leave a sort hint on the config so SharedWorkSource can + // seed its queue in globally best-first order. + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + // Two groups whose min/max values interleave — per-group sort from + // #21182 is not sufficient to give workers globally best-first files. + // Intra-group ordering must be non-natural so that + // `try_sort_file_groups_by_statistics` reorders → Inexact. + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("g0_hi", 50.0, 60.0), + make_file_with_stats("g0_mid", 30.0, 40.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("g1_top", 70.0, 80.0), + make_file_with_stats("g1_lo", 10.0, 20.0), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(std::slice::from_ref(&sort_expr))?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result, got {result:?}"); + }; + let pushed = inner + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // The hint must be populated and match the requested ordering. + let hint = pushed + .work_order_hint + .as_ref() + .expect("work_order_hint must be set on Inexact"); + assert_eq!(hint.len(), 1); + assert_eq!(hint[0], sort_expr); + + // Partition count must be unchanged — groups drive output_partitioning. + assert_eq!(pushed.file_groups.len(), 2); + Ok(()) + } + + #[test] + fn sort_pushdown_rebuild_reverse_leaves_hint_none() -> Result<()> { + // When the requested order is the reverse of the scan's declared + // output_ordering, rebuild_with_source reverses files per group + // instead of stats-sorting. A globally-ascending hint would defeat + // that reversal, so the hint must stay None on this path. + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source: Arc = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("f1", 0.0, 9.0), + make_file_with_stats("f2", 10.0, 19.0), + make_file_with_stats("f3", 20.0, 30.0), + ])]; + + let asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + // Use `.reverse()` so both `descending` and `nulls_first` flip, + // which is what `LexOrdering::is_reverse` requires. + let desc = asc.reverse(); + let declared: LexOrdering = [asc.clone()].into(); + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![declared]) + .build(); + + // is_exact=false → Inexact path, the one that could populate the hint. + let rebuilt = config.rebuild_with_source( + Arc::clone(&config.file_source), + false, + &[desc], + )?; + assert!(rebuilt.work_order_hint.is_none()); + Ok(()) + } + #[test] fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> { let file_schema = diff --git a/datafusion/datasource/src/file_scan_config/sort_pushdown.rs b/datafusion/datasource/src/file_scan_config/sort_pushdown.rs index af08ed71b9a6d..ecaeb998f0899 100644 --- a/datafusion/datasource/src/file_scan_config/sort_pushdown.rs +++ b/datafusion/datasource/src/file_scan_config/sort_pushdown.rs @@ -25,6 +25,7 @@ //! core configuration and data-source plumbing. use super::FileScanConfig; +use crate::PartitionedFile; use crate::file::FileSource; use crate::file_groups::FileGroup; use crate::source::DataSource; @@ -110,9 +111,11 @@ impl FileScanConfig { new_config.file_source = new_file_source; + let sort_order = LexOrdering::new(order.iter().cloned()); + // Sort files within groups by statistics when not reversing let all_non_overlapping = if !reverse_file_groups { - if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { + if let Some(sort_order) = sort_order.as_ref() { let projected_schema = new_config.projected_schema()?; let projection_indices = new_config .file_source @@ -121,7 +124,7 @@ impl FileScanConfig { .and_then(|p| ordered_column_indices_from_projection(p)); let result = sort_files_within_groups_by_statistics( &new_config.file_groups, - &sort_order, + sort_order, &projected_schema, projection_indices.as_deref(), ); @@ -164,6 +167,19 @@ impl FileScanConfig { // sits idle the entire time, losing the parallelism benefit. } else { new_config.output_ordering = vec![]; + // Inexact: SortExec remains above and a SharedWorkSource may be + // active for this scan. Stash the requested sort order so the + // shared queue can be seeded in globally best-first order — + // dynamic filters (TopK, etc.) then tighten fastest across the + // whole scan. Skip on the reverse path: the within-group reversal + // must not be overridden by a global ascending sort (a globally + // descending hint is a possible follow-up). + // + // TODO: populate a reversed hint in the `reverse_file_groups` + // case for symmetry (see lines ~99-109 above). + if !reverse_file_groups { + new_config.work_order_hint = sort_order; + } } Ok(new_config) @@ -249,6 +265,11 @@ impl FileScanConfig { } new_config.output_ordering = vec![]; + // Inexact: see `rebuild_with_source` for rationale. This branch is + // always reached via the non-reversing path (`try_sort_file_groups_by_statistics` + // only sorts in the natural direction), so no reverse-scan gating + // is needed here. + new_config.work_order_hint = Some(sort_order); Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_config), }) @@ -343,6 +364,60 @@ pub(crate) fn sort_files_within_groups_by_statistics( } } +/// Flatten all files across groups and sort them globally by min value of the +/// sort key. Used to seed the [`SharedWorkSource`](crate::file_stream::work_source::SharedWorkSource) +/// queue in the Inexact sort-pushdown case so workers pull files in globally +/// best-first order — e.g., lowest-min first for ASC, which after the +/// reverse-scan flip is highest-max first for DESC TopK. Tightens the TopK +/// dynamic filter threshold faster across the whole scan. +/// +/// Returns `None` when stats are unusable (e.g., any file missing statistics, +/// or there are no files). Callers should fall back to their existing flat +/// order in that case. +pub(crate) fn sort_files_globally_by_statistics( + file_groups: &[FileGroup], + sort_order: &LexOrdering, + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, +) -> Option> { + let files: Vec<&PartitionedFile> = + file_groups.iter().flat_map(FileGroup::iter).collect(); + if files.len() <= 1 { + return None; + } + + let statistics = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + files.iter().copied(), + ) { + Ok(stats) => stats, + Err(e) => { + log::trace!( + "Cannot globally sort files by statistics: {e}. Falling back to flat order." + ); + return None; + } + }; + + let sorted_indices = statistics.min_values_sorted(); + let already_sorted = sorted_indices + .iter() + .enumerate() + .all(|(pos, (idx, _))| pos == *idx); + if already_sorted { + return None; + } + + Some( + sorted_indices + .iter() + .map(|(idx, _)| files[*idx].clone()) + .collect(), + ) +} + /// Check if any file in any group has nulls in the sort columns. pub(crate) fn any_file_has_nulls_in_sort_columns( file_groups: &[FileGroup], diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 7f31dacca9592..7cc180413e0d5 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -85,7 +85,32 @@ impl SharedWorkSource { } /// Create a shared work source for the unopened files in `config`. + /// + /// When `config.work_order_hint` is set (populated by Inexact sort + /// pushdown), files are seeded in globally stats-sorted order so + /// workers see best-first files first — letting dynamic filters + /// (TopK, etc.) tighten across the whole scan. Falls back to the + /// flat group-order iteration when no hint is present or when stats + /// are unusable. pub(crate) fn from_config(config: &FileScanConfig) -> Self { + if let Some(hint) = config.work_order_hint.as_ref() + && let Ok(projected_schema) = config.projected_schema() + { + let projection_indices = + config.file_source.projection().as_ref().and_then(|p| { + crate::file_scan_config::sort_pushdown::ordered_column_indices_from_projection(p) + }); + if let Some(sorted) = + crate::file_scan_config::sort_pushdown::sort_files_globally_by_statistics( + &config.file_groups, + hint, + &projected_schema, + projection_indices.as_deref(), + ) + { + return Self::new(sorted); + } + } Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned()) } @@ -96,3 +121,138 @@ impl SharedWorkSource { self.inner.files.lock().pop_front() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::PartitionedFile; + use crate::file_groups::FileGroup; + use crate::file_scan_config::FileScanConfigBuilder; + use crate::test_util::MockSource; + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::stats::Precision; + use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; + use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use std::sync::Arc; + + use crate::table_schema::TableSchema; + + fn file_with_min_max(name: &str, min: f64, max: f64) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new( + Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Float64(Some(min))), + max_value: Precision::Exact(ScalarValue::Float64(Some(max))), + ..Default::default() + }], + }, + )) + } + + fn file_no_stats(name: &str) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024) + } + + fn config_with_groups( + groups: Vec, + hint: Option, + ) -> FileScanConfig { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(file_schema, vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + let mut cfg = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(groups) + .build(); + cfg.work_order_hint = hint; + cfg + } + + fn drain_names(source: SharedWorkSource) -> Vec { + let mut out = Vec::new(); + while let Some(f) = source.pop_front() { + out.push(f.object_meta.location.as_ref().to_string()); + } + out + } + + #[test] + fn from_config_without_hint_preserves_flat_group_order() { + let groups = vec![ + FileGroup::new(vec![ + file_with_min_max("g0_a", 30.0, 40.0), + file_with_min_max("g0_b", 50.0, 60.0), + ]), + FileGroup::new(vec![ + file_with_min_max("g1_a", 10.0, 20.0), + file_with_min_max("g1_b", 70.0, 80.0), + ]), + ]; + let cfg = config_with_groups(groups, None); + let source = SharedWorkSource::from_config(&cfg); + // No hint → today's flat order is preserved. + assert_eq!( + drain_names(source), + vec!["g0_a", "g0_b", "g1_a", "g1_b"] + .into_iter() + .map(String::from) + .collect::>(), + ); + } + + #[test] + fn from_config_with_hint_seeds_globally_sorted_queue() { + // Groups whose min values interleave: cross-group ordering is what + // the hint is supposed to fix. + let groups = vec![ + FileGroup::new(vec![ + file_with_min_max("g0_hi", 50.0, 60.0), + file_with_min_max("g0_mid", 30.0, 40.0), + ]), + FileGroup::new(vec![ + file_with_min_max("g1_lo", 10.0, 20.0), + file_with_min_max("g1_top", 70.0, 80.0), + ]), + ]; + let hint: LexOrdering = + [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(); + let cfg = config_with_groups(groups, Some(hint)); + let source = SharedWorkSource::from_config(&cfg); + // Sorted ascending by min: 10, 30, 50, 70. + assert_eq!( + drain_names(source), + vec!["g1_lo", "g0_mid", "g0_hi", "g1_top"] + .into_iter() + .map(String::from) + .collect::>(), + ); + } + + #[test] + fn from_config_falls_back_when_stats_missing() { + // One file without stats → global sort returns None, fallback path + // used. Queue should match flat group-order iteration. + let groups = vec![ + FileGroup::new(vec![file_with_min_max("g0_a", 30.0, 40.0)]), + FileGroup::new(vec![file_no_stats("g1_no_stats")]), + ]; + let hint: LexOrdering = + [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(); + let cfg = config_with_groups(groups, Some(hint)); + let source = SharedWorkSource::from_config(&cfg); + assert_eq!( + drain_names(source), + vec!["g0_a", "g1_no_stats"] + .into_iter() + .map(String::from) + .collect::>(), + ); + } +}