Skip to content

Commit 90a8117

Browse files
authored
feat: Add support for LEFT JOIN LATERAL (#21352)
## Which issue does this PR close?

- Closes #21199.

## Rationale for this change

This PR adds support for LEFT join semantics for lateral joins. This is a bit tricky because of how it interacts with compensation for the "count bug". This might be easiest to illustrate with an example; consider this query (Q1):

```sql
SELECT t1.id, sub.cnt
FROM t1
LEFT JOIN LATERAL (
  SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id
) sub ON sub.cnt > 0
ORDER BY t1.id;
```

The initial decorrelation (Q2) is

```sql
SELECT t1.id, sub.cnt
FROM t1
LEFT JOIN (
  SELECT count(*) AS cnt, t2.t1_id, TRUE AS __always_true
  FROM t2
  GROUP BY t2.t1_id
) sub ON t1.id = sub.t1_id
```

This ignores the user's original `ON` clause for now. This initial query is wrong, because `t1` rows that don't have a match in `t2` will get all-`NULL` values, not `0` for `count(*)` of an empty set. This is the "count bug", and we compensate for that by checking for rows where `__always_true` is `NULL`, and replacing the agg value with the default for that agg (Q3):

```sql
SELECT t1.id,
       CASE WHEN sub.__always_true IS NULL THEN 0 ELSE sub.cnt END AS cnt
FROM ( /* Q2 */ )
```

Now we just need to handle the user's original `ON` clause. We can't add this to the rewritten `ON` clause in Q2, because we *don't* want the count-bug compensation to fire. But we also can't just add it to the `WHERE` clause, because we need left join semantics. So we can instead wrap another `CASE` that re-checks the `ON` condition and substitutes `NULL` for every right-side column:

```sql
SELECT t1.id,
       CASE WHEN (cnt > 0) IS NOT TRUE THEN NULL ELSE cnt END AS cnt
FROM ( /* Q3 */ )
```

## What changes are included in this PR?

* Implement lateral left join rewrite as described above
* Update expected tests and add more test cases
* Update documentation

## Are these changes tested?

Yes. All new test queries were also run against DuckDB to confirm that both systems produce the same results.

## Are there any user-facing changes?

Support for a new feature.
1 parent 9c0edcc commit 90a8117

4 files changed

Lines changed: 458 additions & 56 deletions

File tree

datafusion/optimizer/src/decorrelate_lateral_join.rs

Lines changed: 126 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion_expr::{Expr, Join, expr};
2828
use datafusion_common::tree_node::{
2929
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
3030
};
31-
use datafusion_common::{Column, DFSchema, Result, TableReference};
31+
use datafusion_common::{Column, DFSchema, Result, ScalarValue, TableReference};
3232
use datafusion_expr::logical_plan::{JoinType, Subquery};
3333
use datafusion_expr::utils::conjunction;
3434
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, SubqueryAlias};
@@ -71,14 +71,13 @@ impl OptimizerRule for DecorrelateLateralJoin {
7171
}
7272
}
7373

74-
// Build the decorrelated join based on the original lateral join query. For
75-
// now, we only support cross/inner lateral joins.
74+
// Build the decorrelated join based on the original lateral join query.
75+
// Supports INNER and LEFT lateral joins.
7676
fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
77-
// TODO: Support outer joins
78-
// <https://github.com/apache/datafusion/issues/21199>
79-
if join.join_type != JoinType::Inner {
77+
if !matches!(join.join_type, JoinType::Inner | JoinType::Left) {
8078
return Ok(Transformed::no(LogicalPlan::Join(join)));
8179
}
80+
let original_join_type = join.join_type;
8281

8382
// The right side is wrapped in a Subquery node when it contains outer
8483
// references. Quickly skip joins that don't have this structure.
@@ -106,7 +105,7 @@ fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
106105
let original_join_filter = join.filter.clone();
107106

108107
// Walk the subquery plan bottom-up, extracting correlated filter
109-
// predicates into join conditions and converting scalar aggregates
108+
// predicates into join conditions and converting ungrouped aggregates
110109
// into group-by aggregates keyed on the correlation columns.
111110
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
112111
let rewritten_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?;
@@ -120,12 +119,9 @@ fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
120119
return Ok(Transformed::no(LogicalPlan::Join(join)));
121120
}
122121

123-
// We apply the correlation predicates (extracted from the subquery's WHERE)
124-
// as the ON clause of the rewritten join. The original ON clause is applied
125-
// as a post-join predicate. Semantically, this is important when the join
126-
// is rewritten as a left join; we only want outer join semantics for the
127-
// correlation predicates (which is required for "count bug" handling), not
128-
// the original join predicates.
122+
// The correlation predicates (extracted from the subquery's WHERE) become
123+
// the rewritten join's ON clause. See below for discussion of how the
124+
// user's original ON clause is handled.
129125
let correlation_filter = conjunction(pull_up.join_filters);
130126

