Skip to content

Commit bccc42b

Browse files
committed
feat: enable file reorder and RG reorder for all TopK queries
Previously file reorder and RG reorder only worked with sort pushdown (Inexact path, WITH ORDER). Now they extract sort info from DynamicFilterPhysicalExpr.sort_options in the predicate, which is set by SortExec for ALL TopK queries regardless of WITH ORDER. This means ORDER BY col DESC LIMIT K on any parquet table benefits from file reorder (best file first in shared queue), RG reorder (best RG first within file), and stats init (threshold before I/O).
1 parent 3996178 commit bccc42b

2 files changed

Lines changed: 141 additions & 23 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,17 +1165,41 @@ impl RowGroupsPrunedParquetOpen {
11651165
//
11661166
// For sorted data: reorder is a no-op, reverse gives perfect DESC.
11671167
// For unsorted data: reorder fixes the order, reverse flips for DESC.
1168+
// Build reorder optimizer from sort_order_for_reorder (Inexact path)
1169+
// or from DynamicFilterPhysicalExpr sort_options (any TopK query).
11681170
let reorder_optimizer: Option<
11691171
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1170-
> = prepared.sort_order_for_reorder.as_ref().map(|sort_order| {
1171-
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1172-
sort_order.clone(),
1173-
)) as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1174-
});
1172+
> = if let Some(sort_order) = &prepared.sort_order_for_reorder {
1173+
Some(
1174+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1175+
sort_order.clone(),
1176+
))
1177+
as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1178+
)
1179+
} else if let Some(predicate) = &prepared.predicate {
1180+
// For non-Inexact TopK queries: extract sort info from the
1181+
// DynamicFilterPhysicalExpr to enable RG reorder.
1182+
extract_sort_order_from_predicate(predicate).map(|sort_order| {
1183+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1184+
sort_order,
1185+
))
1186+
as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1187+
})
1188+
} else {
1189+
None
1190+
};
11751191

