@@ -834,10 +834,15 @@ impl MetadataLoadedParquetOpen {
834834 }
835835 prepared. physical_file_schema = Arc :: clone ( & physical_file_schema) ;
836836
837- // For sort pushdown path: initialize TopK threshold BEFORE building
838- // PruningPredicate so that the compiled predicate includes the threshold.
839- // This is safe because sort pushdown data is sorted and non-overlapping.
840- if prepared. sort_order_for_reorder . is_some ( ) {
837+ // Initialize TopK threshold from RG statistics BEFORE building
838+ // PruningPredicate. Safe when:
839+ // 1. Sort pushdown path (sorted, non-overlapping RGs), OR
840+ // 2. Predicate is ONLY the DynamicFilter (no WHERE clause that
841+ // could narrow qualifying rows below what raw stats suggest).
842+ // The surviving-rows check prevents over-pruning for overlapping RGs.
843+ if prepared. sort_order_for_reorder . is_some ( )
844+ || is_dynamic_filter_only_predicate ( & prepared. predicate )
845+ {
841846 let file_metadata = reader_metadata. metadata ( ) ;
842847 let rg_metadata = file_metadata. row_groups ( ) ;
843848 let topk_limit = prepared. limit . unwrap_or ( 1 ) ;
@@ -965,12 +970,6 @@ impl FiltersPreparedParquetOpen {
965970 . add_matched ( remaining) ;
966971 }
967972
968- // Note: stats init for non-sort-pushdown TopK is not safe because
969- // max(min)/min(max) is only a valid threshold bound when RGs are
970- // non-overlapping (sorted data). For overlapping RGs, the top-K
971- // values may span multiple RGs and the threshold can over-prune.
972- // Future: use a safer algorithm (e.g. sum of qualifying rows).
973-
974973 Ok ( RowGroupsPrunedParquetOpen {
975974 prepared : self ,
976975 row_groups,
@@ -1402,6 +1401,60 @@ fn try_init_topk_threshold(
14021401 }
14031402 } ;
14041403
1404+ // Safety check: verify that enough rows survive after applying the
1405+ // threshold. Count rows in RGs that would NOT be pruned (i.e., their
1406+ // max >= threshold for DESC, or min <= threshold for ASC).
1407+ // If remaining rows < limit, the threshold is too tight — skip.
1408+ let converter_for_check =
1409+ StatisticsConverter :: try_new ( col_name, arrow_schema, parquet_schema)
1410+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
1411+ let surviving_rows: usize = if is_descending {
1412+ // DESC: keep RGs where max >= threshold (they may have values > threshold)
1413+ let maxes = converter_for_check
1414+ . row_group_maxes ( rg_metadata. iter ( ) )
1415+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
1416+ rg_metadata
1417+ . iter ( )
1418+ . enumerate ( )
1419+ . filter ( |( i, _) | {
1420+ if * i >= maxes. len ( ) || maxes. is_null ( * i) {
1421+ return true ; // keep RGs with unknown stats
1422+ }
1423+ match ScalarValue :: try_from_array ( maxes. as_ref ( ) , * i) {
1424+ Ok ( max_val) => max_val >= threshold,
1425+ Err ( _) => true ,
1426+ }
1427+ } )
1428+ . map ( |( _, rg) | rg. num_rows ( ) as usize )
1429+ . sum ( )
1430+ } else {
1431+ // ASC: keep RGs where min <= threshold
1432+ let mins = converter_for_check
1433+ . row_group_mins ( rg_metadata. iter ( ) )
1434+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
1435+ rg_metadata
1436+ . iter ( )
1437+ . enumerate ( )
1438+ . filter ( |( i, _) | {
1439+ if * i >= mins. len ( ) || mins. is_null ( * i) {
1440+ return true ;
1441+ }
1442+ match ScalarValue :: try_from_array ( mins. as_ref ( ) , * i) {
1443+ Ok ( min_val) => min_val <= threshold,
1444+ Err ( _) => true ,
1445+ }
1446+ } )
1447+ . map ( |( _, rg) | rg. num_rows ( ) as usize )
1448+ . sum ( )
1449+ } ;
1450+
1451+ if surviving_rows < limit {
1452+ debug ! (
1453+ "Skipping TopK threshold init: only {surviving_rows} rows would survive, need {limit}"
1454+ ) ;
1455+ return Ok ( ( ) ) ;
1456+ }
1457+
14051458 // Build the filter expression with null-awareness.
14061459 // For NULLS FIRST: NULLs sort before all values, so the filter must
14071460 // preserve NULL rows: `col IS NULL OR col > threshold` (DESC) or
@@ -1442,6 +1495,20 @@ fn try_init_topk_threshold(
14421495 Ok ( ( ) )
14431496}
14441497
1498+ /// Check if the predicate consists ONLY of a [`DynamicFilterPhysicalExpr`]
1499+ /// (no WHERE clause filters combined with it).
1500+ fn is_dynamic_filter_only_predicate ( predicate : & Option < Arc < dyn PhysicalExpr > > ) -> bool {
1501+ match predicate {
1502+ Some ( pred) => {
1503+ let any_ref: & dyn std:: any:: Any = pred. as_ref ( ) ;
1504+ any_ref
1505+ . downcast_ref :: < DynamicFilterPhysicalExpr > ( )
1506+ . is_some ( )
1507+ }
1508+ None => false ,
1509+ }
1510+ }
1511+
14451512/// Find a [`DynamicFilterPhysicalExpr`] in the predicate tree.
14461513///
14471514/// Returns the first `DynamicFilterPhysicalExpr` found (as an `Arc`) by
0 commit comments