131127
// Look up each aggregate's default value on empty input (e.g., COUNT → 0,
@@ -157,23 +153,85 @@ fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
157153
(rewritten_subquery, correlation_filter, original_join_filter)
158154
};
159155

160-
// Use a left join when a scalar aggregation was pulled up (preserves
161-
// outer rows with no matches), otherwise keep inner join.
162-
// SELECT * FROM t0, LATERAL (SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0); → left join
163-
// SELECT * FROM t0, LATERAL (SELECT * FROM t1 WHERE t0.v0 = t1.v0); → inner join
164-
let join_type = if pull_up.pulled_up_scalar_agg {
165-
JoinType::Left
166-
} else {
167-
JoinType::Inner
168-
};
156+
// For LEFT lateral joins, verify that all column references in the
157+
// correlation filter are resolvable within the join's left and right
158+
// schemas. If the lateral subquery references columns from an outer scope,
159+
// the extracted filter will contain unresolvable columns and we must skip
160+
// decorrelation.
161+
//
162+
// INNER lateral joins do not need this check: later optimizer passes
163+
// (filter pushdown, join reordering) can restructure the plan to resolve
164+
// cross-scope references. LEFT joins cannot be freely reordered.
165+
if original_join_type == JoinType::Left
166+
&& let Some(ref filter) = correlation_filter
167+
{
168+
let left_schema = join.left.schema();
169+
let right_schema = right_plan.schema();
170+
let has_outer_scope_refs = filter
171+
.column_refs()
172+
.iter()
173+
.any(|col| !left_schema.has_column(col) && !right_schema.has_column(col));
174+
if has_outer_scope_refs {
175+
return Ok(Transformed::no(LogicalPlan::Join(join)));
176+
}
177+
}
178+
179+
// Use a left join when the user wrote LEFT LATERAL or when a scalar
180+
// aggregation was pulled up (preserves outer rows with no matches).
181+
let join_type =
182+
if original_join_type == JoinType::Left || pull_up.pulled_up_scalar_agg {
183+
JoinType::Left
184+
} else {
185+
JoinType::Inner
186+
};
187+
188+
// The correlation predicates (extracted from the subquery's WHERE) are
189+
// turned into the rewritten join's ON clause. There are three cases that
190+
// determine how the user's original ON clause is handled:
191+
//
192+
// - INNER lateral: user ON clause becomes a post-join filter. This restores
193+
// inner-join semantics if the join is upgraded to LEFT for count-bug
194+
// handling.
195+
//
196+
// - LEFT lateral with grouped (or no) agg: user ON clause is merged into
197+
// the rewritten ON clause, alongside the correlation predicates. LEFT
198+
// join semantics correctly preserve unmatched rows with NULLs.
199+
//
200+
// - LEFT lateral with an ungrouped aggregate (which decorrelation converts
201+
// to a group-by keyed on the correlation columns): user ON clause cannot
202+
// be placed in the join condition (it would conflict with count-bug
203+
// compensation) or as a post-join filter (that would remove
204+
// left-preserved rows). Instead, a projection is added after count-bug
205+
// compensation that replaces each right-side column with NULL when the ON
206+
// condition is not satisfied:
207+
//
208+
// CASE WHEN (on_cond) IS NOT TRUE THEN NULL ELSE <col> END
209+
//
210+
// This simulates LEFT JOIN semantics for the user's ON clause without
211+
// interfering with count-bug compensation.
212+
let (join_filter, post_join_filter, on_condition_for_projection) =
213+
if original_join_type == JoinType::Left {
214+
if pull_up.pulled_up_scalar_agg {
215+
(correlation_filter, None, original_join_filter)
216+
} else {
217+
let combined = conjunction(
218+
correlation_filter.into_iter().chain(original_join_filter),
219+
);
220+
(combined, None, None)
221+
}
222+
} else {
223+
(correlation_filter, original_join_filter, None)
224+
};
225+
169226
let left_field_count = join.left.schema().fields().len();
170227
let new_plan = LogicalPlanBuilder::from(join.left)
171-
.join_on(right_plan, join_type, correlation_filter)?
228+
.join_on(right_plan, join_type, join_filter)?
172229
.build()?;
173230

