Skip to content

Commit b122ae2

Browse files
committed
feat: TopK cumulative RG pruning after reorder (works with WHERE)
Instead of threshold-based pruning (which fails with WHERE clauses due to unknown qualifying row counts), use cumulative row counting: after reorder + reverse, accumulate rows from the front until we have enough for the TopK fetch limit (K), then prune the rest. This works for sort pushdown with or without WHERE because it only depends on row counts + RG ordering, not threshold values or types. Adds fetch field to DynamicFilterPhysicalExpr (set by SortExec) so the parquet reader knows the TopK K value. Keeps stats init for the no-WHERE sort pushdown case (20-58x speedup) as a complementary optimization that also helps cross-file pruning via the shared DynamicFilter.
1 parent a7b4e25 commit b122ae2

4 files changed

Lines changed: 62 additions & 0 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,22 @@ impl PreparedAccessPlan {
381381
})
382382
}
383383

384+
/// Return a reference to the row group indexes.
385+
pub(crate) fn row_group_indexes(&self) -> &[usize] {
386+
&self.row_group_indexes
387+
}
388+
389+
/// Keep only the first `count` row groups, dropping the rest.
390+
/// Used for TopK cumulative pruning after reorder + reverse.
391+
pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self {
392+
self.row_group_indexes.truncate(count);
393+
// Clear row_selection since it's tied to the original RG set
394+
if self.row_selection.is_some() {
395+
self.row_selection = None;
396+
}
397+
self
398+
}
399+
384400
/// Reorder row groups by their min statistics for the given sort order.
385401
///
386402
/// This helps TopK queries find optimal values first. For ASC sort,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,6 +1222,36 @@ impl RowGroupsPrunedParquetOpen {
12221222
)?;
12231223
}
12241224

1225+
// TopK cumulative pruning: after reorder + reverse, the RGs are in
1226+
// optimal order for the sort. Keep accumulating rows from the front
1227+
// until we have enough for the TopK fetch limit (K), then prune the
1228+
// rest. Works for sort pushdown with or without WHERE because it only
1229+
// depends on row counts + ordering, not threshold values.
1230+
if prepared.sort_order_for_reorder.is_some()
1231+
&& let Some(predicate) = &prepared.predicate
1232+
&& let Some(df) = find_dynamic_filter(predicate)
1233+
&& let Some(fetch) = df.fetch()
1234+
{
1235+
let rg_indexes = prepared_plan.row_group_indexes();
1236+
let mut cumulative = 0usize;
1237+
let mut keep_count = 0;
1238+
for &idx in rg_indexes {
1239+
cumulative += file_metadata.row_group(idx).num_rows() as usize;
1240+
keep_count += 1;
1241+
if cumulative >= fetch {
1242+
break;
1243+
}
1244+
}
1245+
if keep_count < rg_indexes.len() {
1246+
let pruned = rg_indexes.len() - keep_count;
1247+
debug!(
1248+
"TopK cumulative prune: keeping {keep_count} of {} RGs ({cumulative} rows >= fetch={fetch}), pruning {pruned}",
1249+
rg_indexes.len()
1250+
);
1251+
prepared_plan = prepared_plan.truncate_row_groups(keep_count);
1252+
}
1253+
}
1254+
12251255
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
12261256
let read_plan = build_projection_read_plan(
12271257
prepared.projection.expr_iter(),
@@ -3343,6 +3373,7 @@ mod test {
33433373
vec![Arc::clone(&col_expr)],
33443374
super::lit(true),
33453375
vec![desc_opts],
3376+
Some(10),
33463377
));
33473378
let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
33483379

@@ -3389,6 +3420,7 @@ mod test {
33893420
vec![Arc::clone(&col_expr)],
33903421
super::lit(true),
33913422
vec![asc_opts],
3423+
Some(10),
33923424
));
33933425
let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
33943426

@@ -3480,6 +3512,7 @@ mod test {
34803512
vec![col1, col2],
34813513
super::lit(true),
34823514
vec![opts, opts],
3515+
Some(10),
34833516
));
34843517
let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
34853518

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ pub struct DynamicFilterPhysicalExpr {
8080
/// parquet reader can use these to initialize the filter threshold from
8181
/// column statistics before reading any data.
8282
sort_options: Option<Vec<SortOptions>>,
83+
/// Optional TopK fetch limit (K in LIMIT K).
84+
/// Used by the parquet reader for cumulative RG pruning after reorder.
85+
fetch: Option<usize>,
8386
}
8487

8588
#[derive(Debug)]
@@ -184,6 +187,7 @@ impl DynamicFilterPhysicalExpr {
184187
data_type: Arc::new(RwLock::new(None)),
185188
nullable: Arc::new(RwLock::new(None)),
186189
sort_options: None,
190+
fetch: None,
187191
}
188192
}
189193

@@ -196,9 +200,11 @@ impl DynamicFilterPhysicalExpr {
196200
children: Vec<Arc<dyn PhysicalExpr>>,
197201
inner: Arc<dyn PhysicalExpr>,
198202
sort_options: Vec<SortOptions>,
203+
fetch: Option<usize>,
199204
) -> Self {
200205
let mut this = Self::new(children, inner);
201206
this.sort_options = Some(sort_options);
207+
this.fetch = fetch;
202208
this
203209
}
204210

@@ -207,6 +213,11 @@ impl DynamicFilterPhysicalExpr {
207213
self.sort_options.as_deref()
208214
}
209215

216+
/// Returns the TopK fetch limit (K), if available.
217+
pub fn fetch(&self) -> Option<usize> {
218+
self.fetch
219+
}
220+
210221
fn remap_children(
211222
children: &[Arc<dyn PhysicalExpr>],
212223
remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
@@ -396,6 +407,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
396407
data_type: Arc::clone(&self.data_type),
397408
nullable: Arc::clone(&self.nullable),
398409
sort_options: self.sort_options.clone(),
410+
fetch: self.fetch,
399411
}))
400412
}
401413

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ impl SortExec {
925925
children,
926926
lit(true),
927927
sort_options,
928+
self.fetch,
928929
),
929930
))))
930931
}

0 commit comments

Comments
 (0)