diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs index d6fd4d8d00ae..2f66aa3c3e20 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -255,8 +255,9 @@ fn test_prefix_match_through_transparent_nodes() { } #[test] -fn test_no_prefix_match_wrong_direction() { - // Test that prefix matching does NOT work if the direction is wrong +fn test_exact_prefix_match_same_direction() { + // Test that when the requested sort [a DESC] matches a prefix of the source's + // natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown). let schema = schema(); // Source has [a DESC, b ASC] ordering @@ -265,7 +266,7 @@ fn test_no_prefix_match_wrong_direction() { let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap(); let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - // Request [a DESC] - same direction as source, NOT a reverse prefix + // Request [a DESC] - same direction as source prefix, Sort should be eliminated let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap(); let plan = sort_exec(same_direction, source); @@ -278,8 +279,7 @@ fn test_no_prefix_match_wrong_direction() { - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet output: Ok: - - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet " ); } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 64a339009e9c..3a64137a2a3f 100644 --- 
a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -743,19 +743,17 @@ impl FileSource for ParquetSource { /// /// With both pieces of information, ParquetSource can decide what optimizations to apply. /// - /// # Phase 1 Behavior (Current) - /// Returns `Inexact` when reversing the row group scan order would help satisfy the - /// requested ordering. We still need a Sort operator at a higher level because: - /// - We only reverse row group read order, not rows within row groups - /// - This provides approximate ordering that benefits limit pushdown - /// - /// # Phase 2 (Future) - /// Could return `Exact` when we can guarantee perfect ordering through techniques like: - /// - File reordering based on statistics - /// - Detecting already-sorted data - /// This would allow removing the Sort operator entirely. + /// # Behavior + /// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already + /// satisfies the requested ordering. This allows the Sort operator to be eliminated + /// if the files within each group are also non-overlapping (checked by FileScanConfig). + /// - Returns `Inexact` when reversing the row group scan order would help satisfy the + /// requested ordering. We still need a Sort operator at a higher level because: + /// - We only reverse row group read order, not rows within row groups + /// - This provides approximate ordering that benefits limit pushdown /// /// # Returns + /// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed) /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order /// - `Unsupported`: Cannot optimize for this ordering fn try_pushdown_sort( @@ -767,6 +765,16 @@ impl FileSource for ParquetSource { return Ok(SortOrderPushdownResult::Unsupported); } + // Check if the natural (non-reversed) ordering already satisfies the request. 
+ // Parquet metadata guarantees within-file ordering, so if the ordering matches + // we can return Exact. FileScanConfig will verify that files within each group + // are non-overlapping before declaring the entire scan as Exact. + if eq_properties.ordering_satisfy(order.iter().cloned())? { + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(self.clone()) as Arc, + }); + } + // Build new equivalence properties with the reversed ordering. // This allows us to check if the reversed ordering satisfies the request // by leveraging: @@ -811,11 +819,6 @@ impl FileSource for ParquetSource { Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) - - // TODO Phase 2: Add support for other optimizations: - // - File reordering based on min/max statistics - // - Detection of exact ordering (return Exact to remove Sort operator) - // - Partial sort pushdown for prefix matches } fn apply_expressions( diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 78da70402f93..976e1158f5eb 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -27,6 +27,7 @@ use crate::{ use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; +use datafusion_common::stats::Precision; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err, @@ -900,28 +901,72 @@ impl DataSource for FileScanConfig { } } + /// Push sort requirements into file-based data sources. + /// + /// # Sort Pushdown Architecture + /// + /// When a partition (file group) contains multiple files in wrong order, + /// `validated_output_ordering()` strips the ordering and `EnforceSorting` + /// inserts a `SortExec`. 
This optimizer fixes the file order by sorting + /// files within each group by min/max statistics, enabling sort elimination. + /// + /// This applies to both single-partition and multi-partition plans — any + /// file group with multiple files in wrong order benefits. + /// + /// ```text + /// PushdownSort optimizer finds SortExec + /// │ + /// ▼ + /// FileScanConfig::try_pushdown_sort() + /// │ + /// ├─► FileSource returns Exact + /// │ (natural ordering satisfies request) + /// │ → rebuild_with_source: sort files by stats, verify non-overlapping + /// │ → SortExec removed, fetch (LIMIT) pushed to DataSourceExec + /// │ + /// ├─► FileSource returns Inexact + /// │ (reverse_row_groups=true) + /// │ → SortExec kept, scan optimized + /// │ + /// └─► FileSource returns Unsupported + /// (ordering stripped because files in wrong order) + /// → try_sort_file_groups_by_statistics(): + /// 1. Sort files within each group by min/max statistics + /// 2. Re-check: non-overlapping + ordering valid? + /// YES → Exact → SortExec removed + /// NO → Inexact (files reordered, Sort stays) + /// ``` fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], ) -> Result>> { - // Delegate to FileSource to see if it can optimize for the requested ordering. let pushdown_result = self .file_source .try_pushdown_sort(order, &self.eq_properties())?; match pushdown_result { SortOrderPushdownResult::Exact { inner } => { - Ok(SortOrderPushdownResult::Exact { - inner: self.rebuild_with_source(inner, true, order)?, - }) + let config = self.rebuild_with_source(inner, true, order)?; + // rebuild_with_source keeps output_ordering only when all groups + // are non-overlapping. If output_ordering was cleared, files + // overlap despite within-file ordering → downgrade to Inexact. 
+ if config.output_ordering.is_empty() { + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(config), + }) + } else { + Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(config), + }) + } } SortOrderPushdownResult::Inexact { inner } => { Ok(SortOrderPushdownResult::Inexact { - inner: self.rebuild_with_source(inner, false, order)?, + inner: Arc::new(self.rebuild_with_source(inner, false, order)?), }) } SortOrderPushdownResult::Unsupported => { - Ok(SortOrderPushdownResult::Unsupported) + self.try_sort_file_groups_by_statistics(order) } } } @@ -947,6 +992,13 @@ impl DataSource for FileScanConfig { } } +/// Result of sorting files within groups by their min/max statistics. +struct SortedFileGroups { + file_groups: Vec, + any_reordered: bool, + all_non_overlapping: bool, +} + impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. @@ -1247,26 +1299,42 @@ impl FileScanConfig { &self.file_source } - /// Helper: Rebuild FileScanConfig with new file source + /// Rebuild FileScanConfig after sort pushdown, applying file-level optimizations. + /// + /// This is the core of sort pushdown for file-based sources. It performs + /// three optimizations depending on the pushdown result: + /// + /// ```text + /// ┌─────────────────────────────────────────────────────────────┐ + /// │ rebuild_with_source │ + /// │ │ + /// │ 1. Reverse file groups (if DESC matches reversed ordering) │ + /// │ 2. Sort files within groups by min/max statistics │ + /// │ 3. If Exact + non-overlapping: │ + /// │ Keep output_ordering → SortExec eliminated │ + /// │ Otherwise: clear output_ordering → SortExec stays │ + /// └─────────────────────────────────────────────────────────────┘ + /// ``` + /// + /// # Why sort files by statistics? + /// + /// Files within a partition (file group) are read sequentially. 
By sorting + /// them so that file_i.max <= file_{i+1}.min, the combined output stream + /// is already in order — no SortExec needed for that partition. + /// + /// Even when files overlap (Inexact), statistics-based ordering helps + /// TopK/LIMIT queries: reading low-value files first lets dynamic filters + /// prune high-value files earlier. fn rebuild_with_source( &self, new_file_source: Arc, is_exact: bool, order: &[PhysicalSortExpr], - ) -> Result> { + ) -> Result { let mut new_config = self.clone(); // Reverse file order (within each group) if the caller is requesting a reversal of this // scan's declared output ordering. - // - // Historically this function always reversed `file_groups` because it was only reached - // via `FileSource::try_reverse_output` (where a reversal was the only supported - // optimization). - // - // Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other - // optimizations may become possible (e.g. already-sorted data, statistics-based file - // reordering). Therefore we only reverse files when it is known to help satisfy the - // requested ordering. 
let reverse_file_groups = if self.output_ordering.is_empty() { false } else if let Some(requested) = LexOrdering::new(order.iter().cloned()) { @@ -1293,13 +1361,273 @@ impl FileScanConfig { new_config.file_source = new_file_source; - // Phase 1: Clear output_ordering for Inexact - // (we're only reversing row groups, not guaranteeing perfect ordering) - if !is_exact { + // Sort files within groups by statistics when not reversing + let all_non_overlapping = if !reverse_file_groups { + if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = new_config.projected_schema()?; + let projection_indices = new_config + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + let result = Self::sort_files_within_groups_by_statistics( + &new_config.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + new_config.file_groups = result.file_groups; + result.all_non_overlapping + } else { + false + } + } else { + // When reversing, files are already reversed above. We skip + // statistics-based sorting here because it would undo the reversal. + // Note: reverse path is always Inexact, so all_non_overlapping + // is not used (is_exact is false). + false + }; + + if is_exact && all_non_overlapping { + // Truly exact: within-file ordering guaranteed and files are non-overlapping. + // Keep output_ordering so SortExec can be eliminated for each partition. + // + // We intentionally do NOT redistribute files across groups here. 
+ // The planning-phase bin-packing may interleave file ranges across groups: + // + // Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1 + // Group 1: [f2(11-20), f4(31-40)] + // + // This interleaving is actually beneficial because SPM pulls from both + // partitions concurrently, keeping parallel I/O active: + // + // SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → pull P1 [31-40] + // ^^^^^^^^^^^^ ^^^^^^^^^^^^ + // both partitions scanning files simultaneously + // + // If we were to redistribute files consecutively: + // Group 0: [f1(1-10), f2(11-20)] ← all values < group 1 + // Group 1: [f3(21-30), f4(31-40)] + // + // SPM would read ALL of group 0 first (values always smaller), then group 1. + // This degrades to single-threaded sequential I/O — the other partition + // sits idle the entire time, losing the parallelism benefit. + } else { new_config.output_ordering = vec![]; } - Ok(Arc::new(new_config)) + Ok(new_config) + } + + /// Sort files within each file group by their min/max statistics. + /// + /// No files are moved between groups — parallelism and group composition + /// are unchanged. Groups where statistics are unavailable are kept as-is. 
+ /// + /// ```text + /// Before: Group [file_c(20-30), file_a(0-9), file_b(10-19)] + /// After: Group [file_a(0-9), file_b(10-19), file_c(20-30)] + /// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + /// sorted by min value, non-overlapping → Exact + /// ``` + fn sort_files_within_groups_by_statistics( + file_groups: &[FileGroup], + sort_order: &LexOrdering, + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, + ) -> SortedFileGroups { + let mut any_reordered = false; + let mut confirmed_non_overlapping: usize = 0; + let mut new_groups = Vec::with_capacity(file_groups.len()); + + for group in file_groups { + if group.len() <= 1 { + new_groups.push(group.clone()); + confirmed_non_overlapping += 1; + continue; + } + + let files: Vec<_> = group.iter().collect(); + + let statistics = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + files.iter().copied(), + ) { + Ok(stats) => stats, + Err(e) => { + log::trace!( + "Cannot sort file group by statistics: {e}. Keeping original order." 
+ ); + new_groups.push(group.clone()); + continue; + } + }; + + let sorted_indices = statistics.min_values_sorted(); + + let already_sorted = sorted_indices + .iter() + .enumerate() + .all(|(pos, (idx, _))| pos == *idx); + + let sorted_group: FileGroup = if already_sorted { + group.clone() + } else { + any_reordered = true; + sorted_indices + .iter() + .map(|(idx, _)| files[*idx].clone()) + .collect() + }; + + let sorted_files: Vec<_> = sorted_group.iter().collect(); + let is_non_overlapping = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + sorted_files.iter().copied(), + ) { + Ok(stats) => stats.is_sorted(), + Err(_) => false, + }; + + if is_non_overlapping { + confirmed_non_overlapping += 1; + } + + new_groups.push(sorted_group); + } + + SortedFileGroups { + file_groups: new_groups, + any_reordered, + all_non_overlapping: confirmed_non_overlapping == file_groups.len(), + } + } + + /// Last-resort optimization when FileSource returns `Unsupported`. + /// + /// FileSource may return `Unsupported` because `eq_properties` had no + /// ordering — which happens when `validated_output_ordering()` stripped + /// the ordering because files were in the wrong order. After sorting + /// files by statistics, the ordering may become valid again. + /// + /// This method: + /// 1. Sorts files within groups by min/max statistics + /// 2. Re-checks if the sorted file order makes `output_ordering` valid + /// 3. If valid AND non-overlapping → `Exact` (SortExec eliminated!) + /// 4. If files were reordered but ordering not valid → `Inexact` + /// 5. If no files were reordered → `Unsupported` + /// + /// This handles the key case where files have correct within-file ordering + /// (e.g., Parquet sorting_columns metadata) but were listed in wrong order + /// (e.g., alphabetical order doesn't match sort key order). 
+ fn try_sort_file_groups_by_statistics( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else { + return Ok(SortOrderPushdownResult::Unsupported); + }; + + let projected_schema = self.projected_schema()?; + let projection_indices = self + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + + let result = Self::sort_files_within_groups_by_statistics( + &self.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + + if !result.any_reordered { + return Ok(SortOrderPushdownResult::Unsupported); + } + + let mut new_config = self.clone(); + new_config.file_groups = result.file_groups; + + // Re-check: now that files are sorted, does output_ordering become valid? + // This handles the case where validated_output_ordering() previously + // stripped the ordering because files were in the wrong order. + // + // IMPORTANT: We cannot claim Exact if any file in a non-last position + // contains NULLs in the sort columns. With NULLS LAST, NULLs within + // a file are placed after all non-null values. If the next file has + // non-null values smaller than the previous file's max, those values + // would incorrectly appear after the NULLs. Similarly for NULLS FIRST. + // + // Conservative approach: if any file has nulls in the sort columns, + // do not claim Exact. The SortExec will handle NULL ordering correctly. + if result.all_non_overlapping + && !self.output_ordering.is_empty() + && !Self::any_file_has_nulls_in_sort_columns( + &new_config.file_groups, + order, + &projected_schema, + projection_indices.as_deref(), + ) + { + // Files are now non-overlapping, no NULLs in sort columns. + // Re-ask the FileSource if this ordering satisfies the request, + // using eq_properties computed from the NEW (sorted) file groups. + let new_eq_props = new_config.eq_properties(); + if new_eq_props.ordering_satisfy(order.iter().cloned())? 
{ + // The sorted file order makes the ordering valid → Exact! + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(new_config), + }); + } + } + + new_config.output_ordering = vec![]; + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(new_config), + }) + } + + /// Check if any file in any group has nulls in the sort columns. + fn any_file_has_nulls_in_sort_columns( + file_groups: &[FileGroup], + order: &[PhysicalSortExpr], + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, + ) -> bool { + let Some(sort_columns) = + sort_columns_from_physical_sort_exprs_nullable(order, projected_schema) + else { + return true; // Can't determine, assume nulls exist + }; + + for group in file_groups { + for file in group.iter() { + let Some(stats) = file.statistics.as_ref() else { + return true; // No stats, assume nulls exist + }; + for col in &sort_columns { + let stat_idx = projection_indices + .map(|p| p[col.index()]) + .unwrap_or_else(|| col.index()); + if stat_idx >= stats.column_statistics.len() { + return true; + } + let col_stats = &stats.column_statistics[stat_idx]; + match &col_stats.null_count { + Precision::Exact(0) => {} // No nulls, safe + Precision::Exact(_) => return true, // Has nulls + _ => return true, // Unknown null count, assume nulls + } + } + } + } + false } } @@ -1356,6 +1684,17 @@ fn ordered_column_indices_from_projection( .collect::>>() } +/// Extract Column references from sort expressions for null checking. +fn sort_columns_from_physical_sort_exprs_nullable( + order: &[PhysicalSortExpr], + _schema: &SchemaRef, +) -> Option> { + order + .iter() + .map(|expr| expr.expr.as_any().downcast_ref::().cloned()) + .collect() +} + /// Check whether a given ordering is valid for all file groups by verifying /// that files within each group are sorted according to their min/max statistics. 
/// @@ -1460,6 +1799,22 @@ fn validate_orderings( /// /// DataSourceExec /// ``` +/// +/// **Exception**: When files within a partition are **non-overlapping** (verified +/// via min/max statistics) and each file is internally sorted, the combined +/// output is still correctly sorted. Sort pushdown +/// ([`FileScanConfig::try_pushdown_sort`]) detects this case and preserves +/// `output_ordering`, allowing `SortExec` to be eliminated entirely. +/// +/// ```text +/// Partition 1 (files sorted by stats, non-overlapping): +/// ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ +/// │ 1.parquet │ │ 2.parquet │ │ 3.parquet │ +/// │ A: [1..100] │ │ A: [101..200] │ │ A: [201..300] │ +/// │ Sort: A, B, C │ │ Sort: A, B, C │ │ Sort: A, B, C │ +/// └──────────────────┘ └──────────────────┘ └──────────────────┘ +/// max(1) <= min(2) ✓ max(2) <= min(3) ✓ → output_ordering preserved +/// ``` fn get_projected_output_ordering( base_config: &FileScanConfig, projected_schema: &SchemaRef, @@ -2611,4 +2966,626 @@ mod tests { Ok(()) } + + fn make_file_with_stats(name: &str, min: f64, max: f64) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new( + Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Float64(Some(min))), + max_value: Precision::Exact(ScalarValue::Float64(Some(max))), + ..Default::default() + }], + }, + )) + } + + #[derive(Clone)] + struct ExactSortPushdownSource { + metrics: ExecutionPlanMetricsSet, + table_schema: TableSchema, + } + + impl ExactSortPushdownSource { + fn new(table_schema: TableSchema) -> Self { + Self { + metrics: ExecutionPlanMetricsSet::new(), + table_schema, + } + } + } + + impl FileSource for ExactSortPushdownSource { + fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> 
Result> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn file_type(&self) -> &str { + "mock_exact" + } + + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + _eq_properties: &EquivalenceProperties, + ) -> Result>> { + Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(self.clone()) as Arc, + }) + } + + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + } + + #[test] + fn sort_pushdown_unsupported_source_files_get_sorted() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file3"); + 
assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!(matches!(result, SortOrderPushdownResult::Unsupported)); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_descending_sort() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let sort_expr = PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + arrow::compute::SortOptions { + descending: true, + nulls_first: true, + }, + ); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = 
pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file3"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file1"); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_non_overlapping_returns_exact() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + assert!(!pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_overlapping_downgraded_to_inexact() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 15.0), + make_file_with_stats("file2", 10.0, 25.0), + 
make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact (downgraded), got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_out_of_order_returns_exact() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file3"); + 
assert!(!pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_single_file_groups() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![make_file_with_stats("file1", 0.0, 9.0)]), + FileGroup::new(vec![make_file_with_stats("file2", 10.0, 19.0)]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!( + matches!(result, SortOrderPushdownResult::Unsupported), + "Expected Unsupported for single-file groups" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_multiple_groups() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_b", 10.0, 19.0), + make_file_with_stats("file_a", 0.0, 9.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("file_d", 30.0, 39.0), + make_file_with_stats("file_c", 20.0, 29.0), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + 
.expect("Expected FileScanConfig"); + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_a"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_b"); + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_c"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_d"); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_partial_statistics() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_b", 10.0, 19.0), + make_file_with_stats("file_a", 0.0, 9.0), + ]), + FileGroup::new(vec![ + PartitionedFile::new("file_d".to_string(), 1024), + PartitionedFile::new("file_c".to_string(), 1024), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_a"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_b"); + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_d"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_c"); + Ok(()) + } + + #[test] + fn sort_pushdown_inexact_source_with_statistics_sorting() -> Result<()> { + let file_schema = + 
Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file1", 0.0, 9.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::<FileScanConfig>() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_multi_group_preserves_parallelism() -> Result<()> { + // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups. + // Groups should NOT be redistributed — interleaved groups allow SPM to + // pull from both partitions concurrently, keeping parallel I/O active. + // Redistributing consecutively would make SPM read one partition at a + // time (all values in group 0 < group 1), degrading to single-threaded I/O. 
+ let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // 2 groups with interleaved ranges (simulating bin-packing result): + // Group 0: [file_01(0-9), file_03(20-29)] + // Group 1: [file_02(10-19), file_04(30-39)] + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_01", 0.0, 9.0), + make_file_with_stats("file_03", 20.0, 29.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("file_02", 10.0, 19.0), + make_file_with_stats("file_04", 30.0, 39.0), + ]), + ]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::<FileScanConfig>() + .expect("Expected FileScanConfig"); + + // 2 groups preserved (parallelism maintained) + assert_eq!(pushed_config.file_groups.len(), 2); + + // Files within each group are sorted by stats, but groups are NOT + // redistributed — interleaved assignment from bin-packing is kept + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_01"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_03"); + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_02"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_04"); + + // output_ordering preserved (Exact, each group internally non-overlapping) + 
assert!(!pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_reverse_preserves_file_order_with_stats() -> Result<()> { + // Reverse scan should reverse file order but NOT apply statistics-based + // sorting (which would undo the reversal). The result is Inexact. + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Files with stats, in ASC order. Output ordering is [a ASC]. + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + // Request DESC → reverse path + let result = config.try_pushdown_sort(&[sort_expr.reverse()])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact for reverse scan, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::<FileScanConfig>() + .expect("Expected FileScanConfig"); + + // Files should be reversed (not re-sorted by stats) + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file3"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file1"); + + // output_ordering cleared (Inexact) + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + /// Helper: create a PartitionedFile with stats including null count + fn make_file_with_null_stats( + name: &str, + min: f64, + max: f64, + null_count: 
usize, + ) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new( + Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(null_count), + min_value: Precision::Exact(ScalarValue::Float64(Some(min))), + max_value: Precision::Exact(ScalarValue::Float64(Some(max))), + ..Default::default() + }], + }, + )) + } + + #[test] + fn sort_pushdown_unsupported_with_nulls_does_not_upgrade_to_exact() -> Result<()> { + // Files are non-overlapping but one has NULLs. + // Should NOT upgrade to Exact — NULLs would appear in wrong position. + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Files in wrong order (high min first) to trigger reordering + let file_groups = vec![FileGroup::new(vec![ + make_file_with_null_stats("b_no_nulls", 10.0, 19.0, 0), + make_file_with_null_stats("a_with_nulls", 0.0, 9.0, 5), // has NULLs + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + // Should be Inexact (not Exact) because of NULLs + assert!( + matches!(result, SortOrderPushdownResult::Inexact { .. 
}), + "Expected Inexact due to NULLs, got {result:?}" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_no_nulls_upgrades_to_exact() -> Result<()> { + // Files are non-overlapping, no NULLs → should upgrade to Exact + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_null_stats("b_high", 10.0, 19.0, 0), + make_file_with_null_stats("a_low", 0.0, 9.0, 0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!( + matches!(result, SortOrderPushdownResult::Exact { .. }), + "Expected Exact (no NULLs), got {result:?}" + ); + Ok(()) + } } diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 20c53ef31645..308e91d0df14 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -35,19 +35,24 @@ //! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK) //! - `Unsupported`: No change //! -//! ## Current capabilities (Phase 1) +//! ## Capabilities //! -//! - Reverse scan optimization: when required sort is the reverse of the data source's +//! - **Sort elimination**: when a data source's natural ordering satisfies the +//! request, return `Exact` and remove the `SortExec` entirely. Preserves +//! `fetch` (LIMIT) from the eliminated `SortExec` for early termination. +//! - **Statistics-based file sorting**: sort files within each partition by +//! 
min/max statistics. When files are non-overlapping but listed in wrong +//! order (e.g., alphabetical order ≠ sort key order), this fixes the ordering +//! and enables sort elimination. Works for both single-partition and +//! multi-partition plans with multi-file groups. +//! - **Reverse scan optimization**: when required sort is the reverse of the data source's //! natural ordering, enable reverse scanning (reading row groups in reverse order) -//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs -//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement +//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs +//! [A DESC], the existing ordering satisfies the requirement (`Exact`). +//! If the query needs [A ASC] (reverse of the prefix), a reverse scan is +//! used (`Inexact`, `SortExec` retained) //! -//! TODO Issue: -//! ## Future enhancements (Phase 2), -//! -//! - File reordering based on statistics -//! - Return `Exact` when files are known to be perfectly sorted -//! - Complete Sort elimination when ordering is guaranteed +//! Related issue: use crate::PhysicalOptimizerRule; use datafusion_common::Result; @@ -55,9 +60,25 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::SortOrderPushdownResult; +use datafusion_physical_plan::buffer::BufferExec; use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use std::sync::Arc; +/// Per-partition buffer capacity (in bytes) inserted between SPM and +/// DataSourceExec when sort elimination removes the buffering SortExec. +/// +/// SortExec buffers all input data in memory (potentially GB per partition) +/// before outputting sorted results. 
When we eliminate SortExec, SPM reads +/// directly from I/O-bound sources. BufferExec compensates with bounded +/// buffering, allowing I/O to pipeline with merge computation. +/// +/// This is strictly less memory than the SortExec it replaces, and only +/// inserted when PushdownSort eliminates a SortExec — no impact on other +/// query plans. BufferExec also integrates with MemoryPool, so it respects +/// the global memory limit and won't cause OOM. +const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB + /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources. /// /// See module-level documentation for details. @@ -82,8 +103,54 @@ impl PhysicalOptimizerRule for PushdownSort { } // Use transform_down to find and optimize all SortExec nodes (including nested ones) + // Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated plan.transform_down(|plan: Arc<dyn ExecutionPlan>| { - // Check if this is a SortExec + // Pattern 1: SPM → SortExec(preserve_partitioning) + // When we eliminate the SortExec, SPM loses its memory buffer and reads + // directly from I/O-bound sources. Insert a BufferExec to compensate. + if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>() + && let Some(sort_child) = spm.input().downcast_ref::<SortExec>() + && sort_child.preserve_partitioning() + { + let sort_input = Arc::clone(sort_child.input()); + let required_ordering = sort_child.expr(); + match sort_input.try_pushdown_sort(required_ordering)? { + SortOrderPushdownResult::Exact { inner } => { + let inner = if let Some(fetch) = sort_child.fetch() { + inner.with_fetch(Some(fetch)).unwrap_or(inner) + } else { + inner + }; + // Insert BufferExec to replace SortExec's buffering role. + // SortExec buffered all data in memory; BufferExec provides + // bounded buffering so SPM doesn't stall on I/O. 
+ let buffered: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new( + inner, + BUFFER_CAPACITY_AFTER_SORT_ELIMINATION, + )); + let new_spm = + SortPreservingMergeExec::new(spm.expr().clone(), buffered) + .with_fetch(spm.fetch()); + return Ok(Transformed::yes(Arc::new(new_spm))); + } + SortOrderPushdownResult::Inexact { inner } => { + let new_sort = SortExec::new(required_ordering.clone(), inner) + .with_fetch(sort_child.fetch()) + .with_preserve_partitioning(true); + let new_spm = SortPreservingMergeExec::new( + spm.expr().clone(), + Arc::new(new_sort), + ) + .with_fetch(spm.fetch()); + return Ok(Transformed::yes(Arc::new(new_spm))); + } + SortOrderPushdownResult::Unsupported => { + return Ok(Transformed::no(plan)); + } + } + } + + // Pattern 2: Standalone SortExec (no SPM parent) let Some(sort_exec) = plan.downcast_ref::<SortExec>() else { return Ok(Transformed::no(plan)); }; @@ -95,7 +162,14 @@ impl PhysicalOptimizerRule for PushdownSort { // Each node type defines its own pushdown behavior via try_pushdown_sort() match sort_input.try_pushdown_sort(required_ordering)? { SortOrderPushdownResult::Exact { inner } => { - // Data source guarantees perfect ordering - remove the Sort operator + // Data source guarantees perfect ordering - remove the Sort operator. + // Preserve the fetch (LIMIT) from the original SortExec so the + // data source can stop reading early. 
+ let inner = if let Some(fetch) = sort_exec.fetch() { + inner.with_fetch(Some(fetch)).unwrap_or(inner) + } else { + inner + }; Ok(Transformed::yes(inner)) } SortOrderPushdownResult::Inexact { inner } => { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b77cced35504..13c28ccb1099 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -250,7 +250,7 @@ impl ExecutionPlan for SortPreservingMergeExec { metrics: self.metrics.clone(), fetch: limit, cache: Arc::clone(&self.cache), - enable_round_robin_repartition: true, + enable_round_robin_repartition: self.enable_round_robin_repartition, })) } diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index c6d5df0d0dfe..e9d4e221e1dd 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1100,16 +1100,16 @@ CREATE EXTERNAL TABLE reversed_parquet(id INT, value INT) STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/reversed/'; -# Test 4.1: SortExec must be present because files are not in inter-file order +# Test 4.1: PushdownSort reorders files by min/max statistics so they are +# already in correct sort order → non-overlapping → no SortExec needed. 
+# (files reordered from [a_high, b_mid, c_low] to [c_low, b_mid, a_high]) query TT EXPLAIN SELECT * FROM reversed_parquet ORDER BY id ASC; ---- logical_plan 01)Sort: reversed_parquet.id ASC NULLS LAST 02)--TableScan: reversed_parquet projection=[id, value] -physical_plan -01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 4.2: Results must be correct query II @@ -1184,16 +1184,15 @@ STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/reversed/' WITH ORDER (id ASC); -# Test 6.1: SortExec must be present despite WITH ORDER +# Test 6.1: Files sorted by statistics → non-overlapping → SortExec eliminated +# WITH ORDER declared + files reordered to correct order query TT EXPLAIN SELECT * FROM reversed_with_order_parquet ORDER BY id ASC; ---- logical_plan 01)Sort: reversed_with_order_parquet.id ASC NULLS LAST 02)--TableScan: reversed_with_order_parquet projection=[id, value] -physical_plan -01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 6.2: Results must be correct query II @@ -1322,17 +1321,15 @@ STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/desc_reversed/' WITH ORDER (id DESC); -# Test 8.1: SortExec must be present — files are in wrong inter-file DESC order -# (a_low has 1-3, b_high has 7-9; for DESC, b_high should come first) +# Test 8.1: Files sorted by statistics → non-overlapping → SortExec eliminated +# (files reordered: b_high(7-9) before a_low(1-3) for DESC order) query TT EXPLAIN SELECT * FROM desc_reversed_parquet ORDER BY id DESC; ---- logical_plan 01)Sort: desc_reversed_parquet.id DESC NULLS FIRST 02)--TableScan: desc_reversed_parquet projection=[id, value] -physical_plan -01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet]]}, projection=[id, value], file_type=parquet +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], 
output_ordering=[id@0 DESC], file_type=parquet # Test 8.2: Results must be correct query II @@ -1775,7 +1772,7 @@ logical_plan 02)--TableScan: tb_overlap projection=[id, value] physical_plan 01)SortExec: TopK(fetch=5), expr=[id@0 DESC, value@1 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] query II SELECT * FROM tb_overlap ORDER BY id DESC, value DESC LIMIT 5; @@ -2092,6 +2089,188 @@ DROP TABLE te_src_c; statement ok DROP TABLE te_inferred_multi; +# =========================================================== +# Test F: NULL handling — sort pushdown must not eliminate Sort +# when files contain NULLs in sort columns, because NULL ordering +# (NULLS FIRST/LAST) across files requires a full sort. 
+# =========================================================== + +# Test F.1: NULLS LAST — file with NULL must not cause wrong ordering +statement ok +CREATE TABLE null_src_a(id INT) AS VALUES (1), (NULL); + +statement ok +CREATE TABLE null_src_b(id INT) AS VALUES (2), (3); + +query I +COPY (SELECT * FROM null_src_a ORDER BY id ASC NULLS LAST) +TO 'test_files/scratch/sort_pushdown/tf_nulls/b_null_tail.parquet'; +---- +2 + +query I +COPY (SELECT * FROM null_src_b ORDER BY id ASC NULLS LAST) +TO 'test_files/scratch/sort_pushdown/tf_nulls/a_nonnull.parquet'; +---- +2 + +statement ok +CREATE EXTERNAL TABLE tf_nulls_last(id INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tf_nulls/' +WITH ORDER (id ASC NULLS LAST); + +# With target_partitions=1, files end up in separate groups via +# split_groups_by_statistics. EnforceSorting eliminates SortExec, +# SPM merges the two sorted streams. +query TT +EXPLAIN SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS LAST; +---- +logical_plan +01)Sort: tf_nulls_last.id ASC NULLS LAST +02)--TableScan: tf_nulls_last projection=[id] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] +02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tf_nulls/a_nonnull.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tf_nulls/b_null_tail.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Results must be correct: NULLs at the end +query I +SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS LAST; +---- +1 +2 +3 +NULL + +# Test F.2: NULLS FIRST — NULLs should come first +query I +SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS FIRST; +---- +NULL +1 +2 +3 + +# Cleanup Test F +statement ok +DROP TABLE null_src_a; + +statement ok +DROP TABLE null_src_b; + +statement ok +DROP TABLE tf_nulls_last; + +# =========================================================== +# Test G: BufferExec 
insertion when sort elimination removes +# SortExec under SortPreservingMergeExec. +# +# When PushdownSort eliminates SortExec(preserve_partitioning=true), +# SPM loses SortExec's memory buffer. A BufferExec is inserted to +# compensate, allowing I/O pipelining with merge computation. +# =========================================================== + +# Create files with reversed naming: c_low has smallest values, +# a_high has largest — alphabetical order ≠ sort key order. +statement ok +CREATE TABLE tg_src_low(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE tg_src_mid(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 600); + +statement ok +CREATE TABLE tg_src_high(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 900), (10, 1000); + +query I +COPY (SELECT * FROM tg_src_low ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet'; +---- +3 + +query I +COPY (SELECT * FROM tg_src_mid ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet'; +---- +3 + +query I +COPY (SELECT * FROM tg_src_high ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet'; +---- +4 + +# Use target_partitions=2 so files are split across 2 groups. 
+# Files are in wrong alphabetical order → validated_output_ordering strips ordering +# → EnforceSorting adds SortExec(preserve_partitioning) + SPM +# → PushdownSort eliminates SortExec and inserts BufferExec +statement ok +SET datafusion.execution.target_partitions = 2; + +statement ok +CREATE EXTERNAL TABLE tg_buffer(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tg_buffer/' +WITH ORDER (id ASC); + +# Test G.1: BufferExec appears between SPM and DataSourceExec +query TT +EXPLAIN SELECT * FROM tg_buffer ORDER BY id ASC; +---- +logical_plan +01)Sort: tg_buffer.id ASC NULLS LAST +02)--TableScan: tg_buffer projection=[id, value] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] +02)--BufferExec: capacity=67108864 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Verify correctness +query II +SELECT * FROM tg_buffer ORDER BY id ASC; +---- +1 100 +2 200 +3 300 +4 400 +5 500 +6 600 +7 700 +8 800 +9 900 +10 1000 + +# Test G.2: LIMIT query with BufferExec +query TT +EXPLAIN SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: tg_buffer.id ASC NULLS LAST, fetch=3 +02)--TableScan: tg_buffer projection=[id, value] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3 +02)--BufferExec: capacity=67108864 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +query II +SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3; +---- +1 100 +2 200 +3 300 + +# Cleanup Test G +statement ok +DROP TABLE tg_src_low; + +statement ok +DROP TABLE tg_src_mid; + +statement ok +DROP TABLE tg_src_high; + +statement ok +DROP TABLE tg_buffer; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4;