Skip to content

Commit 706b6d4

Browse files
committed
feat(unparser): Keep inner join Filter → TableScan predicates to WHERE instead of moving to JOIN ON
1 parent e5966b5 commit 706b6d4

2 files changed

Lines changed: 58 additions & 17 deletions

File tree

datafusion/sql/src/unparser/plan.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -683,28 +683,37 @@ impl Unparser<'_> {
683683

684684
let join_filters = if table_scan_filters.is_empty() {
685685
join.filter.clone()
686+
} else if join.join_type == JoinType::Inner {
687+
// For inner joins, ON and WHERE are semantically equivalent. Place table_scan_filters in WHERE because they may contain
688+
// subquery expressions, which break dialects that reject subqueries in JOIN ON (e.g. BigQuery).
689+
for filter in table_scan_filters {
690+
let filter_expr = self.expr_to_sql(&filter)?;
691+
select.selection(Some(filter_expr));
692+
}
693+
join.filter.clone()
686694
} else {
687695
// Combine `table_scan_filters` into a single filter using `AND`
688-
let Some(combined_filters) =
696+
let combined_filters =
689697
table_scan_filters.into_iter().reduce(|acc, filter| {
690698
Expr::BinaryExpr(BinaryExpr {
691699
left: Box::new(acc),
692700
op: Operator::And,
693701
right: Box::new(filter),
694702
})
695-
})
696-
else {
697-
return internal_err!("Failed to combine TableScan filters");
698-
};
703+
});
699704

700705
// Combine `join.filter` with `combined_filters` using `AND`
701-
match &join.filter {
702-
Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
703-
left: Box::new(filter.clone()),
704-
op: Operator::And,
705-
right: Box::new(combined_filters),
706-
})),
707-
None => Some(combined_filters),
706+
match (&join.filter, combined_filters) {
707+
(Some(filter), Some(combined)) => {
708+
Some(Expr::BinaryExpr(BinaryExpr {
709+
left: Box::new(filter.clone()),
710+
op: Operator::And,
711+
right: Box::new(combined),
712+
}))
713+
}
714+
(Some(filter), None) => Some(filter.clone()),
715+
(None, Some(combined)) => Some(combined),
716+
(None, None) => None,
708717
}
709718
};
710719

datafusion/sql/tests/cases/plan_to_sql.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion_expr::test::function_stub::{
2828
use datafusion_expr::{
2929
EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
3030
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame,
31-
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
31+
WindowFunctionDefinition, cast, col, lit, scalar_subquery, table_scan, wildcard,
3232
};
3333
use datafusion_functions::unicode;
3434
use datafusion_functions_aggregate::grouping::grouping_udaf;
@@ -1828,7 +1828,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18281828
let sql = plan_to_sql(&join_plan_with_filter)?;
18291829
assert_snapshot!(
18301830
sql,
1831-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"#
1831+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
18321832
);
18331833

18341834
let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone())
@@ -1843,7 +1843,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18431843
let sql = plan_to_sql(&join_plan_no_filter)?;
18441844
assert_snapshot!(
18451845
sql,
1846-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#
1846+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
18471847
);
18481848

18491849
let right_plan_with_filter = table_scan_with_filters(
@@ -1868,7 +1868,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18681868
let sql = plan_to_sql(&join_plan_multiple_filters)?;
18691869
assert_snapshot!(
18701870
sql,
1871-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"#
1871+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE ("left"."name" = 'after_join_filter_val') AND "left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val') AND (age > 10)"#
18721872
);
18731873

18741874
let right_plan_with_filter_schema = table_scan_with_filters(
@@ -1898,7 +1898,39 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18981898
let sql = plan_to_sql(&join_plan_duplicated_filter)?;
18991899
assert_snapshot!(
19001900
sql,
1901-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table.age > 10)) AND (right_table.age < 11)))"#
1901+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10) AND (right_table.age < 11)"#
1902+
);
1903+
1904+
// Inner join with a scalar subquery in table_scan_filters. The subquery filter should appear in WHERE, not in JOIN ON,
1905+
// since dialects like BigQuery reject subqueries in join predicates.
1906+
let schema_subquery = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
1907+
let subquery_plan = table_scan(Some("subquery_table"), &schema_subquery, None)?
1908+
.aggregate(vec![] as Vec<Expr>, vec![max(col("subquery_table.id"))])?
1909+
.build()?;
1910+
let right_plan_with_subquery = table_scan_with_filters(
1911+
Some("right_table"),
1912+
&schema_right,
1913+
None,
1914+
vec![col("right_table.id").eq(scalar_subquery(Arc::new(subquery_plan)))],
1915+
)?
1916+
.build()?;
1917+
1918+
let left_plan =
1919+
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;
1920+
1921+
let join_plan_subquery_filter = LogicalPlanBuilder::from(left_plan)
1922+
.join(
1923+
right_plan_with_subquery,
1924+
datafusion_expr::JoinType::Inner,
1925+
(vec!["left_table.id"], vec!["right_table.id"]),
1926+
None,
1927+
)?
1928+
.build()?;
1929+
1930+
let sql = plan_to_sql(&join_plan_subquery_filter)?;
1931+
assert_snapshot!(
1932+
sql,
1933+
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE (right_table.id = (SELECT max(subquery_table.id) FROM subquery_table))"#
19021934
);
19031935

19041936
Ok(())

0 commit comments

Comments
 (0)