feat: statistics-driven TopK optimization for parquet (file reorder + RG reorder + threshold init + cumulative prune) #21580
Conversation
run benchmarks

🤖 Benchmark runs (GKE) comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base): tpch, clickbench_partitioned, and tpcds.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR improves TopK performance for Parquet scans when sort pushdown is Inexact by enabling row-group reordering based on statistics, so likely “best” row groups are read earlier and dynamic filters can tighten sooner.
Changes:
- Thread an optional `LexOrdering` from `ParquetSource::try_pushdown_sort` through `ParquetMorselizer` to the access-plan preparation step.
- Add `PreparedAccessPlan::reorder_by_statistics` to reorder `row_group_indexes` using Parquet statistics.
- Add unit tests covering reorder/skip behavior for multiple edge cases.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| datafusion/datasource-parquet/src/source.rs | Plumbs sort ordering into the file source for later row-group reordering. |
| datafusion/datasource-parquet/src/opener.rs | Carries optional sort ordering into the opener and applies reorder_by_statistics during plan preparation. |
| datafusion/datasource-parquet/src/access_plan.rs | Implements row-group reordering by statistics and adds focused unit tests. |
```rust
let sort_order = LexOrdering::new(order.iter().cloned());
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = sort_order;
```
LexOrdering::new(...) appears to return a Result<LexOrdering, _> (as used with .unwrap() in the new unit tests), but here it’s assigned directly without ?/unwrap, and then assigned to sort_order_for_reorder: Option<LexOrdering> without wrapping in Some(...). This should be changed to construct a LexOrdering with error propagation and store it as Some(sort_order) (or skip setting the field on error). Otherwise this won’t compile.
Suggested change:

```rust
let sort_order = LexOrdering::new(order.iter().cloned())?;
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = Some(sort_order);
```
```rust
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
let first_sort_expr = sort_order.first();
```
sort_order.first() (if LexOrdering is Vec-like) returns Option<&PhysicalSortExpr>, but the code uses it as if it were &PhysicalSortExpr (first_sort_expr.expr...). This is likely a compile error. A concrete fix is to obtain the first element via iteration and handle the empty case (e.g., early-return Ok(self) if no sort expressions), then use the returned &PhysicalSortExpr.
Suggested change:

```rust
let first_sort_expr = match sort_order.iter().next() {
    Some(expr) => expr,
    None => {
        debug!("Skipping RG reorder: empty sort order");
        return Ok(self);
    }
};
```
```rust
    }
};

let descending = first_sort_expr.options.descending;
```
For DESC ordering, reordering by min values is often a poor proxy for “row group likely contains the largest values first”; typically you want to sort by max when descending == true (and by min when ascending). This can significantly reduce the intended TopK benefit (and can even choose a worse first row group when ranges overlap). Consider switching to row_group_maxs(...) for descending order, and update the doc comment (currently mentions “min/max”) and the DESC unit test accordingly.
```rust
// Get min values for the selected row groups
let rg_metadata: Vec<&RowGroupMetaData> = self
    .row_group_indexes
    .iter()
    .map(|&idx| file_metadata.row_group(idx))
    .collect();

let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
    Ok(vals) => vals,
    Err(e) => {
        debug!("Skipping RG reorder: cannot get min values: {e}");
        return Ok(self);
    }
};

// Sort indices by min values
let sort_options = arrow::compute::SortOptions {
    descending,
    nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
    &min_values,
```
Suggested change:

```rust
// Get values for the selected row groups: mins for ASC, maxs for DESC
let rg_metadata: Vec<&RowGroupMetaData> = self
    .row_group_indexes
    .iter()
    .map(|&idx| file_metadata.row_group(idx))
    .collect();

let sort_values = match if descending {
    converter.row_group_maxs(rg_metadata.iter().copied())
} else {
    converter.row_group_mins(rg_metadata.iter().copied())
} {
    Ok(vals) => vals,
    Err(e) => {
        debug!("Skipping RG reorder: cannot get min/max values: {e}");
        return Ok(self);
    }
};

// Sort indices by the statistics that best match the requested order
let sort_options = arrow::compute::SortOptions {
    descending,
    nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
    &sort_values,
```
Yes, this is a good point.
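To illustrate why max is the better key for DESC, here is a hypothetical standalone sketch (not DataFusion's actual API) that orders row-group indexes from (min, max) statistics:

```rust
/// Hypothetical sketch: order row-group indexes for a TopK scan.
/// For ASC we want the smallest values first, so sort by `min`; for DESC we
/// want the largest values first, so sort by `max` descending. Sorting DESC
/// by `min` can pick a worse first row group when ranges overlap.
fn order_row_groups(ranges: &[(i64, i64)], descending: bool) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..ranges.len()).collect();
    if descending {
        // Largest max first: that RG is most likely to hold the top values.
        idx.sort_by_key(|&i| std::cmp::Reverse(ranges[i].1));
    } else {
        // Smallest min first.
        idx.sort_by_key(|&i| ranges[i].0);
    }
    idx
}

fn main() {
    // RG 0 covers [0, 1000], RG 1 covers [500, 600]: overlapping ranges.
    let ranges = [(0, 1000), (500, 600)];
    // DESC by max correctly visits RG 0 first (it contains the global max);
    // DESC by min would have visited RG 1 first.
    assert_eq!(order_row_groups(&ranges, true), vec![0, 1]);
    assert_eq!(order_row_groups(&ranges, false), vec![0, 1]);
}
```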
```rust
let sorted_indices = match arrow::compute::sort_to_indices(
    &min_values,
    Some(sort_options),
    None,
) {
```
If multiple row groups share the same min (or max) statistic, sort_to_indices may not guarantee a deterministic/stable tie-breaker across platforms/versions. Since row-group order can affect scan reproducibility and performance debugging, consider adding a stable secondary key (e.g., original row group index) when statistics are equal.
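A sketch of what a stable tie-break could look like (hypothetical helper, not the PR's code), using the original row-group index as a secondary key:

```rust
/// Hypothetical sketch: make the reorder deterministic by breaking statistic
/// ties with the original row-group index, instead of relying on whatever
/// tie order `sort_to_indices` happens to produce.
fn sort_with_stable_tiebreak(mins: &[i64]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..mins.len()).collect();
    // Composite key: (statistic, original index). Equal statistics keep
    // their original relative order on every platform/version.
    idx.sort_by_key(|&i| (mins[i], i));
    idx
}

fn main() {
    // Two row groups share min = 5; RG 1 deterministically precedes RG 3.
    assert_eq!(sort_with_stable_tiebreak(&[9, 5, 2, 5]), vec![2, 1, 3, 0]);
}
```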
```rust
/// - 0 or 1 row groups (nothing to reorder)
/// - Sort expression is not a simple column reference
/// - Statistics are unavailable
pub(crate) fn reorder_by_statistics(
```
I think @adriangb had the great idea to also order by grouping keys which can
- reduce cardinality within partitions (partition-local state can be smaller)
- allow for better cache locality (row groups with more equal keys are grouped together)
Doesn't have to be in this PR but perhaps we can think about how it fits in.
Thanks @Dandandan for review! That's a great extension. The reorder_by_statistics method is generic enough to take any LexOrdering — it doesn't need to be tied to TopK specifically. So extending this for GROUP BY should be a matter of:
- Computing a preferred RG ordering from grouping keys in the aggregate planner
- Passing it through to ParquetSource::sort_order_for_reorder
Happy to track this as a follow-up issue. Will open one after this PR lands.
Thanks @Dandandan! Created #21581 to track this. The existing infrastructure from this PR should be directly reusable — mainly needs the aggregate planner to populate sort_order_for_reorder from grouping keys.
run benchmarks

🤖 Benchmark runs (GKE) comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) completed for tpch, tpcds, clickbench_partitioned, and clickbench_extended.
I wonder if the ordering should be done before the files / row groups are assigned to partitions, so they are more globally sorted instead of just sorted per partition? It seems now they are sorted within each partition, which should help, but perhaps not nearly as much as it would if all the partitions contained the optimal row groups. This would also help in the case of #21581.
🤖 Benchmark completed (GKE): clickbench_extended.
Great point @Dandandan — you're right that global reorder is much more effective than per-partition reorder. With global reorder + round-robin distribution, each partition's first RG is close to the global optimum.

The current per-partition reorder is limited because even after sorting, partition 0's "best" RG might be much worse than the global best (which may have landed in partition 2). Moving to global reorder would require changes at the planning / EnforceDistribution layer to load RG statistics and redistribute RGs across partitions. I'd prefer to keep this PR as an incremental step (per-partition) and track global reorder as a follow-up; it would benefit both #21317 and #21581. Does this make sense?
Sure, makes sense.
Bring back stats init with all issues fixed:
- GtEq/LtEq instead of Gt/Lt (include boundary values)
- Use df.fetch() as limit (the TopK K value, not the scan limit); when K > single RG rows, stats init skips and cumulative prune handles it
- Cast threshold to the column data type (parquet vs table schema mismatch)
- Null-aware filter for NULLS FIRST
- Generation check prevents overwrite by later partitions
- Restricted to sort pushdown + pure DynamicFilter (no WHERE)

Stats init and cumulative prune are complementary:
- Stats init: updates PruningPredicate, pruning at the RG statistics level
- Cumulative prune: truncates after reorder + reverse, pruning by row count

Both work together without conflict when using df.fetch().
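The cumulative-prune half of that pairing can be sketched in isolation (hypothetical helper; the real code also checks ordering and pure-DynamicFilter guards before truncating):

```rust
/// Hypothetical sketch of the cumulative-prune idea: after row groups are
/// reordered so the most promising ones come first, keep only a prefix whose
/// row count covers the TopK `k`, and drop the rest before any I/O happens.
fn cumulative_keep_count(rg_row_counts: &[usize], k: usize) -> usize {
    let mut cumulative = 0usize;
    for (kept, &rows) in rg_row_counts.iter().enumerate() {
        cumulative += rows;
        if cumulative >= k {
            // This prefix already holds at least k rows; under the guards
            // above, later RGs cannot contribute to the TopK result.
            return kept + 1;
        }
    }
    rg_row_counts.len()
}

fn main() {
    // 61 RGs of 100k rows each, LIMIT 100: a single RG suffices.
    assert_eq!(cumulative_keep_count(&vec![100_000; 61], 100), 1);
    // LIMIT 250_000 needs the first three RGs.
    assert_eq!(cumulative_keep_count(&vec![100_000; 61], 250_000), 3);
}
```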
run benchmark sort_pushdown_inexact

🤖 Benchmark (GKE) comparing feat/reorder-row-groups-by-stats (839ab5a) to 466c3ea (merge-base) completed: sort_pushdown_inexact.
create_filter() was called before new_sort.fetch was set, so DynamicFilterPhysicalExpr.fetch was always 0 (or None from the old self). Fixed by setting fetch before creating the filter. This was the root cause of stats init and cumulative prune not triggering on CI: fetch=0 meant "no rows needed", so both were skipped.
run benchmark sort_pushdown_inexact
run benchmark clickbench_1

🤖 Benchmarks (GKE) comparing feat/reorder-row-groups-by-stats (a269ffd) to 466c3ea (merge-base) completed: sort_pushdown_inexact, clickbench_1.
For GROUP BY + ORDER BY queries, the TopK sort column is an aggregate output (e.g. COUNT(*)) that doesn't exist in the parquet file schema. Previously we still created ReorderByStatistics which tried to look up the column in statistics — wasted work. Now check column existence in file schema before creating the optimizer. This eliminates overhead for non-scan-level TopK queries (ClickBench Q40-Q42 regression fix).
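A minimal sketch of the bail-fast check (hypothetical names; the real code inspects the Arrow file schema):

```rust
use std::collections::HashSet;

/// Hypothetical sketch: skip the reorder optimizer entirely when the sort
/// column (e.g. an aggregate output like COUNT(*)) does not exist in the
/// parquet file schema, so non-scan-level TopK queries pay no overhead.
fn should_create_reorderer(file_schema_fields: &[&str], sort_column: &str) -> bool {
    let fields: HashSet<&str> = file_schema_fields.iter().copied().collect();
    fields.contains(sort_column)
}

fn main() {
    let fields = ["l_orderkey", "l_extendedprice"];
    assert!(should_create_reorderer(&fields, "l_orderkey"));
    // COUNT(*) is an aggregate output, not a file column: bail fast.
    assert!(!should_create_reorderer(&fields, "count(*)"));
}
```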
…pache#21711)

## Which issue does this PR close?

Related to apache#21580

## Rationale for this change

The sort pushdown benchmark had two problems:

1. **Broken data generation**: The single-file ORDER BY approach caused the parquet writer to merge rows from adjacent chunks at RG boundaries, widening RG ranges to ~6M. The per-file split fix gave each file only 1 RG, so `reorder_by_statistics` (intra-file optimization) had nothing to reorder.
2. **Missing DESC LIMIT queries**: The `sort_pushdown` benchmark only had ASC queries (sort elimination). No queries tested the reverse scan + TopK path (Inexact sort pushdown), which is where RG reorder, stats init, and cumulative pruning provide a 20-58x improvement.

## What changes are included in this PR?

### 1. Fix benchmark data generation

Generate **multiple files with multiple scrambled RGs each**:

- `inexact`: 3 files x ~20 RGs each
- `overlap`: 5 files x ~12 RGs each

Uses pyarrow to redistribute RGs from a sorted temp file into multiple output files with scrambled RG order. Each RG has a narrow `l_orderkey` range (~100K) but appears in scrambled order within its file.

### 2. Add DESC LIMIT queries to sort_pushdown benchmark

New q5-q8 for `sort_pushdown` (sorted data, `WITH ORDER`):

| Query | Description |
|-------|-------------|
| q5 | `ORDER BY l_orderkey DESC LIMIT 100` (narrow projection) |
| q6 | `ORDER BY l_orderkey DESC LIMIT 1000` (narrow projection) |
| q7 | `SELECT * ORDER BY l_orderkey DESC LIMIT 100` (wide projection) |
| q8 | `SELECT * ORDER BY l_orderkey DESC LIMIT 1000` (wide projection) |

These test the Inexact sort pushdown path: reverse scan + TopK + dynamic filter, which benefits from the optimizations in apache#21580.

## Are these changes tested?

Benchmark changes only. Verified locally:

- Data generation produces correct multi-file multi-RG output
- DESC LIMIT queries return correct results
- q5-q8 show 20-58x improvement with apache#21580 optimizations

## Are there any user-facing changes?

No.
Adds pyarrow as a dependency for generating benchmark datasets (`pip install pyarrow`).
run benchmark sort_pushdown_inexact

🤖 Benchmark (GKE) comparing feat/reorder-row-groups-by-stats (bb87ff6) to 64619a6 (merge-base) completed: sort_pushdown_inexact.
run benchmark clickbench_1
run benchmark sort_pushdown_inexact

🤖 Benchmarks (GKE) comparing feat/reorder-row-groups-by-stats (2081071) to 64619a6 (merge-base) completed: clickbench_1, sort_pushdown_inexact.
cc @adriangb @Dandandan @alamb The latest CI benchmark results show 2x-3x faster on sort_pushdown_inexact with no regression on ClickBench. A local benchmark on a sorted single file (61 RGs, narrow ranges) shows 17-60x faster: the full optimization chain (stats init + cumulative RG prune) skips 60 of 61 RGs with zero I/O. Would appreciate a review when you get a chance; the PR is ready for review.
```rust
let mut cumulative = 0usize;
let mut keep_count = 0;
for &idx in rg_indexes {
    cumulative += file_metadata.row_group(idx).num_rows() as usize;
```
The parquet opener first turns the dynamic filter into a pushed-down row_filter / row_selection, but later the cumulative cutoff still sums raw row_group.num_rows() and truncates once that raw count reaches fetch. That is unsafe when early RGs contain many rows, but few rows survive the filter.
Good point. Currently create_filter and fetch are set in the same method (with_fetch), and we fixed the ordering so fetch is set before create_filter is called. There's no separate code path that updates fetch without recreating the filter.

But you're right that this coupling is fragile: if a future optimizer calls with_fetch independently, the filter's fetch would go stale. As a follow-up, I'll consider making fetch on DynamicFilterPhysicalExpr read directly from SortExec.fetch (via a shared reference) instead of copying the value at creation time.
```rust
/// Keep only the first `count` row groups, dropping the rest.
/// Used for TopK cumulative pruning after reorder + reverse.
pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self {
```
It drops the existing row_selection entirely after truncation, which can widen the scan back to full row groups or discard exact page-level pruning state.
Good catch @xudong963. Currently truncate_row_groups drops row_selection entirely, which loses page-level pruning state for the retained RGs. I'll fix this by skipping truncation when a row_selection is present: cumulative prune will only apply when there's no page-level pruning active. This is safe because page pruning is already reducing I/O within those RGs.
```rust
new_sort.filter = fetch.is_some().then(|| {
    // If we already have a filter, keep it. Otherwise, create a new one.
    // Must be called after setting fetch so DynamicFilter gets the K value.
    self.filter
```
SortExec::with_fetch can leave the embedded dynamic filter’s fetch stale. The new parquet optimizations read df.fetch() for threshold init and cumulative prune, so if any later optimizer rewrites fetch through with_fetch, parquet may prune using the old K
This is actually safe in the current implementation because cumulative prune is guarded by is_pure_dynamic_filter — it only fires when the predicate is purely the DynamicFilterPhysicalExpr (no WHERE
clause). And cumulative prune runs before any data is read, so row_filter hasn't filtered any rows yet — num_rows() is accurate at this point.
If we extend this to WHERE queries in the future (e.g. with dynamic RG pruning at runtime via #21399), we'd need to switch from static row counting to runtime early termination.
```rust
// Get the first sort expression
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
let first_sort_expr = sort_order.first();
```
what about multi-key ORDER BY?
Thanks @xudong963 for the review! Yes, currently only a single sort key is supported; I will support multi-key ORDER BY as a follow-up.
Which issue does this PR close?
Closes #21691
Partial fix for #21399
Rationale for this change
TopK queries (`ORDER BY col DESC/ASC LIMIT K`) on parquet data have several inefficiencies; in particular, the dynamic filter starts as `lit(true)`, so early RGs are never pruned.

What changes are included in this PR?

A chain of composable optimizations that minimize I/O for TopK queries:
1. Global file reorder (`FileSource::reorder_files`)

Sort files in the shared work queue by column statistics. DESC: highest min first; ASC: lowest max first. Works for ALL TopK via `DynamicFilterPhysicalExpr.sort_options`. Bails fast when the sort column is not in the file schema (GROUP BY + ORDER BY).

2. RG reorder within file (`reorder_by_statistics`)

Reorder row groups by min values (ASC). Works for all TopK via DynamicFilter `sort_options` (with a file schema check). Combined with reverse for DESC queries.

3. TopK threshold init from statistics (`try_init_topk_threshold`)

Before reading data, compute the threshold from RG min/max stats. Runs BEFORE the `PruningPredicate` build so the threshold is compiled into the predicate. Uses `GtEq`/`LtEq` to include boundary values. Null-aware filter for NULLS FIRST. Uses `df.fetch()` (the TopK K value) so stats init skips when K spans multiple RGs. Restricted to sort pushdown + no WHERE (pure DynamicFilter predicate).

4. Cumulative RG pruning (`truncate_row_groups`)

After reorder + reverse, accumulate rows from the front until >= K, then prune the rest. For non-sort-pushdown TopK, guarded by a non-overlap check (`max(i) <= min(i+1)`). Only when the predicate is pure DynamicFilter (no WHERE).

5. Compose reorder + reverse

Sequential steps instead of mutually exclusive. Reverse only triggers when reorder succeeds (sort column found in the file schema).
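The threshold-init idea from step 3 can be sketched as a standalone function (hypothetical and simplified to a single DESC column; the real code builds a PruningPredicate expression and handles nulls and type casts):

```rust
/// Hypothetical sketch of threshold init for a DESC TopK: if the most
/// promising row group alone holds at least `k` rows, the k-th largest value
/// overall is at least that RG's min statistic, so `col >= min` (GtEq, to
/// keep boundary values) is a safe initial threshold. When k spans multiple
/// RGs the sketch bails, mirroring the "stats init skips" behavior.
fn init_desc_threshold(best_rg_rows: usize, best_rg_min: i64, k: usize) -> Option<i64> {
    if best_rg_rows >= k {
        Some(best_rg_min) // predicate: col >= best_rg_min
    } else {
        None // K spans multiple RGs; leave it to cumulative pruning
    }
}

fn main() {
    // RG with 100_000 rows in [900_000, 1_000_000], LIMIT 100: threshold 900_000.
    assert_eq!(init_desc_threshold(100_000, 900_000, 100), Some(900_000));
    // LIMIT larger than the RG: skip stats init.
    assert_eq!(init_desc_threshold(100_000, 900_000, 200_000), None);
}
```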
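The non-overlap guard from step 4 can also be sketched standalone (hypothetical helper operating on per-RG (min, max) pairs):

```rust
/// Hypothetical sketch of the non-overlap guard: cumulative pruning for
/// non-sort-pushdown TopK is only safe when consecutive (already ordered)
/// row groups have disjoint value ranges, i.e. max(i) <= min(i+1).
fn ranges_non_overlapping(ranges: &[(i64, i64)]) -> bool {
    ranges.windows(2).all(|w| w[0].1 <= w[1].0)
}

fn main() {
    assert!(ranges_non_overlapping(&[(0, 10), (10, 20), (21, 30)]));
    // RG 1's range dips below RG 0's max: ranges overlap, pruning must bail.
    assert!(!ranges_non_overlapping(&[(0, 15), (10, 20)]));
}
```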
How they work together
Coverage matrix
Local benchmark (single file, 61 sorted RGs, DESC LIMIT, 1 partition)
Key bug fix: `SortExec.fetch` ordering

`create_filter()` was called before `new_sort.fetch` was set, so `DynamicFilterPhysicalExpr.fetch` was always 0. Fixed by setting fetch before creating the filter.

Changes to `DynamicFilterPhysicalExpr`:
- `sort_options: Option<Vec<SortOptions>>`: sort direction for each child
- `fetch: Option<usize>`: TopK K value for cumulative pruning
- `new_with_sort_options()` constructor, `sort_options()` and `fetch()` getters
- Populated by `SortExec::create_filter()` for all TopK queries
Are these changes tested?

- Unit tests in `datafusion-datasource-parquet` (all pass)
- `test_fuzz_topk_filter_pushdown`: updated with tiebreaker columns for deterministic ORDER BY

Are there any user-facing changes?

No. Transparent optimization: same results, faster TopK on parquet with statistics.