1192+
// Reverse for DESC queries. Triggered by sort pushdown (reverse_row_groups)
1193+
// or by DynamicFilterPhysicalExpr sort_options indicating DESC.
1194+
let is_descending = prepared.reverse_row_groups
1195+
|| prepared
1196+
.predicate
1197+
.as_ref()
1198+
.and_then(extract_descending_from_predicate)
1199+
.unwrap_or(false);
11761200
let reverse_optimizer: Option<
11771201
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1178-
> = if prepared.reverse_row_groups {
1202+
> = if is_descending {
11791203
Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
11801204
} else {
11811205
None
@@ -1480,6 +1504,46 @@ fn compute_best_threshold_from_stats(
14801504
Ok(best)
14811505
}
14821506

1507+
/// Extract a [`LexOrdering`] from a [`DynamicFilterPhysicalExpr`] in the predicate.
1508+
///
1509+
/// Returns a single-column ASC sort order if a DynamicFilterPhysicalExpr with
1510+
/// sort_options is found. The sort direction in the returned LexOrdering is
1511+
/// always ASC because `ReorderByStatistics` always sorts by min ASC; the
1512+
/// caller handles DESC via `ReverseRowGroups`.
1513+
fn extract_sort_order_from_predicate(
1514+
predicate: &Arc<dyn PhysicalExpr>,
1515+
) -> Option<LexOrdering> {
1516+
let df = find_dynamic_filter(predicate)?;
1517+
let sort_options = df.sort_options()?;
1518+
if sort_options.len() != 1 {
1519+
return None;
1520+
}
1521+
let children = df.children();
1522+
if children.is_empty() {
1523+
return None;
1524+
}
1525+
// Build ASC sort order (reorder always sorts ASC, reverse handles DESC)
1526+
let sort_expr = datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
1527+
expr: Arc::clone(children[0]),
1528+
options: arrow::compute::SortOptions {
1529+
descending: false,
1530+
nulls_first: sort_options[0].nulls_first,
1531+
},
1532+
};
1533+
LexOrdering::new(vec![sort_expr])
1534+
}
1535+
1536+
/// Check if a [`DynamicFilterPhysicalExpr`] in the predicate indicates DESC sort.
1537+
fn extract_descending_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Option<bool> {
1538+
let df = find_dynamic_filter(predicate)?;
1539+
let sort_options = df.sort_options()?;
1540+
if sort_options.len() == 1 {
1541+
Some(sort_options[0].descending)
1542+
} else {
1543+
None
1544+
}
1545+
}
1546+
14831547
/// State for a stream that decodes a single Parquet file using a push-based decoder.
14841548
///
14851549
/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests

datafusion/datasource-parquet/src/source.rs

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,70 @@ impl ParquetSource {
486486
self.reverse_row_groups
487487
}
488488

489+
/// Extract TopK sort column name and direction from available sources.
490+
///
491+
/// Tries two sources in order:
492+
/// 1. DynamicFilterPhysicalExpr in the predicate (works for ALL TopK queries)
493+
/// 2. sort_order_for_reorder (only set in Inexact sort pushdown path)
494+
fn extract_topk_sort_info(&self) -> Option<(String, bool)> {
495+
// Try 1: from predicate's DynamicFilterPhysicalExpr
496+
if let Some(predicate) = &self.predicate
497+
&& let Some(info) = Self::sort_info_from_dynamic_filter(predicate)
498+
{
499+
return Some(info);
500+
}
501+
502+
// Try 2: fallback to sort_order_for_reorder (Inexact path)
503+
if let Some(sort_order) = &self.sort_order_for_reorder
504+
&& !sort_order.is_empty()
505+
{
506+
let first = sort_order.first();
507+
if let Some(col) = first
508+
.expr
509+
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
510+
{
511+
return Some((col.name().to_string(), self.reverse_row_groups));
512+
}
513+
}
514+
515+
None
516+
}
517+
518+
/// Try to extract sort column name and direction from a DynamicFilterPhysicalExpr
519+
/// in the predicate tree.
520+
fn sort_info_from_dynamic_filter(
521+
expr: &Arc<dyn PhysicalExpr>,
522+
) -> Option<(String, bool)> {
523+
// Try downcast to DynamicFilterPhysicalExpr
524+
let cloned = Arc::clone(expr);
525+
let any_arc: Arc<dyn std::any::Any + Send + Sync> = cloned;
526+
if let Ok(df) = Arc::downcast::<
527+
datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr,
528+
>(any_arc)
529+
{
530+
let sort_options = df.sort_options()?;
531+
if sort_options.len() != 1 {
532+
return None;
533+
}
534+
let children = df.children();
535+
if children.is_empty() {
536+
return None;
537+
}
538+
let col = children[0]
539+
.downcast_ref::<datafusion_physical_expr::expressions::Column>()?;
540+
return Some((col.name().to_string(), sort_options[0].descending));
541+
}
542+
543+
// Recursively check children
544+
for child in expr.children() {
545+
if let Some(info) = Self::sort_info_from_dynamic_filter(child) {
546+
return Some(info);
547+
}
548+
}
549+
550+
None
551+
}
552+
489553
/// Extract the sort key from a file's statistics for reordering.
490554
///
491555
/// For DESC sorts, returns the column's min_value (we want highest min first).
@@ -609,27 +673,17 @@ impl FileSource for ParquetSource {
609673
&self,
610674
mut files: Vec<datafusion_datasource::PartitionedFile>,
611675
) -> Vec<datafusion_datasource::PartitionedFile> {
612-
let sort_order = match &self.sort_order_for_reorder {
613-
Some(order) if !order.is_empty() => order,
614-
_ => return files,
676+
// Extract sort column and direction from either:
677+
// 1. The DynamicFilterPhysicalExpr in the predicate (works for ALL TopK)
678+
// 2. Fallback to sort_order_for_reorder (Inexact path only)
679+
let (col_name, descending) = match self.extract_topk_sort_info() {
680+
Some(info) => info,
681+
None => return files,
615682
};
616683

617-
// We only handle single-column sort for now
618-
let first_expr = sort_order.first();
619-
620-
// The sort expression must be a Column so we can look up statistics
621-
let col: &datafusion_physical_expr::expressions::Column =
622-
match first_expr.expr.downcast_ref() {
623-
Some(col) => col,
624-
None => return files,
625-
};
626-
627-
let col_name = col.name();
628-
let descending = self.reverse_row_groups;
629-
630684
// Find the column index in the table schema
631685
let table_schema = self.table_schema.table_schema();
632-
let col_idx = match table_schema.index_of(col_name) {
686+
let col_idx = match table_schema.index_of(&col_name) {
633687
Ok(idx) => idx,
634688
Err(_) => return files,
635689
};

0 commit comments

Comments
 (0)