diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 3e2f78772d6a5..2c36fe0b2c98a 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -863,32 +863,15 @@ impl Unparser<'_> { &mut right_relation, )?; - let join_filters = if table_scan_filters.is_empty() { - join.filter.clone() - } else { - // Combine `table_scan_filters` into a single filter using `AND` - let Some(combined_filters) = - table_scan_filters.into_iter().reduce(|acc, filter| { - Expr::BinaryExpr(BinaryExpr { - left: Box::new(acc), - op: Operator::And, - right: Box::new(filter), - }) - }) - else { - return internal_err!("Failed to combine TableScan filters"); - }; - - // Combine `join.filter` with `combined_filters` using `AND` - match &join.filter { - Some(filter) => Some(Expr::BinaryExpr(BinaryExpr { - left: Box::new(filter.clone()), - op: Operator::And, - right: Box::new(combined_filters), - })), - None => Some(combined_filters), - } - }; + let (join_filters, where_filters) = Self::split_join_on_and_where_filters( + join.join_type, + &join.filter, + table_scan_filters, + ); + for filter in where_filters { + let filter_expr = self.expr_to_sql(&filter)?; + select.selection(Some(filter_expr)); + } let join_constraint = self.join_constraint_to_sql( join.join_constraint, @@ -1936,6 +1919,52 @@ impl Unparser<'_> { vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)] } } + + /// Decides where extracted table-scan filters belong in the unparsed SQL: + /// in the `JOIN ON` clause or in `WHERE`. + /// + /// For inner joins the two are semantically equivalent, so filters go to + /// `WHERE` (some dialects reject subqueries inside `JOIN ON`). + /// For outer joins the filters are AND-folded into `ON` to preserve correctness. + /// + /// Returns `(on_filter, where_filters)`. + fn split_join_on_and_where_filters( + join_type: JoinType, + join_filter: &Option, + table_scan_filters: Vec, + ) -> (Option, Vec) { + if table_scan_filters.is_empty() { + return (join_filter.clone(), vec![]); + } + + if join_type == JoinType::Inner { + // ON and WHERE are equivalent for inner joins; prefer WHERE + // because some dialects reject subqueries inside JOIN ON. + return (join_filter.clone(), table_scan_filters); + } + + // Outer joins: fold table-scan filters into ON to preserve semantics. + let combined = table_scan_filters.into_iter().reduce(|acc, filter| { + Expr::BinaryExpr(BinaryExpr { + left: Box::new(acc), + op: Operator::And, + right: Box::new(filter), + }) + }); + + let on_filter = match (join_filter, combined) { + (Some(jf), Some(c)) => Some(Expr::BinaryExpr(BinaryExpr { + left: Box::new(jf.clone()), + op: Operator::And, + right: Box::new(c), + })), + (Some(jf), None) => Some(jf.clone()), + (None, Some(c)) => Some(c), + (None, None) => None, + }; + + (on_filter, vec![]) + } } impl From for DataFusionError { diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 67e67dafbaa64..e30c4264720a8 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -29,7 +29,8 @@ use datafusion_expr::{ ColumnarValue, EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Union, UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Volatility, WindowFrame, - WindowFunctionDefinition, cast, col, lit, table_scan, wildcard, + WindowFunctionDefinition, cast, col, exists, in_subquery, lit, scalar_subquery, + table_scan, wildcard, }; use datafusion_functions::unicode; use datafusion_functions_aggregate::grouping::grouping_udaf; @@ -1829,7 +1830,7 @@ fn test_join_with_table_scan_filters() -> Result<()> { let sql = plan_to_sql(&join_plan_with_filter)?; assert_snapshot!( sql, - @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"# + @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (age > 10)"# ); let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone()) @@ -1844,7 +1845,7 @@ fn test_join_with_table_scan_filters() -> Result<()> { let sql = plan_to_sql(&join_plan_no_filter)?; assert_snapshot!( sql, - @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"# + @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id WHERE "left"."name" LIKE 'some_name' AND (age > 10)"# ); let right_plan_with_filter = table_scan_with_filters( @@ -1869,7 +1870,7 @@ fn test_join_with_table_scan_filters() -> Result<()> { let sql = plan_to_sql(&join_plan_multiple_filters)?; assert_snapshot!( sql, - @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"# + @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE ("left"."name" = 'after_join_filter_val') AND "left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val') AND (age > 10)"# ); let right_plan_with_filter_schema = table_scan_with_filters( @@ -1899,7 +1900,103 @@ fn test_join_with_table_scan_filters() -> Result<()> { let sql = plan_to_sql(&join_plan_duplicated_filter)?; assert_snapshot!( sql, - @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table.age > 10)) AND (right_table.age < 11)))"# + @r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10) AND (right_table.age < 11)"# + ); + + // Inner join with a scalar subquery in table_scan_filters. The subquery filter should appear in WHERE, not in JOIN ON, + // since dialects like BigQuery reject subqueries in join predicates. + let schema_subquery = Schema::new(vec![Field::new("id", DataType::Utf8, false)]); + let subquery_plan = table_scan(Some("subquery_table"), &schema_subquery, None)? + .aggregate(vec![] as Vec, vec![max(col("subquery_table.id"))])? + .build()?; + let right_plan_with_subquery = table_scan_with_filters( + Some("right_table"), + &schema_right, + None, + vec![col("right_table.id").eq(scalar_subquery(Arc::new(subquery_plan)))], + )? + .build()?; + + let left_plan = + table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?; + + let join_plan_subquery_filter = LogicalPlanBuilder::from(left_plan) + .join( + right_plan_with_subquery, + datafusion_expr::JoinType::Inner, + (vec!["left_table.id"], vec!["right_table.id"]), + None, + )? + .build()?; + + let sql = plan_to_sql(&join_plan_subquery_filter)?; + assert_snapshot!( + sql, + @r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE (right_table.id = (SELECT max(subquery_table.id) FROM subquery_table))"# + ); + + // Inner join with an IN subquery in table_scan_filters. + let subquery_plan_in = table_scan(Some("subquery_table"), &schema_subquery, None)? + .project(vec![col("subquery_table.id")])? + .build()?; + let right_plan_with_in = table_scan_with_filters( + Some("right_table"), + &schema_right, + None, + vec![in_subquery( + col("right_table.id"), + Arc::new(subquery_plan_in), + )], + )? + .build()?; + + let left_plan_in = + table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?; + + let join_plan_in_subquery = LogicalPlanBuilder::from(left_plan_in) + .join( + right_plan_with_in, + datafusion_expr::JoinType::Inner, + (vec!["left_table.id"], vec!["right_table.id"]), + None, + )? + .build()?; + + let sql = plan_to_sql(&join_plan_in_subquery)?; + assert_snapshot!( + sql, + @r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE right_table.id IN (SELECT subquery_table.id FROM subquery_table)"# + ); + + // Inner join with an EXISTS subquery in table_scan_filters. + let subquery_plan_exists = + table_scan(Some("subquery_table"), &schema_subquery, None)? + .filter(col("subquery_table.id").eq(col("right_table.id")))? + .build()?; + let right_plan_with_exists = table_scan_with_filters( + Some("right_table"), + &schema_right, + None, + vec![exists(Arc::new(subquery_plan_exists))], + )? + .build()?; + + let left_plan_exists = + table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?; + + let join_plan_exists = LogicalPlanBuilder::from(left_plan_exists) + .join( + right_plan_with_exists, + datafusion_expr::JoinType::Inner, + (vec!["left_table.id"], vec!["right_table.id"]), + None, + )? + .build()?; + + let sql = plan_to_sql(&join_plan_exists)?; + assert_snapshot!( + sql, + @r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE EXISTS (SELECT * FROM subquery_table WHERE (subquery_table.id = right_table.id))"# ); Ok(())