Skip to content

Commit ca94342

Browse files
committed
feat: enable RG reorder + cumulative prune for all TopK queries
Extend RG reorder, reverse, and cumulative pruning beyond sort pushdown to ALL TopK queries via DynamicFilterPhysicalExpr sort_options. For non-sort-pushdown TopK, cumulative pruning is guarded by a non-overlap check: after reorder, verify adjacent RGs satisfy max[i] <= min[i+1]. Only prune when RGs are non-overlapping (guarantees top-K values are in the first N RGs). Sort pushdown path skips the overlap check (sorted data is guaranteed non-overlapping).
1 parent b122ae2 commit ca94342

2 files changed

Lines changed: 219 additions & 18 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 152 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,23 +1181,60 @@ impl RowGroupsPrunedParquetOpen {
11811181
// For unsorted data: reorder fixes the order, reverse flips for DESC.
11821182
// Build reorder optimizer from sort_order_for_reorder (Inexact path)
11831183
// or from DynamicFilterPhysicalExpr sort_options (any TopK query).
1184+
// Fuzz test uses tiebreaker columns so reorder is safe for all TopK.
11841185
let reorder_optimizer: Option<
11851186
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1186-
// RG reorder only in sort pushdown path (sort_order_for_reorder set).
1187-
// Reordering RGs without sort pushdown changes tie-breaking for equal
1188-
// values, which can cause non-deterministic results.
1189-
> = prepared.sort_order_for_reorder.as_ref().map(|sort_order| {
1190-
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1191-
sort_order.clone(),
1192-
)) as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1193-
});
1187+
> = if let Some(sort_order) = &prepared.sort_order_for_reorder {
1188+
Some(
1189+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1190+
sort_order.clone(),
1191+
))
1192+
as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1193+
)
1194+
} else if let Some(predicate) = &prepared.predicate
1195+
&& let Some(df) = find_dynamic_filter(predicate)
1196+
&& let Some(sort_options) = df.sort_options()
1197+
&& sort_options.len() == 1
1198+
{
1199+
// Build a sort order from DynamicFilter for non-sort-pushdown TopK.
1200+
// Always ASC — reverse handles DESC separately.
1201+
let children = df.children();
1202+
if !children.is_empty() {
1203+
let sort_expr =
1204+
datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
1205+
expr: Arc::clone(children[0]),
1206+
options: arrow::compute::SortOptions {
1207+
descending: false,
1208+
nulls_first: sort_options[0].nulls_first,
1209+
},
1210+
};
1211+
LexOrdering::new(vec![sort_expr]).map(|order| {
1212+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1213+
order,
1214+
))
1215+
as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1216+
})
1217+
} else {
1218+
None
1219+
}
1220+
} else {
1221+
None
1222+
};
11941223

