Skip to content

Commit 73c992f

Browse files
committed
fix(sql): fix a bug when planning semi- or antijoins
Currently, the `exclude_using_columns` called from `expand_wildcard` doesn't consider the filtering semantics of semi- and antijoins when expanding wildcards on top of joins defined via `USING(<columns>)` syntax. From each set of columns equated by a `USING(<column>)` expression, the code currently (1) sorts the set entries, and (2) retains only the first entry from each set. Because of that, the columns surviving the `exclude_using_columns` call might be wrongly chosen from the filtering side if the table qualifier from that side is lexicographically before the filtered side qualifier. For example, given this schema of two identical tables: ```sql create table s(x1 int, x2 int, x3 int); create table t(x1 int, x2 int, x3 int); ``` One would expect that the schema of queries where the `s` and `t` names are swapped will be identical. However, currently this is not the case: ```sql -- Q1 schema: x1 int, x2 int, x3 int (because s < t) select * from s left semi join t using (x1); -- Q2 schema: x2 int, x3 int (because t < s) select * from t left semi join s using (x1); ``` This commit fixes the issue and adds some regression tests.
1 parent 6412c3a commit 73c992f

2 files changed

Lines changed: 97 additions & 23 deletions

File tree

datafusion/expr/src/utils.rs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -382,30 +382,39 @@ fn get_exprs_except_skipped(
382382
}
383383
}
384384

385-
/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice
386-
/// (once for each join side), but an unqualified wildcard should include it only once.
387-
/// This function returns the columns that should be excluded.
385+
/// When a JOIN has a USING clause, the join columns appear in the output
386+
/// schema once per side (for inner/outer joins) or once total (for semi/anti
387+
/// joins). An unqualified wildcard should include each USING column only once.
388+
/// This function returns the duplicate columns that should be excluded.
388389
fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
389-
let using_columns = plan.using_columns()?;
390-
let excluded = using_columns
391-
.into_iter()
392-
// For each USING JOIN condition, only expand to one of each join column in projection
393-
.flat_map(|cols| {
394-
let mut cols = cols.into_iter().collect::<Vec<_>>();
395-
// sort join columns to make sure we consistently keep the same
396-
// qualified column
397-
cols.sort();
398-
let mut out_column_names: HashSet<String> = HashSet::new();
399-
cols.into_iter().filter_map(move |c| {
400-
if out_column_names.contains(&c.name) {
401-
Some(c)
402-
} else {
403-
out_column_names.insert(c.name);
404-
None
405-
}
406-
})
407-
})
408-
.collect::<HashSet<_>>();
390+
let output_columns: HashSet<_> = plan.schema().columns().iter().cloned().collect();
391+
let mut excluded = HashSet::new();
392+
for cols in plan.using_columns()? {
393+
// `using_columns()` returns join columns from both sides regardless of
394+
// the join type. For semi/anti joins, only one side's columns appear in
395+
// the output schema. Filter to output columns so that columns from the
396+
// non-output side don't participate in the deduplication process below
397+
// and displace real output columns.
398+
let mut cols: Vec<_> = cols
399+
.into_iter()
400+
.filter(|c| output_columns.contains(c))
401+
.collect();
402+
403+
// Sort so we keep the same qualified column, regardless of HashSet
404+
// iteration order.
405+
cols.sort();
406+
407+
// Keep only one column per name from the columns set, adding any
408+
// duplicates to the excluded set.
409+
let mut seen_names = HashSet::new();
410+
for col in cols {
411+
if seen_names.contains(col.name.as_str()) {
412+
excluded.insert(col); // exclude columns with already seen name
413+
} else {
414+
seen_names.insert(col.name.clone()); // mark column name as seen
415+
}
416+
}
417+
}
409418
Ok(excluded)
410419
}
411420

datafusion/sql/tests/sql_integration.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5004,6 +5004,71 @@ fn test_using_join_wildcard_schema() {
50045004
);
50055005
}
50065006

5007+
#[test]
5008+
fn test_using_join_wildcard_schema_semi_anti() {
5009+
let s_columns = &["s.x1", "s.x2", "s.x3"];
5010+
let t_columns = &["t.x1", "t.x2", "t.x3"];
5011+
5012+
let sql = "WITH
5013+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5014+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5015+
SELECT * FROM s LEFT SEMI JOIN t USING (x1)";
5016+
let plan = logical_plan(sql).unwrap();
5017+
assert_eq!(plan.schema().field_names(), s_columns);
5018+
5019+
let sql = "WITH
5020+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5021+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5022+
SELECT * FROM t RIGHT SEMI JOIN s USING (x1)";
5023+
let plan = logical_plan(sql).unwrap();
5024+
assert_eq!(plan.schema().field_names(), s_columns);
5025+
5026+
let sql = "WITH
5027+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5028+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5029+
SELECT * FROM s LEFT ANTI JOIN t USING (x1)";
5030+
let plan = logical_plan(sql).unwrap();
5031+
assert_eq!(plan.schema().field_names(), s_columns);
5032+
5033+
let sql = "WITH
5034+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5035+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5036+
SELECT * FROM t RIGHT ANTI JOIN s USING (x1)";
5037+
let plan = logical_plan(sql).unwrap();
5038+
assert_eq!(plan.schema().field_names(), s_columns);
5039+
5040+
// Same as above, but with swapped s and t sides.
5041+
// Tests the issue fixed with #20990.
5042+
5043+
let sql = "WITH
5044+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5045+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5046+
SELECT * FROM t LEFT SEMI JOIN s USING (x1)";
5047+
let plan = logical_plan(sql).unwrap();
5048+
assert_eq!(plan.schema().field_names(), t_columns);
5049+
5050+
let sql = "WITH
5051+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5052+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5053+
SELECT * FROM s RIGHT SEMI JOIN t USING (x1)";
5054+
let plan = logical_plan(sql).unwrap();
5055+
assert_eq!(plan.schema().field_names(), t_columns);
5056+
5057+
let sql = "WITH
5058+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5059+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5060+
SELECT * FROM t LEFT ANTI JOIN s USING (x1)";
5061+
let plan = logical_plan(sql).unwrap();
5062+
assert_eq!(plan.schema().field_names(), t_columns);
5063+
5064+
let sql = "WITH
5065+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5066+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5067+
SELECT * FROM s RIGHT ANTI JOIN t USING (x1)";
5068+
let plan = logical_plan(sql).unwrap();
5069+
assert_eq!(plan.schema().field_names(), t_columns);
5070+
}
5071+
50075072
#[test]
50085073
fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation()
50095074
{

0 commit comments

Comments
 (0)