Skip to content

Commit 3700464

Browse files
committed
feat: reorder row groups by statistics during sort pushdown
When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown. - Add reorder_by_statistics to PreparedAccessPlan that sorts row_group_indexes by the first sort column's min values - Pass sort order from ParquetSource::try_pushdown_sort through to the opener via sort_order_for_reorder field - Reorder happens after pruning but before reverse (they compose) - Gracefully skips reorder when statistics unavailable, sort expr is not a simple column, row_selection present, or <=1 row groups Closes #21317
1 parent d68373e commit 3700464

3 files changed

Lines changed: 355 additions & 2 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
// under the License.
1717

1818
use crate::sort::reverse_row_selection;
19+
use arrow::datatypes::Schema;
1920
use datafusion_common::{Result, assert_eq_or_internal_err};
21+
use datafusion_physical_expr::expressions::Column;
22+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
23+
use log::debug;
24+
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
2025
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
2126
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
2227

@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
377382
})
378383
}
379384

385+
/// Reorder row groups by their min statistics for the given sort order.
386+
///
387+
/// This helps TopK queries find optimal values first. For ASC sort,
388+
/// row groups with the smallest min values come first. For DESC sort,
389+
/// row groups with the largest min values come first.
390+
///
391+
/// Gracefully skips reordering when:
392+
/// - There is a row_selection (too complex to remap)
393+
/// - 0 or 1 row groups (nothing to reorder)
394+
/// - Sort expression is not a simple column reference
395+
/// - Statistics are unavailable
396+
pub(crate) fn reorder_by_statistics(
397+
mut self,
398+
sort_order: &LexOrdering,
399+
file_metadata: &ParquetMetaData,
400+
arrow_schema: &Schema,
401+
) -> Result<Self> {
402+
// Skip if row_selection present (too complex to remap)
403+
if self.row_selection.is_some() {
404+
debug!("Skipping RG reorder: row_selection present");
405+
return Ok(self);
406+
}
407+
408+
// Nothing to reorder
409+
if self.row_group_indexes.len() <= 1 {
410+
return Ok(self);
411+
}
412+
413+
// Get the first sort expression
414+
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
415+
let first_sort_expr = sort_order.first();
416+
417+
// Extract column name from sort expression
418+
let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::<Column>()
419+
{
420+
Some(col) => col,
421+
None => {
422+
debug!("Skipping RG reorder: sort expr is not a simple column");
423+
return Ok(self);
424+
}
425+
};
426+
427+
let descending = first_sort_expr.options.descending;
428+
429+
// Build statistics converter for this column
430+
let converter = match StatisticsConverter::try_new(
431+
column.name(),
432+
arrow_schema,
433+
file_metadata.file_metadata().schema_descr(),
434+
) {
435+
Ok(c) => c,
436+
Err(e) => {
437+
debug!("Skipping RG reorder: cannot create stats converter: {e}");
438+
return Ok(self);
439+
}
440+
};
441+
442+
// Get min values for the selected row groups
443+
let rg_metadata: Vec<&RowGroupMetaData> = self
444+
.row_group_indexes
445+
.iter()
446+
.map(|&idx| file_metadata.row_group(idx))
447+
.collect();
448+
449+
let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
450+
Ok(vals) => vals,
451+
Err(e) => {
452+
debug!("Skipping RG reorder: cannot get min values: {e}");
453+
return Ok(self);
454+
}
455+
};
456+
457+
// Sort indices by min values
458+
let sort_options = arrow::compute::SortOptions {
459+
descending,
460+
nulls_first: first_sort_expr.options.nulls_first,
461+
};
462+
let sorted_indices = match arrow::compute::sort_to_indices(
463+
&min_values,
464+
Some(sort_options),
465+
None,
466+
) {
467+
Ok(indices) => indices,
468+
Err(e) => {
469+
debug!("Skipping RG reorder: sort failed: {e}");
470+
return Ok(self);
471+
}
472+
};
473+
474+
// Apply the reordering
475+
let original_indexes = self.row_group_indexes.clone();
476+
self.row_group_indexes = sorted_indices
477+
.values()
478+
.iter()
479+
.map(|&i| original_indexes[i as usize])
480+
.collect();
481+
482+
Ok(self)
483+
}
484+
380485
/// Reverse the access plan for reverse scanning
381486
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
382487
// Get the row group indexes before reversing
@@ -614,4 +719,225 @@ mod test {
614719
.unwrap();
615720
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
616721
}
722+
723+
// ---- reorder_by_statistics tests ----
724+
725+
use arrow::datatypes::{DataType, Field, Schema};
726+
use datafusion_physical_expr::expressions::Column;
727+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
728+
use parquet::basic::Type as PhysicalType;
729+
use parquet::file::metadata::FileMetaData;
730+
use parquet::file::statistics::Statistics;
731+
use parquet::schema::types::Type as SchemaType;
732+
733+
/// Create ParquetMetaData with row groups that have Int32 min/max stats
734+
fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData {
735+
let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
736+
.build()
737+
.unwrap();
738+
let schema = SchemaType::group_type_builder("schema")
739+
.with_fields(vec![Arc::new(field)])
740+
.build()
741+
.unwrap();
742+
let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema)));
743+
744+
let row_groups: Vec<RowGroupMetaData> = min_max_pairs
745+
.iter()
746+
.map(|(min, max)| {
747+
let stats =
748+
Statistics::int32(Some(*min), Some(*max), None, Some(100), false);
749+
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
750+
.set_num_values(100)
751+
.set_statistics(stats)
752+
.build()
753+
.unwrap();
754+
RowGroupMetaData::builder(schema_descr.clone())
755+
.set_num_rows(100)
756+
.set_column_metadata(vec![column])
757+
.build()
758+
.unwrap()
759+
})
760+
.collect();
761+
762+
let file_meta = FileMetaData::new(
763+
1,
764+
min_max_pairs.len() as i64 * 100,
765+
None,
766+
None,
767+
schema_descr,
768+
None,
769+
);
770+
ParquetMetaData::new(file_meta, row_groups)
771+
}
772+
773+
fn make_sort_order_asc() -> LexOrdering {
774+
LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
775+
"id", 0,
776+
)))])
777+
.unwrap()
778+
}
779+
780+
fn make_sort_order_desc() -> LexOrdering {
781+
use arrow::compute::SortOptions;
782+
LexOrdering::new(vec![PhysicalSortExpr::new(
783+
Arc::new(Column::new("id", 0)),
784+
SortOptions {
785+
descending: true,
786+
nulls_first: false,
787+
},
788+
)])
789+
.unwrap()
790+
}
791+
792+
fn make_arrow_schema() -> Schema {
793+
Schema::new(vec![Field::new("id", DataType::Int32, false)])
794+
}
795+
796+
#[test]
797+
fn test_reorder_by_statistics_asc() {
798+
// RGs in wrong order: [50-99, 200-299, 1-30]
799+
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
800+
let schema = make_arrow_schema();
801+
let sort_order = make_sort_order_asc();
802+
803+
let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
804+
let plan = plan
805+
.reorder_by_statistics(&sort_order, &metadata, &schema)
806+
.unwrap();
807+
808+
// Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299)
809+
assert_eq!(plan.row_group_indexes, vec![2, 0, 1]);
810+
}
811+
812+
#[test]
813+
fn test_reorder_by_statistics_desc() {
814+
// RGs: [50-99, 200-299, 1-30]
815+
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
816+
let schema = make_arrow_schema();
817+
let sort_order = make_sort_order_desc();
818+
819+
let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
820+
let plan = plan
821+
.reorder_by_statistics(&sort_order, &metadata, &schema)
822+
.unwrap();
823+
824+
// DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30)
825+
assert_eq!(plan.row_group_indexes, vec![1, 0, 2]);
826+
}
827+
828+
#[test]
829+
fn test_reorder_by_statistics_single_rg() {
830+
let metadata = make_metadata_with_stats(&[(1, 100)]);
831+
let schema = make_arrow_schema();
832+
let sort_order = make_sort_order_asc();
833+
834+
let plan = PreparedAccessPlan::new(vec![0], None).unwrap();
835+
let plan = plan
836+
.reorder_by_statistics(&sort_order, &metadata, &schema)
837+
.unwrap();
838+
839+
// Single RG, no reorder
840+
assert_eq!(plan.row_group_indexes, vec![0]);
841+
}
842+
843+
#[test]
844+
fn test_reorder_by_statistics_with_skipped_rgs() {
845+
// 4 RGs but only 0, 2, 3 are selected (RG1 was pruned)
846+
let metadata =
847+
make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]);
848+
let schema = make_arrow_schema();
849+
let sort_order = make_sort_order_asc();
850+
851+
let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap();
852+
let plan = plan
853+
.reorder_by_statistics(&sort_order, &metadata, &schema)
854+
.unwrap();
855+
856+
// Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400)
857+
assert_eq!(plan.row_group_indexes, vec![2, 3, 0]);
858+
}
859+
860+
#[test]
861+
fn test_reorder_by_statistics_skips_with_row_selection() {
862+
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
863+
let schema = make_arrow_schema();
864+
let sort_order = make_sort_order_asc();
865+
866+
let selection = RowSelection::from(vec![
867+
RowSelector::select(50),
868+
RowSelector::skip(50),
869+
RowSelector::select(100),
870+
]);
871+
872+
let plan = PreparedAccessPlan::new(vec![0, 1], Some(selection)).unwrap();
873+
let plan = plan
874+
.reorder_by_statistics(&sort_order, &metadata, &schema)
875+
.unwrap();
876+
877+
// Should NOT reorder because row_selection is present
878+
assert_eq!(plan.row_group_indexes, vec![0, 1]);
879+
}
880+
881+
#[test]
882+
fn test_reorder_by_statistics_already_sorted() {
883+
// Already in correct ASC order
884+
let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]);
885+
let schema = make_arrow_schema();
886+
let sort_order = make_sort_order_asc();
887+
888+
let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
889+
let plan = plan
890+
.reorder_by_statistics(&sort_order, &metadata, &schema)
891+
.unwrap();
892+
893+
// Already sorted, order preserved
894+
assert_eq!(plan.row_group_indexes, vec![0, 1, 2]);
895+
}
896+
897+
#[test]
898+
fn test_reorder_by_statistics_skips_non_column_expr() {
899+
use datafusion_expr::Operator;
900+
use datafusion_physical_expr::expressions::BinaryExpr;
901+
902+
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
903+
let schema = make_arrow_schema();
904+
905+
// Sort expression is a binary expression (id + 1), not a simple column
906+
let expr = Arc::new(BinaryExpr::new(
907+
Arc::new(Column::new("id", 0)),
908+
Operator::Plus,
909+
Arc::new(datafusion_physical_expr::expressions::Literal::new(
910+
datafusion_common::ScalarValue::Int32(Some(1)),
911+
)),
912+
));
913+
let sort_order =
914+
LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap();
915+
916+
let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap();
917+
let plan = plan
918+
.reorder_by_statistics(&sort_order, &metadata, &schema)
919+
.unwrap();
920+
921+
// Should NOT reorder because sort expr is not a simple column
922+
assert_eq!(plan.row_group_indexes, vec![0, 1]);
923+
}
924+
925+
#[test]
926+
fn test_reorder_by_statistics_skips_missing_column() {
927+
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
928+
// Schema has "id" but sort order references "nonexistent"
929+
let schema = make_arrow_schema();
930+
let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(
931+
Column::new("nonexistent", 99),
932+
))])
933+
.unwrap();
934+
935+
let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap();
936+
let plan = plan
937+
.reorder_by_statistics(&sort_order, &metadata, &schema)
938+
.unwrap();
939+
940+
// Should NOT reorder because column not found in schema
941+
assert_eq!(plan.row_group_indexes, vec![0, 1]);
942+
}
617943
}

0 commit comments

Comments
 (0)