Skip to content

Commit a7b4e25

Browse files
committed
fix: stats init requires sort pushdown + no WHERE clause
Stats init is only safe when both of the following hold: (1) sort pushdown is active (sorted, non-overlapping row groups), and (2) the predicate is the DynamicFilter only (no WHERE clause). A WHERE clause narrows the qualifying rows below what the raw statistics suggest, making the threshold potentially unsafe even on sorted data. Type coercion (CastExpr) issues also need to be resolved for general TopK support — tracked as follow-up work. This commit also includes a type cast fix (parquet stats type → column type) and a surviving-rows safety check for future use.
1 parent 5f4c54d commit a7b4e25

1 file changed

Lines changed: 18 additions & 21 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -835,13 +835,19 @@ impl MetadataLoadedParquetOpen {
835835
prepared.physical_file_schema = Arc::clone(&physical_file_schema);
836836

837837
// Initialize TopK threshold from RG statistics BEFORE building
838-
// PruningPredicate. Safe when:
839-
// 1. Sort pushdown path (sorted, non-overlapping RGs), OR
840-
// 2. Predicate is ONLY the DynamicFilter (no WHERE clause that
841-
// could narrow qualifying rows below what raw stats suggest).
842-
// The surviving-rows check prevents over-pruning for overlapping RGs.
838+
// PruningPredicate. Only for sort pushdown path where data is sorted
839+
// and RGs are non-overlapping, making max(min)/min(max) a safe bound.
840+
// Extending to all TopK requires handling type coercion (CastExpr),
841+
// WHERE clause interaction, and overlapping RG edge cases.
842+
// Also require predicate to be DynamicFilter-only (no WHERE clause).
843+
// WHERE narrows qualifying rows, making raw stats-based threshold unsafe.
843844
if prepared.sort_order_for_reorder.is_some()
844-
|| is_dynamic_filter_only_predicate(&prepared.predicate)
845+
&& prepared.predicate.as_ref().is_some_and(|p| {
846+
let any_ref: &dyn std::any::Any = p.as_ref();
847+
any_ref
848+
.downcast_ref::<DynamicFilterPhysicalExpr>()
849+
.is_some()
850+
})
845851
{
846852
let file_metadata = reader_metadata.metadata();
847853
let rg_metadata = file_metadata.row_groups();
@@ -1468,10 +1474,15 @@ fn try_init_topk_threshold(
14681474
Operator::Lt
14691475
};
14701476

1477+
// Cast threshold to match column data type (parquet stats may use a
1478+
// different type than the table schema, e.g. Int32 stats vs Int64 column).
1479+
let col_data_type = col_expr.data_type(arrow_schema)?;
1480+
let threshold_casted = threshold.cast_to(&col_data_type)?;
1481+
14711482
let comparison: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
14721483
Arc::clone(&col_expr),
14731484
op,
1474-
lit(threshold.clone()),
1485+
lit(threshold_casted),
14751486
));
14761487

14771488
let filter_expr: Arc<dyn PhysicalExpr> = if nulls_first {
@@ -1495,20 +1506,6 @@ fn try_init_topk_threshold(
14951506
Ok(())
14961507
}
14971508

1498-
/// Check if the predicate consists ONLY of a [`DynamicFilterPhysicalExpr`]
1499-
/// (no WHERE clause filters combined with it).
1500-
fn is_dynamic_filter_only_predicate(predicate: &Option<Arc<dyn PhysicalExpr>>) -> bool {
1501-
match predicate {
1502-
Some(pred) => {
1503-
let any_ref: &dyn std::any::Any = pred.as_ref();
1504-
any_ref
1505-
.downcast_ref::<DynamicFilterPhysicalExpr>()
1506-
.is_some()
1507-
}
1508-
None => false,
1509-
}
1510-
}
1511-
15121509
/// Find a [`DynamicFilterPhysicalExpr`] in the predicate tree.
15131510
///
15141511
/// Returns the first `DynamicFilterPhysicalExpr` found (as an `Arc`) by

0 commit comments

Comments
 (0)