174-
// Handle the count bug: after a left join, unmatched outer rows get NULLs
175-
// for all right-side columns. But COUNT(*) over an empty group should
176-
// return 0, not NULL. Add a projection that wraps affected expressions:
231+
// Handle the count bug: in the rewritten left join, unmatched outer
232+
// rows get NULLs for all right-side columns. But some aggregates
233+
// have non-NULL defaults on empty input (e.g., COUNT returns 0, not
234+
// NULL). Add a projection that wraps those columns:
177235
// CASE WHEN __always_true IS NULL THEN <default> ELSE <column> END
178236
let new_plan = if let Some(expr_map) = collected_count_expr_map {
179237
let join_schema = new_plan.schema();
@@ -202,12 +260,7 @@ fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
202260
)],
203261
else_expr: Some(Box::new(col)),
204262
});
205-
proj_exprs.push(Expr::Alias(expr::Alias {
206-
expr: Box::new(case_expr),
207-
relation: qualifier.cloned(),
208-
name: name.to_string(),
209-
metadata: None,
210-
}));
263+
proj_exprs.push(case_expr.alias_qualified(qualifier.cloned(), name));
211264
continue;
212265
}
213266
proj_exprs.push(col);
@@ -220,8 +273,47 @@ fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
220273
new_plan
221274
};
222275

223-
// Apply the original ON clause as a post-join filter.
224-
let new_plan = if let Some(on_filter) = original_join_filter {
276+
// For LEFT lateral joins with an ungrouped aggregate, simulate LEFT JOIN
277+
// semantics for the user's ON clause by adding a projection that replaces
278+
// right-side columns with NULL when the ON condition is not TRUE, i.e. false or NULL (see
279+
// commentary above).
280+
//
281+
// Note: the ON condition expression is duplicated per column, so this
282+
// assumes it is deterministic.
283+
let new_plan = if let Some(on_cond) = on_condition_for_projection {
284+
let schema = Arc::clone(new_plan.schema());
285+
let mut proj_exprs: Vec<Expr> = vec![];
286+
287+
for (i, (qualifier, field)) in schema.iter().enumerate() {
288+
let col = Expr::Column(Column::new(qualifier.cloned(), field.name()));
289+
290+
if i < left_field_count {
291+
proj_exprs.push(col);
292+
continue;
293+
}
294+
295+
let typed_null =
296+
Expr::Literal(ScalarValue::try_from(field.data_type())?, None);
297+
let case_expr = Expr::Case(expr::Case {
298+
expr: None,
299+
when_then_expr: vec![(
300+
Box::new(Expr::IsNotTrue(Box::new(on_cond.clone()))),
301+
Box::new(typed_null),
302+
)],
303+
else_expr: Some(Box::new(col)),
304+
});
305+
proj_exprs.push(case_expr.alias_qualified(qualifier.cloned(), field.name()));
306+
}
307+
308+
LogicalPlanBuilder::from(new_plan)
309+
.project(proj_exprs)?
310+
.build()?
311+
} else {
312+
new_plan
313+
};
314+
315+
// Apply the original ON clause as a post-join filter (INNER lateral only).
316+
let new_plan = if let Some(on_filter) = post_join_filter {
225317
LogicalPlanBuilder::from(new_plan)
226318
.filter(on_filter)?
227319
.build()?

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4687,14 +4687,22 @@ query TT
46874687
explain SELECT j1_string, j2_string FROM j1 LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true);
46884688
----
46894689
logical_plan
4690-
01)Left Join:
4691-
02)--TableScan: j1 projection=[j1_string]
4692-
03)--SubqueryAlias: j2
4693-
04)----Projection: j2.j2_string
4694-
05)------Subquery:
4695-
06)--------Filter: outer_ref(j1.j1_id) < j2.j2_id
4696-
07)----------TableScan: j2 projection=[j2_string, j2_id]
4697-
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" })
4690+
01)Projection: j1.j1_string, j2.j2_string
4691+
02)--Left Join: Filter: j1.j1_id < j2.j2_id
4692+
03)----TableScan: j1 projection=[j1_string, j1_id]
4693+
04)----SubqueryAlias: j2
4694+
05)------TableScan: j2 projection=[j2_string, j2_id]
4695+
physical_plan
4696+
01)NestedLoopJoinExec: join_type=Left, filter=j1_id@0 < j2_id@1, projection=[j1_string@0, j2_string@2]
4697+
02)--DataSourceExec: partitions=1, partition_sizes=[0]
4698+
03)--DataSourceExec: partitions=1, partition_sizes=[0]
4699+
4700+
# INNER lateral with multi-scope correlation: the subquery references
4701+
# j1 (grandparent scope) and j2 (parent scope). The optimizer
4702+
# restructures this into a valid plan via join reordering.
4703+
query TITITI
4704+
SELECT * FROM j1, (j2 CROSS JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3_lat);
4705+
----
46984706

46994707
query TT
47004708
explain SELECT * FROM j1, (j2 LEFT JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3 ON(true));

0 commit comments

Comments
 (0)