Skip to content

Commit c4562dc

Browse files
authored
[Minor]: support window functions in order by expressions (#20963)
## Which issue does this PR close?

- Closes #608.

## Rationale for this change

#608 has the details, but as @alamb describes in #608 (comment), the query

```
SELECT c2 FROM test ORDER BY max(c3) OVER (ORDER BY c9);
```

fails with:

```
This feature is not implemented: Physical plan does not support logical expression WindowFunction(WindowFunction { fun: AggregateUDF(AggregateUDF { inner: Max { signature: Signature { type_signature: UserDefined, volatility: Immutable, parameter_names: None } } }), params: WindowFunctionParams { args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c3" })], partition_by: [], order_by: [Sort { expr: Column(Column { relation: Some(Bare { table: "test" }), name: "c9" }), asc: true, nulls_first: false }], window_frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }, filter: None, null_treatment: None, distinct: false } })
```

## What changes are included in this PR?

- The relevant planner change: window functions that appear in ORDER BY expressions are now collected together with those from the SELECT list and QUALIFY clause, and the ORDER BY expressions are rebased onto the window plan's output.
- slt and unit tests.

## Are these changes tested?

Yes. This PR adds unit tests that check the logical plan and slt tests that check the query output.

## Are there any user-facing changes?

Yes, an additive change: users can now use window functions in ORDER BY expressions.
1 parent 139b0b4 commit c4562dc

3 files changed

Lines changed: 222 additions & 10 deletions

File tree

datafusion/sql/src/select.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
257257
select_exprs: mut select_exprs_post_aggr,
258258
having_expr: having_expr_post_aggr,
259259
qualify_expr: qualify_expr_post_aggr,
260-
order_by_exprs: order_by_rex,
260+
order_by_exprs: mut order_by_rex,
261261
} = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() {
262262
self.aggregate(
263263
&base_plan,
@@ -293,14 +293,23 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
293293
plan
294294
};
295295

296-
// The outer expressions we will search through for window functions.
297-
// Window functions may be sourced from the SELECT list or from the QUALIFY expression.
298-
let windows_expr_haystack = select_exprs_post_aggr
299-
.iter()
300-
.chain(qualify_expr_post_aggr.iter());
296+
// The window expressions from SELECT and QUALIFY only, used to validate that
297+
// QUALIFY is used with window functions (ORDER BY window functions don't count).
298+
let qualify_window_func_exprs = find_window_exprs(
299+
select_exprs_post_aggr
300+
.iter()
301+
.chain(qualify_expr_post_aggr.iter()),
302+
);
303+
301304
// All of the window expressions (deduplicated and rewritten to reference aggregates as
302-
// columns from input).
303-
let window_func_exprs = find_window_exprs(windows_expr_haystack);
305+
// columns from input). Window functions may be sourced from the SELECT list, QUALIFY
306+
// expression, or ORDER BY.
307+
let window_func_exprs = find_window_exprs(
308+
select_exprs_post_aggr
309+
.iter()
310+
.chain(qualify_expr_post_aggr.iter())
311+
.chain(order_by_rex.iter().map(|s| &s.expr)),
312+
);
304313

305314
// Process window functions after aggregation as they can reference
306315
// aggregate functions in their body
@@ -315,14 +324,25 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
315324
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
316325
.collect::<Result<Vec<Expr>>>()?;
317326

327+
order_by_rex = order_by_rex
328+
.into_iter()
329+
.map(|sort_expr| {
330+
Ok(sort_expr.with_expr(rebase_expr(
331+
&sort_expr.expr,
332+
&window_func_exprs,
333+
&plan,
334+
)?))
335+
})
336+
.collect::<Result<Vec<_>>>()?;
337+
318338
plan
319339
};
320340

