Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 125 additions & 119 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ fn semi_join_with_join_filter() -> Result<()> {
format!("{plan}"),
@r"
Projection: test.col_utf8
LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32, col_uint32, col_utf8]
RightSemi Join: __correlated_sq_1.col_int32 = test.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32
SubqueryAlias: __correlated_sq_1
SubqueryAlias: t2
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32, col_uint32]
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32, col_uint32, col_utf8]
"
);
Ok(())
Expand Down Expand Up @@ -254,14 +254,14 @@ fn where_exists_distinct() -> Result<()> {
assert_snapshot!(
format!("{plan}"),
@r"
LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32]
RightSemi Join: __correlated_sq_1.col_int32 = test.col_int32
SubqueryAlias: __correlated_sq_1
Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]
SubqueryAlias: t2
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32]
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32]
"

);
Expand Down Expand Up @@ -518,14 +518,14 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
assert_snapshot!(
format!("{plan}"),
@r"
LeftSemi Join: test.col_int32 = __correlated_sq_1.COL_INT32
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]
RightSemi Join: __correlated_sq_1.COL_INT32 = test.col_int32
SubqueryAlias: __correlated_sq_1
SubqueryAlias: T1
Projection: test.col_int32 AS COL_INT32
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32]
Filter: test.col_int32 IS NOT NULL
TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]
"
);
}
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1022,13 +1022,13 @@ query TT
explain with cte as (select * from person) SELECT * FROM person WHERE EXISTS (SELECT * FROM cte WHERE id = person.id);
----
logical_plan
01)LeftSemi Join: person.id = __correlated_sq_1.id
02)--TableScan: person projection=[id, name]
03)--SubqueryAlias: __correlated_sq_1
04)----SubqueryAlias: cte
05)------TableScan: person projection=[id]
01)RightSemi Join: __correlated_sq_1.id = person.id
02)--SubqueryAlias: __correlated_sq_1
03)----SubqueryAlias: cte
04)------TableScan: person projection=[id]
05)--TableScan: person projection=[id, name]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)]
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--DataSourceExec: partitions=1, partition_sizes=[0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,12 @@ FROM left_parquet l
WHERE l.id IN (SELECT r.id FROM right_parquet r);
----
logical_plan
01)LeftSemi Join: l.id = __correlated_sq_1.id
02)--SubqueryAlias: l
03)----TableScan: left_parquet projection=[id, data]
04)--SubqueryAlias: __correlated_sq_1
05)----SubqueryAlias: r
06)------TableScan: right_parquet projection=[id]
01)RightSemi Join: __correlated_sq_1.id = l.id
02)--SubqueryAlias: __correlated_sq_1
03)----SubqueryAlias: r
04)------TableScan: right_parquet projection=[id]
05)--SubqueryAlias: l
06)----TableScan: left_parquet projection=[id, data]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
Expand All @@ -295,12 +295,12 @@ FROM right_parquet r
WHERE r.id IN (SELECT l.id FROM left_parquet l);
----
logical_plan
01)LeftSemi Join: r.id = __correlated_sq_1.id
02)--SubqueryAlias: r
03)----TableScan: right_parquet projection=[id, info]
04)--SubqueryAlias: __correlated_sq_1
05)----SubqueryAlias: l
06)------TableScan: left_parquet projection=[id]
01)RightSemi Join: __correlated_sq_1.id = r.id
02)--SubqueryAlias: __correlated_sq_1
03)----SubqueryAlias: l
04)------TableScan: left_parquet projection=[id]
05)--SubqueryAlias: r
06)----TableScan: right_parquet projection=[id, info]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
Expand All @@ -327,12 +327,12 @@ ORDER BY l.data DESC;
----
logical_plan
01)Sort: l.data DESC NULLS FIRST
02)--LeftSemi Join: l.id = __correlated_sq_1.id
03)----SubqueryAlias: l
04)------TableScan: left_parquet projection=[id, data]
05)----SubqueryAlias: __correlated_sq_1
06)------SubqueryAlias: r
07)--------TableScan: right_parquet projection=[id]
02)--RightSemi Join: __correlated_sq_1.id = l.id
03)----SubqueryAlias: __correlated_sq_1
04)------SubqueryAlias: r
05)--------TableScan: right_parquet projection=[id]
06)----SubqueryAlias: l
07)------TableScan: left_parquet projection=[id, data]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
Expand All @@ -351,12 +351,12 @@ ORDER BY l.data DESC;
----
logical_plan
01)Sort: l.data DESC NULLS FIRST
02)--LeftSemi Join: l.id = __correlated_sq_1.id
03)----SubqueryAlias: l
04)------TableScan: left_parquet projection=[id, data]
05)----SubqueryAlias: __correlated_sq_1
06)------SubqueryAlias: r
07)--------TableScan: right_parquet projection=[id]
02)--RightSemi Join: __correlated_sq_1.id = l.id
03)----SubqueryAlias: __correlated_sq_1
04)------SubqueryAlias: r
05)--------TableScan: right_parquet projection=[id]
06)----SubqueryAlias: l
07)------TableScan: left_parquet projection=[id, data]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
Expand Down Expand Up @@ -435,12 +435,12 @@ ORDER BY l.id LIMIT 2;
----
logical_plan
01)Sort: l.id ASC NULLS LAST, fetch=2
02)--LeftSemi Join: l.id = __correlated_sq_1.id
03)----SubqueryAlias: l
04)------TableScan: left_parquet projection=[id, data]
05)----SubqueryAlias: __correlated_sq_1
06)------SubqueryAlias: r
07)--------TableScan: right_parquet projection=[id]
02)--RightSemi Join: __correlated_sq_1.id = l.id
03)----SubqueryAlias: __correlated_sq_1
04)------SubqueryAlias: r
05)--------TableScan: right_parquet projection=[id]
06)----SubqueryAlias: l
07)------TableScan: left_parquet projection=[id, data]
physical_plan
01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
Expand Down
14 changes: 7 additions & 7 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -439,14 +439,14 @@ query TT
explain select a from t1 where exists (select count(*) from t2);
----
logical_plan
01)LeftSemi Join:
02)--TableScan: t1 projection=[a]
03)--SubqueryAlias: __correlated_sq_1
04)----EmptyRelation: rows=1
01)RightSemi Join:
02)--SubqueryAlias: __correlated_sq_1
03)----EmptyRelation: rows=1
04)--TableScan: t1 projection=[a]
physical_plan
01)NestedLoopJoinExec: join_type=LeftSemi
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--PlaceholderRowExec
01)NestedLoopJoinExec: join_type=RightSemi
02)--PlaceholderRowExec
03)--DataSourceExec: partitions=1, partition_sizes=[0]

