Skip to content

Commit c94bdcc

Browse files
committed
fix: restrict RG reorder/reverse to sort pushdown path only
RG reorder and reverse must only trigger when sort pushdown is active (sort_order_for_reorder is set). Applying them to non-sort-pushdown TopK queries changes the RG read order, which alters tie-breaking for equal values (e.g. NULLs) and causes non-deterministic results. File reorder and stats init remain enabled for ALL TopK queries since they only affect pruning (which rows are skipped), not the relative order of rows within remaining RGs. Fixes fuzz_cases::topk_filter_pushdown::test_fuzz_topk_filter_pushdown
1 parent e4e11b2 commit c94bdcc

1 file changed

Lines changed: 12 additions & 68 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 12 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,37 +1168,21 @@ impl RowGroupsPrunedParquetOpen {
11681168
// or from DynamicFilterPhysicalExpr sort_options (any TopK query).
11691169
let reorder_optimizer: Option<
11701170
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1171-
> = if let Some(sort_order) = &prepared.sort_order_for_reorder {
1172-
Some(
1173-
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1174-
sort_order.clone(),
1175-
))
1176-
as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1177-
)
1178-
} else if let Some(predicate) = &prepared.predicate {
1179-
// For non-Inexact TopK queries: extract sort info from the
1180-
// DynamicFilterPhysicalExpr to enable RG reorder.
1181-
extract_sort_order_from_predicate(predicate).map(|sort_order| {
1182-
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1183-
sort_order,
1184-
))
1185-
as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1186-
})
1187-
} else {
1188-
None
1189-
};
1171+
// RG reorder only in sort pushdown path (sort_order_for_reorder set).
1172+
// Reordering RGs without sort pushdown changes tie-breaking for equal
1173+
// values, which can cause non-deterministic results.
1174+
> = prepared.sort_order_for_reorder.as_ref().map(|sort_order| {
1175+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1176+
sort_order.clone(),
1177+
)) as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1178+
});
11901179

1191-
// Reverse for DESC queries. Triggered by sort pushdown (reverse_row_groups)
1192-
// or by DynamicFilterPhysicalExpr sort_options indicating DESC.
1193-
let is_descending = prepared.reverse_row_groups
1194-
|| prepared
1195-
.predicate
1196-
.as_ref()
1197-
.and_then(extract_descending_from_predicate)
1198-
.unwrap_or(false);
1180+
// Reverse for DESC queries. ONLY triggered by sort pushdown
1181+
// (reverse_row_groups=true), which means the file is declared WITH ORDER
1182+
// and data is sorted. Reversing unsorted data would produce wrong results.
11991183
let reverse_optimizer: Option<
12001184
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1201-
> = if is_descending {
1185+
> = if prepared.reverse_row_groups {
12021186
Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
12031187
} else {
12041188
None
@@ -1503,46 +1487,6 @@ fn compute_best_threshold_from_stats(
15031487
Ok(best)
15041488
}
15051489

1506-
/// Extract a [`LexOrdering`] from a [`DynamicFilterPhysicalExpr`] in the predicate.
1507-
///
1508-
/// Returns a single-column ASC sort order if a DynamicFilterPhysicalExpr with
1509-
/// sort_options is found. The sort direction in the returned LexOrdering is
1510-
/// always ASC because `ReorderByStatistics` always sorts by min ASC; the
1511-
/// caller handles DESC via `ReverseRowGroups`.
1512-
fn extract_sort_order_from_predicate(
1513-
predicate: &Arc<dyn PhysicalExpr>,
1514-
) -> Option<LexOrdering> {
1515-
let df = find_dynamic_filter(predicate)?;
1516-
let sort_options = df.sort_options()?;
1517-
if sort_options.len() != 1 {
1518-
return None;
1519-
}
1520-
let children = df.children();
1521-
if children.is_empty() {
1522-
return None;
1523-
}
1524-
// Build ASC sort order (reorder always sorts ASC, reverse handles DESC)
1525-
let sort_expr = datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
1526-
expr: Arc::clone(children[0]),
1527-
options: arrow::compute::SortOptions {
1528-
descending: false,
1529-
nulls_first: sort_options[0].nulls_first,
1530-
},
1531-
};
1532-
LexOrdering::new(vec![sort_expr])
1533-
}
1534-
1535-
/// Check if a [`DynamicFilterPhysicalExpr`] in the predicate indicates DESC sort.
1536-
fn extract_descending_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Option<bool> {
1537-
let df = find_dynamic_filter(predicate)?;
1538-
let sort_options = df.sort_options()?;
1539-
if sort_options.len() == 1 {
1540-
Some(sort_options[0].descending)
1541-
} else {
1542-
None
1543-
}
1544-
}
1545-
15461490
/// State for a stream that decodes a single Parquet file using a push-based decoder.
15471491
///
15481492
/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests

0 commit comments

Comments
 (0)