321341
// Process QUALIFY clause after window functions
322342
// QUALIFY filters the results of window functions, similar to how HAVING filters aggregates
323343
let plan = if let Some(qualify_expr) = qualify_expr_post_aggr {
324-
// Validate that QUALIFY is used with window functions
325-
if window_func_exprs.is_empty() {
344+
// Validate that QUALIFY is used with window functions in SELECT or QUALIFY
345+
if qualify_window_func_exprs.is_empty() {
326346
return plan_err!(
327347
"QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
328348
);

datafusion/sql/tests/sql_integration.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2805,6 +2805,138 @@ fn over_order_by_with_window_frame_double_end() {
28052805
);
28062806
}
28072807

2808+
#[test]
2809+
fn window_function_only_in_order_by() {
2810+
let sql = "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id)";
2811+
let plan = logical_plan(sql).unwrap();
2812+
assert_snapshot!(
2813+
plan,
2814+
@r"
2815+
Projection: orders.order_id
2816+
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
2817+
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2818+
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2819+
TableScan: orders
2820+
"
2821+
);
2822+
}
2823+
2824+
#[test]
2825+
fn window_function_in_select_and_order_by() {
2826+
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id) FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id)";
2827+
let plan = logical_plan(sql).unwrap();
2828+
assert_snapshot!(
2829+
plan,
2830+
@r"
2831+
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
2832+
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2833+
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2834+
TableScan: orders
2835+
"
2836+
);
2837+
}
2838+
2839+
#[test]
2840+
fn window_function_in_order_by_nested_expr() {
2841+
let sql =
2842+
"SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id) + 1";
2843+
let plan = logical_plan(sql).unwrap();
2844+
assert_snapshot!(
2845+
plan,
2846+
@r"
2847+
Projection: orders.order_id
2848+
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + Int64(1) ASC NULLS LAST
2849+
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2850+
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2851+
TableScan: orders
2852+
"
2853+
);
2854+
}
2855+
2856+
#[test]
2857+
fn window_function_in_order_by_desc() {
2858+
let sql =
2859+
"SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id) DESC";
2860+
let plan = logical_plan(sql).unwrap();
2861+
assert_snapshot!(
2862+
plan,
2863+
@r"
2864+
Projection: orders.order_id
2865+
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW DESC NULLS FIRST
2866+
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2867+
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2868+
TableScan: orders
2869+
"
2870+
);
2871+
}
2872+
2873+
#[test]
2874+
fn multiple_window_functions_in_order_by() {
2875+
let sql = "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id), MIN(qty) OVER (ORDER BY order_id DESC)";
2876+
let plan = logical_plan(sql).unwrap();
2877+
assert_snapshot!(
2878+
plan,
2879+
@r"
2880+
Projection: orders.order_id
2881+
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST, min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
2882+
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2883+
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2884+
WindowAggr: windowExpr=[[min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2885+
TableScan: orders
2886+
"
2887+
);
2888+
}
2889+
2890+
#[test]
2891+
fn window_function_in_order_by_with_group_by() {
2892+
let sql = "SELECT order_id, SUM(qty) FROM orders GROUP BY order_id ORDER BY MAX(SUM(qty)) OVER (ORDER BY order_id)";
2893+
let plan = logical_plan(sql).unwrap();
2894+
assert_snapshot!(
2895+
plan,
2896+
@r"
2897+
Projection: orders.order_id, sum(orders.qty)
2898+
Sort: max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
2899+
Projection: orders.order_id, sum(orders.qty), max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2900+
WindowAggr: windowExpr=[[max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2901+
Aggregate: groupBy=[[orders.order_id]], aggr=[[sum(orders.qty)]]
2902+
TableScan: orders
2903+
"
2904+
);
2905+
}
2906+
2907+
#[test]
2908+
fn window_function_in_order_by_with_qualify() {
2909+
let sql = "SELECT person.id, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn FROM person QUALIFY rn = 1 ORDER BY ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id)";
2910+
let plan = logical_plan(sql).unwrap();
2911+
assert_snapshot!(
2912+
plan,
2913+
@r"
2914+
Sort: rn ASC NULLS LAST
2915+
Projection: person.id, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn
2916+
Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = Int64(1)
2917+
WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2918+
TableScan: person
2919+
"
2920+
);
2921+
}
2922+
2923+
#[test]
2924+
fn window_function_in_order_by_not_in_select() {
2925+
let sql =
2926+
"SELECT order_id FROM orders ORDER BY MIN(qty) OVER (PARTITION BY order_id)";
2927+
let plan = logical_plan(sql).unwrap();
2928+
assert_snapshot!(
2929+
plan,
2930+
@r"
2931+
Projection: orders.order_id
2932+
Sort: min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ASC NULLS LAST
2933+
Projection: orders.order_id, min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
2934+
WindowAggr: windowExpr=[[min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
2935+
TableScan: orders
2936+
"
2937+
);
2938+
}
2939+
28082940
#[test]
28092941
fn over_order_by_with_window_frame_single_end() {
28102942
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id ROWS 3 PRECEDING), MIN(qty) OVER (ORDER BY order_id DESC) from orders";
@@ -4256,6 +4388,16 @@ fn test_select_qualify_without_window_function() {
42564388
);
42574389
}
42584390

4391+
#[test]
4392+
fn test_select_qualify_without_window_function_but_window_in_order_by() {
4393+
let sql = "SELECT person.id FROM person QUALIFY person.id > 1 ORDER BY ROW_NUMBER() OVER (ORDER BY person.id)";
4394+
let err = logical_plan(sql).unwrap_err();
4395+
assert_eq!(
4396+
err.strip_backtrace(),
4397+
"Error during planning: QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
4398+
);
4399+
}
4400+
42594401
#[test]
42604402
fn test_select_qualify_complex_condition() {
42614403
let sql = "SELECT person.id, person.age, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn, RANK() OVER (ORDER BY person.salary) as rank FROM person QUALIFY rn <= 2 AND rank <= 5";

datafusion/sqllogictest/test_files/window.slt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5664,6 +5664,56 @@ DROP TABLE new_table;
56645664
statement ok
56655665
DROP TABLE aggregate_test_100_utf8view;
56665666

5667+
# Window function only in ORDER BY
5668+
query I
5669+
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) LIMIT 5;
5670+
----
5671+
4
5672+
2
5673+
5
5674+
2
5675+
2
5676+
5677+
# Window function in both SELECT and ORDER BY (deduplication)
5678+
query II
5679+
SELECT c2, row_number() OVER (ORDER BY c9) as rn FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) LIMIT 5;
5680+
----
5681+
4 1
5682+
2 2
5683+
5 3
5684+
2 4
5685+
2 5
5686+
5687+
# Nested expression: ORDER BY window_func(...) + 1
5688+
query I
5689+
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) + 1 LIMIT 5;
5690+
----
5691+
4
5692+
2
5693+
5
5694+
2
5695+
2
5696+
5697+
# Multiple window functions in ORDER BY
5698+
query I
5699+
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9), max(c3) OVER (ORDER BY c9) LIMIT 5;
5700+
----
5701+
4
5702+
2
5703+
5
5704+
2
5705+
2
5706+
5707+
# DESC ordering with window function
5708+
query I
5709+
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) DESC LIMIT 5;
5710+
----
5711+
5
5712+
1
5713+
1
5714+
2
5715+
1
5716+
56675717
statement ok
56685718
DROP TABLE aggregate_test_100
56695719

0 commit comments

Comments
 (0)