@@ -27,7 +27,10 @@ use crate::{
2727use arrow:: array:: { Array , RecordBatch , RecordBatchOptions } ;
2828use arrow:: datatypes:: DataType ;
2929use datafusion_datasource:: morsel:: { Morsel , MorselPlan , MorselPlanner , Morselizer } ;
30- use datafusion_physical_expr:: expressions:: { Column , DynamicFilterPhysicalExpr } ;
30+ use datafusion_expr:: Operator ;
31+ use datafusion_physical_expr:: expressions:: {
32+ BinaryExpr , Column , DynamicFilterPhysicalExpr , lit,
33+ } ;
3134use datafusion_physical_expr:: projection:: { ProjectionExprs , Projector } ;
3235use datafusion_physical_expr:: utils:: reassign_expr_columns;
3336use datafusion_physical_expr_adapter:: replace_columns_with_literals;
@@ -831,10 +834,36 @@ impl MetadataLoadedParquetOpen {
831834 }
832835 prepared. physical_file_schema = Arc :: clone ( & physical_file_schema) ;
833836
834- // Note: stats init (try_init_topk_threshold) was removed in favor of
835- // cumulative RG pruning which is safer and works with WHERE clauses.
836- // Stats init had issues: Gt vs GtEq boundary, type coercion, and
837- // conflicting with cumulative prune when limit spans multiple RGs.
837+ // Initialize TopK threshold from RG statistics BEFORE building
838+ // PruningPredicate. Uses GtEq/LtEq to include boundary values.
839+ // Only for sort pushdown + no WHERE (pure DynamicFilter predicate).
840+ // Uses df.fetch() as limit so stats init skips when K spans multiple
841+ // RGs (no single RG has >= K rows), letting cumulative prune handle it.
842+ if prepared. sort_order_for_reorder . is_some ( )
843+ && let Some ( pred) = & prepared. predicate
844+ && let Some ( df) = find_dynamic_filter ( pred)
845+ && df. sort_options ( ) . is_some_and ( |opts| opts. len ( ) == 1 )
846+ && let Some ( fetch) = df. fetch ( )
847+ {
848+ // Only when predicate is pure DynamicFilter (no WHERE)
849+ let any_ref: & dyn std:: any:: Any = pred. as_ref ( ) ;
850+ if any_ref
851+ . downcast_ref :: < DynamicFilterPhysicalExpr > ( )
852+ . is_some ( )
853+ {
854+ let file_metadata = reader_metadata. metadata ( ) ;
855+ let rg_metadata = file_metadata. row_groups ( ) ;
856+ if let Err ( e) = try_init_topk_threshold (
857+ pred,
858+ fetch,
859+ rg_metadata,
860+ & physical_file_schema,
861+ reader_metadata. parquet_schema ( ) ,
862+ ) {
863+ debug ! ( "Skipping TopK threshold init: {e}" ) ;
864+ }
865+ }
866+ }
838867
839868 // Build predicates for this specific file
840869 let pruning_predicate = build_pruning_predicates (
@@ -1370,6 +1399,130 @@ impl RowGroupsPrunedParquetOpen {
13701399/// checking the predicate itself and recursively walking its children.
13711400/// Check if row groups in the prepared plan are non-overlapping on the
13721401/// sort column. Adjacent RGs must satisfy `max(i) <= min(i+1)`.
1402+ /// Initialize TopK dynamic filter threshold from row group statistics.
1403+ ///
1404+ /// For DESC: `threshold = max(min)` across RGs with `num_rows >= fetch`.
1405+ /// For ASC: `threshold = min(max)` across qualifying RGs.
1406+ /// Uses GtEq/LtEq to include boundary values.
1407+ fn try_init_topk_threshold (
1408+ predicate : & Arc < dyn PhysicalExpr > ,
1409+ fetch : usize ,
1410+ rg_metadata : & [ parquet:: file:: metadata:: RowGroupMetaData ] ,
1411+ arrow_schema : & Schema ,
1412+ parquet_schema : & parquet:: schema:: types:: SchemaDescriptor ,
1413+ ) -> Result < ( ) > {
1414+ let dynamic_filter = match find_dynamic_filter ( predicate) {
1415+ Some ( df) => df,
1416+ None => return Ok ( ( ) ) ,
1417+ } ;
1418+
1419+ if dynamic_filter. snapshot_generation ( ) > 1 {
1420+ return Ok ( ( ) ) ; // Already initialized
1421+ }
1422+
1423+ let sort_options = match dynamic_filter. sort_options ( ) {
1424+ Some ( opts) if opts. len ( ) == 1 => opts,
1425+ _ => return Ok ( ( ) ) ,
1426+ } ;
1427+
1428+ let is_descending = sort_options[ 0 ] . descending ;
1429+ let nulls_first = sort_options[ 0 ] . nulls_first ;
1430+
1431+ let column = match find_column_in_expr ( dynamic_filter. children ( ) [ 0 ] ) {
1432+ Some ( col) => col,
1433+ None => return Ok ( ( ) ) ,
1434+ } ;
1435+
1436+ let col_name = column. name ( ) ;
1437+ let converter = StatisticsConverter :: try_new ( col_name, arrow_schema, parquet_schema)
1438+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
1439+
1440+ let threshold = if is_descending {
1441+ let mins = converter
1442+ . row_group_mins ( rg_metadata. iter ( ) )
1443+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
1444+ compute_best_threshold ( & mins, rg_metadata, fetch, true ) ?
1445+ } else {
1446+ let maxes = converter
1447+ . row_group_maxes ( rg_metadata. iter ( ) )
1448+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
1449+ compute_best_threshold ( & maxes, rg_metadata, fetch, false ) ?
1450+ } ;
1451+
1452+ let threshold = match threshold {
1453+ Some ( t) => t,
1454+ None => return Ok ( ( ) ) , // No RG with >= fetch rows
1455+ } ;
1456+
1457+ // Cast threshold to column type
1458+ let col_expr: Arc < dyn PhysicalExpr > = Arc :: clone ( dynamic_filter. children ( ) [ 0 ] ) ;
1459+ let col_data_type = col_expr. data_type ( arrow_schema) ?;
1460+ let threshold_casted = threshold. cast_to ( & col_data_type) ?;
1461+
1462+ // GtEq/LtEq: boundary value IS a valid top-K value
1463+ let op = if is_descending {
1464+ Operator :: GtEq
1465+ } else {
1466+ Operator :: LtEq
1467+ } ;
1468+
1469+ let comparison: Arc < dyn PhysicalExpr > = Arc :: new ( BinaryExpr :: new (
1470+ Arc :: clone ( & col_expr) ,
1471+ op,
1472+ lit ( threshold_casted) ,
1473+ ) ) ;
1474+
1475+ let filter_expr: Arc < dyn PhysicalExpr > = if nulls_first {
1476+ use datafusion_physical_expr:: expressions:: is_null;
1477+ let null_check = is_null ( Arc :: clone ( & col_expr) ) ?;
1478+ Arc :: new ( BinaryExpr :: new ( null_check, Operator :: Or , comparison) )
1479+ } else {
1480+ comparison
1481+ } ;
1482+
1483+ debug ! (
1484+ "TopK stats init: {col_name} {op} (fetch={fetch})" ,
1485+ op = if is_descending { ">=" } else { "<=" }
1486+ ) ;
1487+
1488+ dynamic_filter. update ( filter_expr) ?;
1489+ Ok ( ( ) )
1490+ }
1491+
1492+ /// Find the best threshold from RG statistics.
1493+ /// `want_max=true`: max of values (for DESC min). `want_max=false`: min of values (for ASC max).
1494+ fn compute_best_threshold (
1495+ stats : & arrow:: array:: ArrayRef ,
1496+ rg_metadata : & [ parquet:: file:: metadata:: RowGroupMetaData ] ,
1497+ fetch : usize ,
1498+ want_max : bool ,
1499+ ) -> Result < Option < ScalarValue > > {
1500+ let mut best: Option < ScalarValue > = None ;
1501+ for ( i, rg) in rg_metadata. iter ( ) . enumerate ( ) {
1502+ if ( rg. num_rows ( ) as usize ) < fetch {
1503+ continue ;
1504+ }
1505+ if i >= stats. len ( ) || stats. is_null ( i) {
1506+ continue ;
1507+ }
1508+ let value = ScalarValue :: try_from_array ( stats. as_ref ( ) , i) ?;
1509+ if value. is_null ( ) {
1510+ continue ;
1511+ }
1512+ best = Some ( match best {
1513+ None => value,
1514+ Some ( current) => {
1515+ if want_max {
1516+ if value > current { value } else { current }
1517+ } else {
1518+ if value < current { value } else { current }
1519+ }
1520+ }
1521+ } ) ;
1522+ }
1523+ Ok ( best)
1524+ }
1525+
13731526fn rgs_are_non_overlapping (
13741527 plan : & crate :: access_plan:: PreparedAccessPlan ,
13751528 file_metadata : & parquet:: file:: metadata:: ParquetMetaData ,
0 commit comments