Skip to content

Commit c2c0773

Browse files
authored
feat(unparser): Keep inner join Filter → TableScan predicates to WHERE instead of moving to JOIN ON (#21694)
## Which issue does this PR close? Partially addresses #13156 (inner joins only; outer joins require additional work) When the DataFusion optimizer pushes filter predicates into `TableScan` nodes (e.g. via `FilterPushdown`), the unparser's `try_transform_to_simple_table_scan_with_filters` extracts those filters and then always folds them into the `JOIN ON` clause. This is problematic when the extracted filters contain subquery expressions (scalar subqueries, `IN`, `EXISTS`), because some SQL backends — notably BigQuery — reject subqueries inside `JOIN ON` predicates. This currently breaks 5 TPC-H queries (Q2, Q16, Q17, Q18, Q21) when unparsed SQL is sent to BigQuery. We did attempt to fix this (#13496) by moving **all** filters to `WHERE`, which broke `LEFT`/`RIGHT`/`FULL` join semantics (moving a filter from `ON` to `WHERE` changes the result for outer joins, as demonstrated in #13132). ## What changes are included in this PR? For **inner joins only**, `table_scan_filters` extracted by `try_transform_to_simple_table_scan_with_filters` are now placed in the `WHERE` clause instead of the `JOIN ON` clause. This is safe because `ON` and `WHERE` are semantically equivalent for inner joins. For non-inner joins (`LEFT`, `RIGHT`, `FULL`), the existing behavior is preserved — filters remain in `JOIN ON` — since moving them to `WHERE` would change query semantics. ## Are these changes tested? Yes. - Added a test case in `test_join_with_table_scan_filters` that constructs an inner join where the right side has a `table_scan_with_filters` containing a scalar subquery. Verifies the subquery predicate appears in `WHERE`, not `JOIN ON`. - Updated existing snapshots in `test_join_with_table_scan_filters` reflecting that `table_scan_filters` now appear in `WHERE` for inner joins. ## Are there any user-facing changes? SQL generated by the unparser for inner joins may now place `TableScan` pushdown filters in the `WHERE` clause instead of the `JOIN ON` clause (similar to changes in `test_join_with_table_scan_filters`) ## Alternatives considered An alternative approach considered is to introduce a `supports_subquery_in_join_predicate` dialect flag that only moves subquery-containing filters to `WHERE` when the dialect opts in (e.g. `BigQueryDialect`), preserving existing behavior for all other dialects. Example implementation: spiceai#151 Current approach was chosen due to - simplicity: no new dialect flag, fewer code paths, simple change. - performance (potentially): as noted in [#13156](#13156), placing filters in `WHERE` can trigger filter pushdown on the target backend, which is a potential performance win.
1 parent bbf67d9 commit c2c0773

2 files changed

Lines changed: 157 additions & 31 deletions

File tree

datafusion/sql/src/unparser/plan.rs

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -863,32 +863,15 @@ impl Unparser<'_> {
863863
&mut right_relation,
864864
)?;
865865

866-
let join_filters = if table_scan_filters.is_empty() {
867-
join.filter.clone()
868-
} else {
869-
// Combine `table_scan_filters` into a single filter using `AND`
870-
let Some(combined_filters) =
871-
table_scan_filters.into_iter().reduce(|acc, filter| {
872-
Expr::BinaryExpr(BinaryExpr {
873-
left: Box::new(acc),
874-
op: Operator::And,
875-
right: Box::new(filter),
876-
})
877-
})
878-
else {
879-
return internal_err!("Failed to combine TableScan filters");
880-
};
881-
882-
// Combine `join.filter` with `combined_filters` using `AND`
883-
match &join.filter {
884-
Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
885-
left: Box::new(filter.clone()),
886-
op: Operator::And,
887-
right: Box::new(combined_filters),
888-
})),
889-
None => Some(combined_filters),
890-
}
891-
};
866+
let (join_filters, where_filters) = Self::split_join_on_and_where_filters(
867+
join.join_type,
868+
&join.filter,
869+
table_scan_filters,
870+
);
871+
for filter in where_filters {
872+
let filter_expr = self.expr_to_sql(&filter)?;
873+
select.selection(Some(filter_expr));
874+
}
892875

