Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 55 additions & 26 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,32 +863,15 @@ impl Unparser<'_> {
&mut right_relation,
)?;

let join_filters = if table_scan_filters.is_empty() {
join.filter.clone()
} else {
// Combine `table_scan_filters` into a single filter using `AND`
let Some(combined_filters) =
table_scan_filters.into_iter().reduce(|acc, filter| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(acc),
op: Operator::And,
right: Box::new(filter),
})
})
else {
return internal_err!("Failed to combine TableScan filters");
};

// Combine `join.filter` with `combined_filters` using `AND`
match &join.filter {
Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
left: Box::new(filter.clone()),
op: Operator::And,
right: Box::new(combined_filters),
})),
None => Some(combined_filters),
}
};
let (join_filters, where_filters) = Self::split_join_on_and_where_filters(
join.join_type,
&join.filter,
table_scan_filters,
);
for filter in where_filters {
let filter_expr = self.expr_to_sql(&filter)?;
select.selection(Some(filter_expr));
}

let join_constraint = self.join_constraint_to_sql(
join.join_constraint,
Expand Down Expand Up @@ -1936,6 +1919,52 @@ impl Unparser<'_> {
vec![Expr::Literal(ScalarValue::Int64(Some(1)), None)]
}
}

/// Decides where extracted table-scan filters belong in the unparsed SQL:
/// in the `JOIN ON` clause or in `WHERE`.
///
/// For inner joins the two are semantically equivalent, so filters go to
/// `WHERE` (some dialects reject subqueries inside `JOIN ON`).
/// For outer joins the filters are AND-folded into `ON` to preserve correctness.
///
/// Returns `(on_filter, where_filters)`.
fn split_join_on_and_where_filters(
join_type: JoinType,
join_filter: &Option<Expr>,
table_scan_filters: Vec<Expr>,
) -> (Option<Expr>, Vec<Expr>) {
if table_scan_filters.is_empty() {
return (join_filter.clone(), vec![]);
}

if join_type == JoinType::Inner {
// ON and WHERE are equivalent for inner joins; prefer WHERE
// because some dialects reject subqueries inside JOIN ON.
return (join_filter.clone(), table_scan_filters);
}

// Outer joins: fold table-scan filters into ON to preserve semantics.
let combined = table_scan_filters.into_iter().reduce(|acc, filter| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(acc),
op: Operator::And,
right: Box::new(filter),
})
});

let on_filter = match (join_filter, combined) {
(Some(jf), Some(c)) => Some(Expr::BinaryExpr(BinaryExpr {
left: Box::new(jf.clone()),
op: Operator::And,
right: Box::new(c),
})),
(Some(jf), None) => Some(jf.clone()),
(None, Some(c)) => Some(c),
(None, None) => None,
};

(on_filter, vec![])
}
}

impl From<BuilderError> for DataFusionError {
Expand Down
107 changes: 102 additions & 5 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use datafusion_expr::{
ColumnarValue, EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder,
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Union,
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Volatility, WindowFrame,
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
WindowFunctionDefinition, cast, col, exists, in_subquery, lit, scalar_subquery,
table_scan, wildcard,
};
use datafusion_functions::unicode;
use datafusion_functions_aggregate::grouping::grouping_udaf;
Expand Down Expand Up @@ -1829,7 +1830,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_with_filter)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
);

let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone())
Expand All @@ -1844,7 +1845,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_no_filter)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
);

let right_plan_with_filter = table_scan_with_filters(
Expand All @@ -1869,7 +1870,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_multiple_filters)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE ("left"."name" = 'after_join_filter_val') AND "left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val') AND (age > 10)"#
);

let right_plan_with_filter_schema = table_scan_with_filters(
Expand Down Expand Up @@ -1899,7 +1900,103 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_duplicated_filter)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table.age > 10)) AND (right_table.age < 11)))"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10) AND (right_table.age < 11)"#
);

// Inner join with a scalar subquery in table_scan_filters. The subquery filter should appear in WHERE, not in JOIN ON,
Comment thread
sgrebnov marked this conversation as resolved.
// since dialects like BigQuery reject subqueries in join predicates.
let schema_subquery = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
let subquery_plan = table_scan(Some("subquery_table"), &schema_subquery, None)?
.aggregate(vec![] as Vec<Expr>, vec![max(col("subquery_table.id"))])?
.build()?;
let right_plan_with_subquery = table_scan_with_filters(
Some("right_table"),
&schema_right,
None,
vec![col("right_table.id").eq(scalar_subquery(Arc::new(subquery_plan)))],
)?
.build()?;

let left_plan =
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;

let join_plan_subquery_filter = LogicalPlanBuilder::from(left_plan)
.join(
right_plan_with_subquery,
datafusion_expr::JoinType::Inner,
(vec!["left_table.id"], vec!["right_table.id"]),
None,
)?
.build()?;

let sql = plan_to_sql(&join_plan_subquery_filter)?;
assert_snapshot!(
sql,
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE (right_table.id = (SELECT max(subquery_table.id) FROM subquery_table))"#
);

// Inner join with an IN subquery in table_scan_filters.
let subquery_plan_in = table_scan(Some("subquery_table"), &schema_subquery, None)?
.project(vec![col("subquery_table.id")])?
.build()?;
let right_plan_with_in = table_scan_with_filters(
Some("right_table"),
&schema_right,
None,
vec![in_subquery(
col("right_table.id"),
Arc::new(subquery_plan_in),
)],
)?
.build()?;

let left_plan_in =
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;

let join_plan_in_subquery = LogicalPlanBuilder::from(left_plan_in)
.join(
right_plan_with_in,
datafusion_expr::JoinType::Inner,
(vec!["left_table.id"], vec!["right_table.id"]),
None,
)?
.build()?;

let sql = plan_to_sql(&join_plan_in_subquery)?;
assert_snapshot!(
sql,
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE right_table.id IN (SELECT subquery_table.id FROM subquery_table)"#
);

// Inner join with an EXISTS subquery in table_scan_filters.
let subquery_plan_exists =
table_scan(Some("subquery_table"), &schema_subquery, None)?
.filter(col("subquery_table.id").eq(col("right_table.id")))?
.build()?;
let right_plan_with_exists = table_scan_with_filters(
Some("right_table"),
&schema_right,
None,
vec![exists(Arc::new(subquery_plan_exists))],
)?
.build()?;

let left_plan_exists =
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;

let join_plan_exists = LogicalPlanBuilder::from(left_plan_exists)
.join(
right_plan_with_exists,
datafusion_expr::JoinType::Inner,
(vec!["left_table.id"], vec!["right_table.id"]),
None,
)?
.build()?;

let sql = plan_to_sql(&join_plan_exists)?;
assert_snapshot!(
sql,
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE EXISTS (SELECT * FROM subquery_table WHERE (subquery_table.id = right_table.id))"#
);

Ok(())
Expand Down
Loading