Skip to content

Commit 18939c9

Browse files
committed
feat: enable stats init for ALL TopK queries + fix fuzz test tiebreaker
Stats init now fires for all TopK queries, not just sort pushdown path. The null-aware filter (IS NULL OR col > threshold for NULLS FIRST) ensures correctness when NULLs are present. Fix fuzz test: add remaining columns as ASC NULLS LAST tiebreakers to ORDER BY, making the sort fully deterministic. This is the correct approach since SQL doesn't guarantee tie-breaking order, and any optimization that changes RG read order may produce different but equally valid results for tied rows.
1 parent 8fb7b00 commit 18939c9

2 files changed

Lines changed: 24 additions & 10 deletions

File tree

datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -305,21 +305,36 @@ async fn test_fuzz_topk_filter_pushdown() {
305305
}
306306

307307
let mut queries = vec![];
308+
let all_columns = ["id", "name", "department"];
308309

309310
for limit in [1, 10] {
310311
for num_order_by_columns in [1, 2, 3] {
311-
for order_columns in ["id", "name", "department"]
312-
.iter()
313-
.combinations(num_order_by_columns)
314-
{
312+
for order_columns in all_columns.iter().combinations(num_order_by_columns) {
315313
for orderings in order_columns
316314
.iter()
317315
.map(|col| orders.get(**col).unwrap())
318316
.multi_cartesian_product()
319317
{
318+
// Add remaining columns as ASC tiebreakers to make
319+
// the ordering fully deterministic. Without this,
320+
// optimizations that change RG read order (e.g.
321+
// statistics-based pruning) may produce different
322+
// but equally valid tie-breaking results.
323+
let used: Vec<&str> = order_columns.iter().map(|c| **c).collect();
324+
let tiebreakers: Vec<String> = all_columns
325+
.iter()
326+
.filter(|c| !used.contains(*c))
327+
.map(|c| format!("{c} ASC NULLS LAST"))
328+
.collect();
329+
let mut all_orderings: Vec<&str> =
330+
orderings.iter().map(|s| s.as_str()).collect();
331+
let tiebreaker_refs: Vec<&str> =
332+
tiebreakers.iter().map(|s| s.as_str()).collect();
333+
all_orderings.extend(tiebreaker_refs);
334+
320335
let query = format!(
321336
"SELECT * FROM test_table ORDER BY {} LIMIT {}",
322-
orderings.into_iter().join(", "),
337+
all_orderings.join(", "),
323338
limit
324339
);
325340
queries.push(query);

datafusion/datasource-parquet/src/opener.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -838,11 +838,10 @@ impl MetadataLoadedParquetOpen {
838838
// BEFORE building the pruning predicate. The PruningPredicate compiles
839839
// the expression at build time, so the DynamicFilterPhysicalExpr must
840840
// already have the threshold set for pruning to be effective.
841-
// Only when sort pushdown is active (sort_order_for_reorder set).
842-
// For non-sort-pushdown TopK, pruning may change tie-breaking for
843-
// equal values across RGs. Future: extend to all TopK once
844-
// tie-breaking is handled or fuzz tests are updated.
845-
if prepared.sort_order_for_reorder.is_some() {
841+
// Works for ALL TopK queries. The filter is null-aware (NULLS FIRST
842+
// adds `col IS NULL OR ...`) to avoid incorrectly pruning RGs with
843+
// NULL values that belong in the result.
844+
{
846845
let file_metadata = reader_metadata.metadata();
847846
let rg_metadata = file_metadata.row_groups();
848847
let topk_limit = prepared.limit.unwrap_or(1);

0 commit comments

Comments
 (0)