893876
let join_constraint = self.join_constraint_to_sql(
894877
join.join_constraint,
@@ -1936,6 +1919,52 @@ impl Unparser<'_> {
19361919
vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
19371920
}
19381921
}
1922+
1923+
/// Decides where extracted table-scan filters belong in the unparsed SQL:
1924+
/// in the `JOIN ON` clause or in `WHERE`.
1925+
///
1926+
/// For inner joins the two are semantically equivalent, so filters go to
1927+
/// `WHERE` (some dialects reject subqueries inside `JOIN ON`).
1928+
/// For outer joins the filters are AND-folded into `ON` to preserve correctness.
1929+
///
1930+
/// Returns `(on_filter, where_filters)`.
1931+
fn split_join_on_and_where_filters(
1932+
join_type: JoinType,
1933+
join_filter: &Option<Expr>,
1934+
table_scan_filters: Vec<Expr>,
1935+
) -> (Option<Expr>, Vec<Expr>) {
1936+
if table_scan_filters.is_empty() {
1937+
return (join_filter.clone(), vec![]);
1938+
}
1939+
1940+
if join_type == JoinType::Inner {
1941+
// ON and WHERE are equivalent for inner joins; prefer WHERE
1942+
// because some dialects reject subqueries inside JOIN ON.
1943+
return (join_filter.clone(), table_scan_filters);
1944+
}
1945+
1946+
// Outer joins: fold table-scan filters into ON to preserve semantics.
1947+
let combined = table_scan_filters.into_iter().reduce(|acc, filter| {
1948+
Expr::BinaryExpr(BinaryExpr {
1949+
left: Box::new(acc),
1950+
op: Operator::And,
1951+
right: Box::new(filter),
1952+
})
1953+
});
1954+
1955+
let on_filter = match (join_filter, combined) {
1956+
(Some(jf), Some(c)) => Some(Expr::BinaryExpr(BinaryExpr {
1957+
left: Box::new(jf.clone()),
1958+
op: Operator::And,
1959+
right: Box::new(c),
1960+
})),
1961+
(Some(jf), None) => Some(jf.clone()),
1962+
(None, Some(c)) => Some(c),
1963+
(None, None) => None,
1964+
};
1965+
1966+
(on_filter, vec![])
1967+
}
19391968
}
19401969

19411970
impl From<BuilderError> for DataFusionError {

datafusion/sql/tests/cases/plan_to_sql.rs

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use datafusion_expr::{
2929
ColumnarValue, EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder,
3030
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Union,
3131
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Volatility, WindowFrame,
32-
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
32+
WindowFunctionDefinition, cast, col, exists, in_subquery, lit, scalar_subquery,
33+
table_scan, wildcard,
3334
};
3435
use datafusion_functions::unicode;
3536
use datafusion_functions_aggregate::grouping::grouping_udaf;
@@ -1829,7 +1830,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18291830
let sql = plan_to_sql(&join_plan_with_filter)?;
18301831
assert_snapshot!(
18311832
sql,
1832-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"#
1833+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
18331834
);
18341835

18351836
let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone())
@@ -1844,7 +1845,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18441845
let sql = plan_to_sql(&join_plan_no_filter)?;
18451846
assert_snapshot!(
18461847
sql,
1847-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#
1848+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
18481849
);
18491850

