Skip to content

Commit 7e5d9d2

Browse files
committed
Add optimizer-level test for scalar subquery filter
Introduce a focused optimizer-level reproducer for the scalar-subquery Cross Join + Filter scenario. The test case exposes a bug in PushDownFilter: the rule incorrectly rewrites the plan into an Inner Join instead of keeping the Filter above the Cross Join. The test is marked `#[ignore]` because it reproduces the current regression; once the rule is fixed it can be enabled to guard against the bug returning.
1 parent b7908a5 commit 7e5d9d2

1 file changed

Lines changed: 53 additions & 1 deletion

File tree

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1495,7 +1495,7 @@ mod tests {
14951495
use crate::simplify_expressions::SimplifyExpressions;
14961496
use crate::test::udfs::leaf_udf_expr;
14971497
use crate::test::*;
1498-
use datafusion_expr::test::function_stub::sum;
1498+
use datafusion_expr::test::function_stub::{avg, sum};
14991499
use insta::assert_snapshot;
15001500

15011501
use super::*;
@@ -2419,6 +2419,58 @@ mod tests {
24192419
)
24202420
}
24212421

2422+
/// Optimizer-level reproducer: a filter that compares a left-side column with
/// the output of an ungrouped aggregate (aliased `__scalar_sq_1`) is expected
/// to stay above the cross join.
///
/// The plan built here is `s CROSS JOIN __scalar_sq_1`, filtered on
/// `s.acctbal > __scalar_sq_1.avg_acctbal`, projected to
/// `(s.nation, s.acctbal)` and then fed into a `row_number()` window. The
/// inline snapshot keeps `Filter` directly above `Cross Join:`.
///
/// The test is `#[ignore]`d; its ignore reason describes it as a reproducer
/// for a current regression, so it is presumably expected to fail until the
/// optimizer rule is fixed.
#[test]
2423+
#[ignore = "FIX_06 step(1): reproduces current scalar-subquery cross-join promotion regression"]
2424+
fn window_over_scalar_subquery_cross_join_keeps_filter_above_join() -> Result<()> {
2425+
// Left input: scan of `test`, projected to (nation, acctbal) and aliased as `s`.
let left = LogicalPlanBuilder::from(test_table_scan()?)
2426+
.project(vec![col("a").alias("nation"), col("b").alias("acctbal")])?
2427+
.alias("s")?
2428+
.build()?;
2429+
// Right input: scan of `test1`, projected to `acctbal`, then an aggregate with
// no group-by keys computing `avg_acctbal`, aliased as `__scalar_sq_1`.
let right = LogicalPlanBuilder::from(test_table_scan_with_name("test1")?)
2430+
.project(vec![col("a").alias("acctbal")])?
2431+
.aggregate(
2432+
Vec::<Expr>::new(),
2433+
vec![avg(col("acctbal")).alias("avg_acctbal")],
2434+
)?
2435+
.alias("__scalar_sq_1")?
2436+
.build()?;
2437+
2438+
// Window expression: row_number() partitioned by `s.nation`, ordered by
// `s.acctbal` descending with nulls first (matches the snapshot below).
let window = Expr::from(WindowFunction::new(
2439+
WindowFunctionDefinition::WindowUDF(
2440+
datafusion_functions_window::row_number::row_number_udwf(),
2441+
),
2442+
vec![],
2443+
))
2444+
.partition_by(vec![col("s.nation")])
2445+
.order_by(vec![col("s.acctbal").sort(false, true)])
2446+
.build()
2447+
.unwrap();
2448+
2449+
// Plan under test: cross join -> filter across both sides -> projection -> window.
let plan = LogicalPlanBuilder::from(left)
2450+
.cross_join(right)?
2451+
.filter(col("s.acctbal").gt(col("__scalar_sq_1.avg_acctbal")))?
2452+
.project(vec![col("s.nation"), col("s.acctbal")])?
2453+
.window(vec![window])?
2454+
.build()?;
2455+
2456+
// Expected optimized plan: the Filter is retained above the Cross Join.
// NOTE(review): the snapshot prints `avg(test1.a)` although the aggregate above
// is built over the projected column `acctbal` — confirm against the actual
// optimizer output when removing `#[ignore]`.
assert_optimized_plan_equal!(
2457+
plan,
2458+
@r"
2459+
WindowAggr: windowExpr=[[row_number() PARTITION BY [s.nation] ORDER BY [s.acctbal DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
2460+
Projection: s.nation, s.acctbal
2461+
Filter: s.acctbal > __scalar_sq_1.avg_acctbal
2462+
Cross Join:
2463+
SubqueryAlias: s
2464+
Projection: test.a AS nation, test.b AS acctbal
2465+
TableScan: test
2466+
SubqueryAlias: __scalar_sq_1
2467+
Aggregate: groupBy=[[]], aggr=[[avg(test1.a) AS avg_acctbal]]
2468+
Projection: test1.a AS acctbal
2469+
TableScan: test1
2470+
"
2471+
)
2472+
}
2473+
24222474
/// verifies that filters with the same columns are correctly placed
24232475
#[test]
24242476
fn filter_2_breaks_limits() -> Result<()> {

0 commit comments

Comments
 (0)