1195-
// Reverse for DESC queries. ONLY triggered by sort pushdown
1196-
// (reverse_row_groups=true), which means the file is declared WITH ORDER
1197-
// and data is sorted. Reversing unsorted data would produce wrong results.
1224+
// Reverse for DESC queries. Triggered by sort pushdown OR by
1225+
// DynamicFilter indicating DESC. For non-sort-pushdown TopK, reverse
1226+
// ensures cumulative pruning keeps the highest-value RGs. TopK
1227+
// handles final row-level sorting regardless.
1228+
let is_descending = prepared.reverse_row_groups
1229+
|| prepared
1230+
.predicate
1231+
.as_ref()
1232+
.and_then(find_dynamic_filter)
1233+
.and_then(|df| df.sort_options().map(|opts| opts[0].descending))
1234+
.unwrap_or(false);
11981235
let reverse_optimizer: Option<
11991236
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1200-
> = if prepared.reverse_row_groups {
1237+
> = if is_descending {
12011238
Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
12021239
} else {
12031240
None
@@ -1223,14 +1260,21 @@ impl RowGroupsPrunedParquetOpen {
12231260
}
12241261

12251262
// TopK cumulative pruning: after reorder + reverse, the RGs are in
1226-
// optimal order for the sort. Keep accumulating rows from the front
1227-
// until we have enough for the TopK fetch limit (K), then prune the
1228-
// rest. Works for sort pushdown with or without WHERE because it only
1229-
// depends on row counts + ordering, not threshold values.
1230-
if prepared.sort_order_for_reorder.is_some()
1231-
&& let Some(predicate) = &prepared.predicate
1263+
// optimal order. Accumulate rows from the front until >= K, prune rest.
1264+
// For sort pushdown: always safe (sorted = non-overlapping).
1265+
// For non-sort-pushdown: safe only if reordered RGs are non-overlapping
1266+
// (verified by checking max[i] <= min[i+1] in the reordered sequence).
1267+
let has_sort_pushdown = prepared.sort_order_for_reorder.is_some();
1268+
if let Some(predicate) = &prepared.predicate
12321269
&& let Some(df) = find_dynamic_filter(predicate)
12331270
&& let Some(fetch) = df.fetch()
1271+
&& (has_sort_pushdown
1272+
|| rgs_are_non_overlapping(
1273+
&prepared_plan,
1274+
file_metadata.as_ref(),
1275+
&prepared.physical_file_schema,
1276+
&df,
1277+
))
12341278
{
12351279
let rg_indexes = prepared_plan.row_group_indexes();
12361280
let mut cumulative = 0usize;
@@ -1540,6 +1584,96 @@ fn try_init_topk_threshold(
15401584
///
15411585
/// Returns the first `DynamicFilterPhysicalExpr` found (as an `Arc`) by
15421586
/// checking the predicate itself and recursively walking its children.
1587+
/// Check if row groups in the prepared plan are non-overlapping on the
1588+
/// sort column. Adjacent RGs must satisfy max[i] <= min[i+1].
1589+
fn rgs_are_non_overlapping(
1590+
plan: &crate::access_plan::PreparedAccessPlan,
1591+
file_metadata: &parquet::file::metadata::ParquetMetaData,
1592+
arrow_schema: &Schema,
1593+
dynamic_filter: &DynamicFilterPhysicalExpr,
1594+
) -> bool {
1595+
let sort_options = match dynamic_filter.sort_options() {
1596+
Some(opts) if opts.len() == 1 => opts,
1597+
_ => return false,
1598+
};
1599+
let children = dynamic_filter.children();
1600+
if children.is_empty() {
1601+
return false;
1602+
}
1603+
let column = match find_column_in_expr(children[0]) {
1604+
Some(col) => col,
1605+
None => return false,
1606+
};
1607+
let converter = match StatisticsConverter::try_new(
1608+
column.name(),
1609+
arrow_schema,
1610+
file_metadata.file_metadata().schema_descr(),
1611+
) {
1612+
Ok(c) => c,
1613+
Err(_) => return false,
1614+
};
1615+
1616+
let rg_indexes = plan.row_group_indexes();
1617+
if rg_indexes.len() <= 1 {
1618+
return true; // 0 or 1 RG is trivially non-overlapping
1619+
}
1620+
1621+
// Get min/max for the reordered RGs.
1622+
// After reorder (min ASC) + possible reverse (DESC), check adjacent pairs.
1623+
// For ASC order: max[i] <= min[i+1]
1624+
// For DESC order (reversed): min[i] >= max[i+1] (equivalently max[i+1] <= min[i])
1625+
let is_descending = sort_options[0].descending;
1626+
let rg_metadata: Vec<_> = rg_indexes
1627+
.iter()
1628+
.map(|&idx| file_metadata.row_group(idx))
1629+
.collect();
1630+
let mins = match converter.row_group_mins(rg_metadata.iter().copied()) {
1631+
Ok(m) => m,
1632+
Err(_) => return false,
1633+
};
1634+
let maxes = match converter.row_group_maxes(rg_metadata.iter().copied()) {
1635+
Ok(m) => m,
1636+
Err(_) => return false,
1637+
};
1638+
1639+
for i in 0..rg_indexes.len() - 1 {
1640+
if i >= mins.len() || i + 1 >= mins.len() {
1641+
return false;
1642+
}
1643+
if mins.is_null(i)
1644+
|| mins.is_null(i + 1)
1645+
|| maxes.is_null(i)
1646+
|| maxes.is_null(i + 1)
1647+
{
1648+
return false;
1649+
}
1650+
let (prev_max, next_min) = if is_descending {
1651+
// Reversed order: RG[i] has higher values than RG[i+1]
1652+
// Check: min[i] >= max[i+1]
1653+
match (
1654+
ScalarValue::try_from_array(mins.as_ref(), i),
1655+
ScalarValue::try_from_array(maxes.as_ref(), i + 1),
1656+
) {
1657+
(Ok(min_i), Ok(max_next)) => (max_next, min_i),
1658+
_ => return false,
1659+
}
1660+
} else {
1661+
// ASC order: max[i] <= min[i+1]
1662+
match (
1663+
ScalarValue::try_from_array(maxes.as_ref(), i),
1664+
ScalarValue::try_from_array(mins.as_ref(), i + 1),
1665+
) {
1666+
(Ok(max_i), Ok(min_next)) => (max_i, min_next),
1667+
_ => return false,
1668+
}
1669+
};
1670+
if prev_max > next_min {
1671+
return false; // overlap detected
1672+
}
1673+
}
1674+
true
1675+
}
1676+
15431677
fn find_dynamic_filter(
15441678
expr: &Arc<dyn PhysicalExpr>,
15451679
) -> Option<Arc<DynamicFilterPhysicalExpr>> {

datafusion/sqllogictest/test_files/listing_table_partitions.slt

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,70 @@ foo
7373

7474
statement count 0
7575
set datafusion.execution.listing_table_factory_infer_partitions = true;
76+
77+
# Test: files outside partition structure are skipped
78+
# This simulates a table that transitioned from non-partitioned to
79+
# hive-partitioned storage, leaving a stale file in the root directory.
80+
81+
# Create partitioned files first
82+
query I
83+
copy (values(1, 'alice'), (2, 'bob'))
84+
to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2024/data.parquet';
85+
----
86+
2
87+
88+
query I
89+
copy (values(3, 'charlie'))
90+
to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2025/data.parquet';
91+
----
92+
1
93+
94+
# Create the table before adding the stale root file, so partition
95+
# inference succeeds (it only runs at CREATE TABLE time).
96+
statement count 0
97+
create external table root_file_test
98+
stored as parquet
99+
location 'test_files/scratch/listing_table_partitions/root_file_skipped/';
100+
101+
# Now add a stale root-level file (outside any partition directory).
102+
# This simulates a file left over from before partitioning was added.
103+
query I
104+
copy (values(99, 'stale'))
105+
to 'test_files/scratch/listing_table_partitions/root_file_skipped/stale.parquet';
106+
----
107+
1
108+
109+
# The root file should be skipped — only partitioned files are included
110+
query IT
111+
select column1, column2 from root_file_test order by column1;
112+
----
113+
1 alice
114+
2 bob
115+
3 charlie
116+
117+
# Partition column should be accessible
118+
query ITT
119+
select column1, column2, year from root_file_test order by column1;
120+
----
121+
1 alice 2024
122+
2 bob 2024
123+
3 charlie 2025
124+
125+
# Partition filter should work
126+
query ITT
127+
select column1, column2, year from root_file_test where year = '2025';
128+
----
129+
3 charlie 2025
130+
131+
# COUNT should not include the root file's rows
132+
query I
133+
select count(*) from root_file_test;
134+
----
135+
3
136+
137+
# GROUP BY partition column should work
138+
query TI
139+
select year, count(*) from root_file_test group by year order by year;
140+
----
141+
2024 2
142+
2025 1

0 commit comments

Comments
 (0)