Commit 936db37

SubhamSinghal (Subham Singhal) and 2010YOUY01 authored
Perf: Window topn optimisation (#21479)
## Which issue does this PR close?

- Related to #6899.

## Rationale for this change

Queries like `SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM t WHERE rn <= K` are extremely common in analytics ("top N per group"). The current plan sorts the **entire** dataset in O(N log N), computes ROW_NUMBER for all rows, then filters. With 10M rows, 1K partitions, and K=3, we sort all 10M rows but only keep 3K.

This PR introduces a `PartitionedTopKExec` operator that replaces the `SortExec`, maintaining a per-partition `TopK` heap (reusing DataFusion's existing `TopK` implementation). Cost drops to O(N log K) time and O(K × P × row_size) memory, where P is the number of partitions.

## What changes are included in this PR?

**New physical operator: `PartitionedTopKExec`** (`physical-plan/src/sorts/partitioned_topk.rs`)

- Reads unsorted input, groups rows by partition key using `RowConverter`, and feeds sub-batches to a per-partition `TopK` heap
- Emits only the top-K rows per partition in sorted `(partition_keys, order_keys)` order
- Reuses the existing `TopK` implementation for heap management, sort key comparison, eviction, and batch compaction

**New optimizer rule: `WindowTopN`** (`physical-optimizer/src/window_topn.rs`)

Detects the pattern:

```text
FilterExec(rn <= K)
  [optional ProjectionExec]
    BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
      SortExec(partition_keys, order_keys)
```

And replaces it with:

```text
[optional ProjectionExec]
  BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
    PartitionedTopKExec(fetch=K)
```

Both `FilterExec` and `SortExec` are removed.

Supported predicates: `rn <= K`, `rn < K`, `K >= rn`, `K > rn`.

The rule only fires for `ROW_NUMBER` with a `PARTITION BY` clause. Global top-K (no `PARTITION BY`) is already handled by `SortExec` with `fetch`.

**Config flag:** `datafusion.optimizer.enable_window_topn` (default: `false`)

**Benchmark results** (H2O groupby Q8, 10M rows, top-2 per partition), produced with `cargo run --release --example h2o_window_topn_bench`:

| Scenario | Enabled (ms) | Disabled (ms) | Speedup |
|----------|-------------|--------------|---------|
| 100 partitions (100K rows/part) | 43 | 174 | 4.0x |
| 1K partitions (10K rows/part) | 71 | 146 | 2.1x |
| 10K partitions (1K rows/part) | 619 | 128 | 0.2x (regression) |
| 100K partitions (100 rows/part) | 4368 | 135 | 0.03x (regression) |

The regressions at 10K and 100K partitions are expected: per-partition `TopK` overhead (RowConverter, MemoryReservation per instance) dominates when partitions are very numerous with few rows each. For the common case (moderate partition cardinality, here 100 to 1K partitions), the optimization provides a 2-4x speedup.

## Are these changes tested?

Yes:

- **7 unit tests** (`core/tests/physical_optimizer/window_topn.rs`): basic ROW_NUMBER, `rn < K`, flipped predicates, non-window column filter, config disabled, no partition by, projection between filter and window
- **5 SLT tests** (`sqllogictest/test_files/window_topn.slt`): correctness verification, EXPLAIN plan validation, `rn < K`, no-partition-by case, config disabled fallback

## Are there any user-facing changes?

No breaking API changes. The optimization is disabled by default and transparent to users. It can be enabled via:

```sql
SET datafusion.optimizer.enable_window_topn = true;
```

---------

Co-authored-by: Subham Singhal <subhamsinghal@Subhams-MacBook-Air.local>
Co-authored-by: Yongting You <2010youy01@gmail.com>
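
The per-partition heap idea described for `PartitionedTopKExec` can be illustrated with a minimal, standard-library-only sketch. It is not the operator's actual code: the function name `partitioned_top_k`, the `(u32, i64)` row shape, and the use of `BinaryHeap` and `BTreeMap` are assumptions made for illustration, whereas the real operator works on Arrow batches through `RowConverter` and DataFusion's `TopK`. The sketch assumes an ascending `ORDER BY` and keeps at most `k` values per partition, so each row costs O(log K) heap work.

```rust
use std::collections::{BinaryHeap, BTreeMap};

/// Hypothetical helper (not DataFusion's `TopK`): keeps the `k` smallest
/// values per partition key and returns them sorted by (partition key, value).
fn partitioned_top_k(rows: &[(u32, i64)], k: usize) -> Vec<(u32, i64)> {
    if k == 0 {
        return Vec::new();
    }
    // One bounded max-heap per partition. The heap root is the worst value
    // currently retained, so it is the one to evict when a better row arrives.
    // BTreeMap keeps partitions in key order for the final output.
    let mut heaps: BTreeMap<u32, BinaryHeap<i64>> = BTreeMap::new();
    for &(pk, val) in rows {
        let heap = heaps.entry(pk).or_default();
        if heap.len() < k {
            heap.push(val);
        } else if let Some(mut worst) = heap.peek_mut() {
            if val < *worst {
                // Overwrite the root; the heap re-sifts when `worst` is dropped.
                *worst = val;
            }
        }
    }
    // Emit each partition's retained values in ascending order.
    let mut out = Vec::new();
    for (pk, heap) in heaps {
        for val in heap.into_sorted_vec() {
            out.push((pk, val));
        }
    }
    out
}
```

Calling `partitioned_top_k(&rows, 2)` on unsorted input returns at most two rows per partition without ever sorting the full input, which is where the O(N log K) bound comes from. Memory grows with the number of partitions, consistent with the regressions the benchmark table shows at high partition counts.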
1 parent dc6142e commit 936db37

14 files changed: +1941 -2 lines changed


benchmarks/queries/h2o/window.sql

Lines changed: 8 additions & 1 deletion
@@ -109,4 +109,11 @@ SELECT
     id3,
     v2,
     sum(v2) OVER (PARTITION BY id2 ORDER BY v2 RANGE BETWEEN 3 PRECEDING AND CURRENT ROW) AS my_range_between_by_id2
-FROM large;
+FROM large;
+
+-- Window Top-N (ROW_NUMBER top-2 per partition)
+SELECT id2, largest2_v2 FROM (
+    SELECT id2, v2 AS largest2_v2,
+    ROW_NUMBER() OVER (PARTITION BY id2 ORDER BY v2 DESC) AS order_v2
+    FROM large WHERE v2 IS NOT NULL
+) sub_query WHERE order_v2 <= 2;
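
The filter `order_v2 <= 2` in this query is one of the four predicate shapes the commit message lists as supported (`rn <= K`, `rn < K`, `K >= rn`, `K > rn`). Because `ROW_NUMBER` starts at 1, each shape implies a per-partition row limit. The sketch below shows one way that mapping could be expressed. The `RnPredicate` enum and `fetch_for` function are hypothetical names chosen for illustration, and the treatment of strict comparisons as `K - 1` is an assumption derived from the semantics of `ROW_NUMBER`, not taken from `window_topn.rs`.

```rust
/// Hypothetical representation of the four supported comparison shapes,
/// where `rn` is the ROW_NUMBER column and the payload is the literal K.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RnPredicate {
    RnLtEq(i64),  // rn <= K
    RnLt(i64),    // rn < K
    LitGtEq(i64), // K >= rn
    LitGt(i64),   // K > rn
}

/// Number of rows per partition that can satisfy the predicate, assuming
/// row numbers are 1, 2, 3, ... within each partition.
fn fetch_for(pred: RnPredicate) -> usize {
    let limit = match pred {
        // Inclusive bounds keep rows 1..=K.
        RnPredicate::RnLtEq(k) | RnPredicate::LitGtEq(k) => k,
        // Strict bounds keep rows 1..=K-1.
        RnPredicate::RnLt(k) | RnPredicate::LitGt(k) => k.saturating_sub(1),
    };
    // A zero or negative bound matches no rows at all.
    limit.max(0) as usize
}
```

Under these assumptions the benchmark query's `order_v2 <= 2` corresponds to `fetch_for(RnPredicate::RnLtEq(2))`, a limit of two rows per `id2` partition.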

datafusion/common/src/config.rs

Lines changed: 9 additions & 0 deletions
@@ -1087,6 +1087,15 @@ config_namespace! {
         /// past window functions, if possible
         pub enable_window_limits: bool, default = true
 
+        /// When set to true, the optimizer will replace
+        /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a
+        /// PartitionedTopKExec that maintains per-partition heaps, avoiding
+        /// a full sort of the input.
+        /// When the window partition key has low cardinality, enabling this optimization
+        /// can improve performance. However, for high cardinality keys, it may
+        /// cause regressions in both memory usage and runtime.
+        pub enable_window_topn: bool, default = false
+
         /// When set to true, the optimizer will push TopK (Sort with fetch)
         /// below hash repartition when the partition key is a prefix of the
         /// sort key, reducing data volume before the shuffle.

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -37,5 +37,6 @@ mod sanity_checker;
 #[expect(clippy::needless_pass_by_value)]
 mod test_utils;
 mod window_optimize;
+mod window_topn;
 
 mod pushdown_utils;
