Skip to content

Commit 8fb7b00

Browse files
committed
fix: null-aware filter + restrict stats init to sort pushdown path
Add null-aware filter for NULLS FIRST sort: `col IS NULL OR col > threshold` ensures RGs with NULLs are not incorrectly pruned. Stats init remains restricted to sort pushdown path because pruning changes tie-breaking for equal values across RGs, which causes non-deterministic results in non-sort-pushdown TopK queries. The null-aware filter is still useful for sort pushdown DESC NULLS FIRST.
1 parent 54f4fd4 commit 8fb7b00

1 file changed

Lines changed: 21 additions & 5 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -838,9 +838,10 @@ impl MetadataLoadedParquetOpen {
838838
// BEFORE building the pruning predicate. The PruningPredicate compiles
839839
// the expression at build time, so the DynamicFilterPhysicalExpr must
840840
// already have the threshold set for pruning to be effective.
841-
// Only initialize TopK threshold when sort pushdown is active.
842-
// For non-sort-pushdown TopK, pruning changes which RGs are read,
843-
// altering tie-breaking for equal values (e.g. NULLs).
841+
// Only when sort pushdown is active (sort_order_for_reorder set).
842+
// For non-sort-pushdown TopK, pruning may change tie-breaking for
843+
// equal values across RGs. Future: extend to all TopK once
844+
// tie-breaking is handled or fuzz tests are updated.
844845
if prepared.sort_order_for_reorder.is_some() {
845846
let file_metadata = reader_metadata.metadata();
846847
let rg_metadata = file_metadata.row_groups();
@@ -1400,19 +1401,34 @@ fn try_init_topk_threshold(
14001401
}
14011402
};
14021403

1403-
// Build the filter expression: col > threshold (DESC) or col < threshold (ASC)
1404+
// Build the filter expression with null-awareness.
1405+
// For NULLS FIRST: NULLs sort before all values, so the filter must
1406+
// preserve NULL rows: `col IS NULL OR col > threshold` (DESC) or
1407+
// `col IS NULL OR col < threshold` (ASC).
1408+
// For NULLS LAST: NULLs sort after all values, so a simple comparison
1409+
// suffices: `col > threshold` (DESC) or `col < threshold` (ASC).
1410+
let nulls_first = sort_options[0].nulls_first;
14041411
let op = if is_descending {
14051412
Operator::Gt
14061413
} else {
14071414
Operator::Lt
14081415
};
14091416

1410-
let filter_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1417+
let comparison: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
14111418
Arc::clone(&col_expr),
14121419
op,
14131420
lit(threshold.clone()),
14141421
));
14151422

1423+
let filter_expr: Arc<dyn PhysicalExpr> = if nulls_first {
1424+
// NULLS FIRST: preserve NULL rows (they sort before threshold)
1425+
use datafusion_physical_expr::expressions::is_null;
1426+
let null_check = is_null(Arc::clone(&col_expr))?;
1427+
Arc::new(BinaryExpr::new(null_check, Operator::Or, comparison))
1428+
} else {
1429+
comparison
1430+
};
1431+
14161432
debug!(
14171433
"Initializing TopK dynamic filter from statistics: {} {} {}",
14181434
col_name,

0 commit comments

Comments
 (0)