18501851
let right_plan_with_filter = table_scan_with_filters(
@@ -1869,7 +1870,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18691870
let sql = plan_to_sql(&join_plan_multiple_filters)?;
18701871
assert_snapshot!(
18711872
sql,
1872-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"#
1873+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE ("left"."name" = 'after_join_filter_val') AND "left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val') AND (age > 10)"#
18731874
);
18741875

18751876
let right_plan_with_filter_schema = table_scan_with_filters(
@@ -1899,7 +1900,103 @@ fn test_join_with_table_scan_filters() -> Result<()> {
18991900
let sql = plan_to_sql(&join_plan_duplicated_filter)?;
19001901
assert_snapshot!(
19011902
sql,
1902-
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table.age > 10)) AND (right_table.age < 11)))"#
1903+
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10) AND (right_table.age < 11)"#
1904+
);
1905+
1906+
// Inner join with a scalar subquery in table_scan_filters. The subquery filter should appear in WHERE, not in JOIN ON,
1907+
// since dialects like BigQuery reject subqueries in join predicates.
1908+
let schema_subquery = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
1909+
let subquery_plan = table_scan(Some("subquery_table"), &schema_subquery, None)?
1910+
.aggregate(vec![] as Vec<Expr>, vec![max(col("subquery_table.id"))])?
1911+
.build()?;
1912+
let right_plan_with_subquery = table_scan_with_filters(
1913+
Some("right_table"),
1914+
&schema_right,
1915+
None,
1916+
vec![col("right_table.id").eq(scalar_subquery(Arc::new(subquery_plan)))],
1917+
)?
1918+
.build()?;
1919+
1920+
let left_plan =
1921+
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;
1922+
1923+
let join_plan_subquery_filter = LogicalPlanBuilder::from(left_plan)
1924+
.join(
1925+
right_plan_with_subquery,
1926+
datafusion_expr::JoinType::Inner,
1927+
(vec!["left_table.id"], vec!["right_table.id"]),
1928+
None,
1929+
)?
1930+
.build()?;
1931+
1932+
let sql = plan_to_sql(&join_plan_subquery_filter)?;
1933+
assert_snapshot!(
1934+
sql,
1935+
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE (right_table.id = (SELECT max(subquery_table.id) FROM subquery_table))"#
1936+
);
1937+
1938+
// Inner join with an IN subquery in table_scan_filters.
1939+
let subquery_plan_in = table_scan(Some("subquery_table"), &schema_subquery, None)?
1940+
.project(vec![col("subquery_table.id")])?
1941+
.build()?;
1942+
let right_plan_with_in = table_scan_with_filters(
1943+
Some("right_table"),
1944+
&schema_right,
1945+
None,
1946+
vec![in_subquery(
1947+
col("right_table.id"),
1948+
Arc::new(subquery_plan_in),
1949+
)],
1950+
)?
1951+
.build()?;
1952+
1953+
let left_plan_in =
1954+
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;
1955+
1956+
let join_plan_in_subquery = LogicalPlanBuilder::from(left_plan_in)
1957+
.join(
1958+
right_plan_with_in,
1959+
datafusion_expr::JoinType::Inner,
1960+
(vec!["left_table.id"], vec!["right_table.id"]),
1961+
None,
1962+
)?
1963+
.build()?;
1964+
1965+
let sql = plan_to_sql(&join_plan_in_subquery)?;
1966+
assert_snapshot!(
1967+
sql,
1968+
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE right_table.id IN (SELECT subquery_table.id FROM subquery_table)"#
1969+
);
1970+
1971+
// Inner join with an EXISTS subquery in table_scan_filters.
1972+
let subquery_plan_exists =
1973+
table_scan(Some("subquery_table"), &schema_subquery, None)?
1974+
.filter(col("subquery_table.id").eq(col("right_table.id")))?
1975+
.build()?;
1976+
let right_plan_with_exists = table_scan_with_filters(
1977+
Some("right_table"),
1978+
&schema_right,
1979+
None,
1980+
vec![exists(Arc::new(subquery_plan_exists))],
1981+
)?
1982+
.build()?;
1983+
1984+
let left_plan_exists =
1985+
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;
1986+
1987+
let join_plan_exists = LogicalPlanBuilder::from(left_plan_exists)
1988+
.join(
1989+
right_plan_with_exists,
1990+
datafusion_expr::JoinType::Inner,
1991+
(vec!["left_table.id"], vec!["right_table.id"]),
1992+
None,
1993+
)?
1994+
.build()?;
1995+
1996+
let sql = plan_to_sql(&join_plan_exists)?;
1997+
assert_snapshot!(
1998+
sql,
1999+
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE EXISTS (SELECT * FROM subquery_table WHERE (subquery_table.id = right_table.id))"#
19032000
);
19042001

19052002
Ok(())

0 commit comments

Comments
 (0)