statement ok
drop table t1;
Expand Down
21 changes: 15 additions & 6 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1113,14 +1113,23 @@ physical_plan
01)┌───────────────────────────┐
02)│ NestedLoopJoinExec │
03)│ -------------------- ├──────────────┐
04)│ join_type: LeftSemi │ │
04)│ join_type: RightSemi │ │
05)└─────────────┬─────────────┘ │
06)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
07)│ DataSourceExec ││ PlaceholderRowExec │
08)│ -------------------- ││ │
09)│ files: 1 ││ │
10)│ format: csv ││ │
11)└───────────────────────────┘└───────────────────────────┘
07)│ PlaceholderRowExec ││ RepartitionExec │
08)│ ││ -------------------- │
09)│ ││ partition_count(in->out): │
10)│ ││ 1 -> 4 │
11)│ ││ │
12)│ ││ partitioning_scheme: │
13)│ ││ RoundRobinBatch(4) │
14)└───────────────────────────┘└─────────────┬─────────────┘
15)-----------------------------┌─────────────┴─────────────┐
16)-----------------------------│ DataSourceExec │
17)-----------------------------│ -------------------- │
18)-----------------------------│ files: 1 │
19)-----------------------------│ format: csv │
20)-----------------------------└───────────────────────────┘

# Query with cross join.
query TT
Expand Down
10 changes: 5 additions & 5 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5137,10 +5137,10 @@ GROUP BY
logical_plan
01)Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01")) AS ts_chunk, count(keywords_stream.keyword) AS alert_keyword_count
02)--Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"), keywords_stream.ts, TimestampNanosecond(946684800000000000, None)) AS date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))]], aggr=[[count(keywords_stream.keyword)]]
03)----LeftSemi Join: keywords_stream.keyword = __correlated_sq_1.keyword
04)------TableScan: keywords_stream projection=[ts, keyword]
05)------SubqueryAlias: __correlated_sq_1
06)--------TableScan: alert_keywords projection=[keyword]
03)----RightSemi Join: __correlated_sq_1.keyword = keywords_stream.keyword
04)------SubqueryAlias: __correlated_sq_1
05)--------TableScan: alert_keywords projection=[keyword]
06)------TableScan: keywords_stream projection=[ts, keyword]
physical_plan
01)ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))@0 as ts_chunk, count(keywords_stream.keyword)@1 as alert_keyword_count]
02)--AggregateExec: mode=Single, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }, ts@0, 946684800000000000) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))], aggr=[count(keywords_stream.keyword)]
Expand Down Expand Up @@ -5634,7 +5634,7 @@ physical_plan
statement ok
reset datafusion.execution.batch_size;

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/join_limit_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ EXPLAIN SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LI
----
logical_plan
01)Limit: skip=0, fetch=2
02)--LeftSemi Join: t2.x = __correlated_sq_1.a
03)----TableScan: t2 projection=[x]
04)----SubqueryAlias: __correlated_sq_1
05)------TableScan: t1 projection=[a]
02)--RightSemi Join: __correlated_sq_1.a = t2.x
03)----SubqueryAlias: __correlated_sq_1
04)------TableScan: t1 projection=[a]
05)----TableScan: t2 projection=[x]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(x@0, a@0)], fetch=2
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(a@0, x@0)], fetch=2
02)--DataSourceExec: partitions=1, partition_sizes=[1]
03)--DataSourceExec: partitions=1, partition_sizes=[1]

Expand Down Expand Up @@ -259,7 +259,7 @@ SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100;

# Config reset

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;
Expand Down
Loading
Loading