Skip to content

Commit d47bd59

Browse files
authored
Support parent dynamic filters for more join types (#20192)
## Which issue does this PR close? Solves the simplest possible subset of #16973 ## Rationale for this change Parent dynamic filters can be assessed for pushdown in much the same way as parent static filters. The essence of it is that you can push a filter into the preserved side if the filter only refers to preserved-side columns. This PR makes no attempt to cover other cases (e.g. non-preserved-side filters or the self filters that hash join creates) ## What changes are included in this PR? 1. Allow a subset of parent dynamic filters for all join types except full, depending on lr_is_preserved and which columns they refer to 2. SQL logic tests for the above 3. New helpers `ChildFilterDescription::all_unsupported` and `ChildFilterDescription::from_child_with_allowed_columns` 4. `FilterColumnChecker` gets a sibling `FilterColumnIndexChecker`, which also needs to look at indices, not just column names, as the same name can appear on both sides of a join. We don't use this for other call sites of FilterColumnChecker as we often use the checker before an expression remap (so indices aren't yet reliable anyway) ## Are these changes tested? Yes ## Are there any user-facing changes? No
1 parent 53b0ffb commit d47bd59

5 files changed

Lines changed: 773 additions & 122 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 229 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,8 @@ async fn test_static_filter_pushdown_through_hash_join() {
401401
"
402402
);
403403

404-
// Test left join - filters should NOT be pushed down
404+
// Test left join: filter on preserved (build) side is pushed down,
405+
// filter on non-preserved (probe) side is NOT pushed down.
405406
let join = Arc::new(
406407
HashJoinExec::try_new(
407408
TestScanBuilder::new(Arc::clone(&build_side_schema))
@@ -425,25 +426,30 @@ async fn test_static_filter_pushdown_through_hash_join() {
425426
);
426427

427428
let join_schema = join.schema();
428-
let filter = col_lit_predicate("a", "aa", &join_schema);
429-
let plan =
430-
Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc<dyn ExecutionPlan>;
429+
// Filter on build side column (preserved): should be pushed down
430+
let left_filter = col_lit_predicate("a", "aa", &join_schema);
431+
// Filter on probe side column (not preserved): should NOT be pushed down
432+
let right_filter = col_lit_predicate("e", "ba", &join_schema);
433+
let filter =
434+
Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
435+
let plan = Arc::new(FilterExec::try_new(right_filter, filter).unwrap())
436+
as Arc<dyn ExecutionPlan>;
431437

432-
// Test that filters are NOT pushed down for left join
433438
insta::assert_snapshot!(
434439
OptimizationTest::new(plan, FilterPushdown::new(), true),
435440
@r"
436441
OptimizationTest:
437442
input:
438-
- FilterExec: a@0 = aa
439-
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
440-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
441-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
443+
- FilterExec: e@4 = ba
444+
- FilterExec: a@0 = aa
445+
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
446+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
447+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
442448
output:
443449
Ok:
444-
- FilterExec: a@0 = aa
450+
- FilterExec: e@4 = ba
445451
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
446-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
452+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
447453
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
448454
"
449455
);
@@ -1722,6 +1728,218 @@ async fn test_hashjoin_parent_filter_pushdown() {
17221728
);
17231729
}
17241730

1731+
#[test]
1732+
fn test_hashjoin_parent_filter_pushdown_same_column_names() {
1733+
use datafusion_common::JoinType;
1734+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
1735+
1736+
let build_side_schema = Arc::new(Schema::new(vec![
1737+
Field::new("id", DataType::Utf8, false),
1738+
Field::new("build_val", DataType::Utf8, false),
1739+
]));
1740+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
1741+
.with_support(true)
1742+
.build();
1743+
1744+
let probe_side_schema = Arc::new(Schema::new(vec![
1745+
Field::new("id", DataType::Utf8, false),
1746+
Field::new("probe_val", DataType::Utf8, false),
1747+
]));
1748+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
1749+
.with_support(true)
1750+
.build();
1751+
1752+
let on = vec![(
1753+
col("id", &build_side_schema).unwrap(),
1754+
col("id", &probe_side_schema).unwrap(),
1755+
)];
1756+
let join = Arc::new(
1757+
HashJoinExec::try_new(
1758+
build_scan,
1759+
probe_scan,
1760+
on,
1761+
None,
1762+
&JoinType::Inner,
1763+
None,
1764+
PartitionMode::Partitioned,
1765+
datafusion_common::NullEquality::NullEqualsNothing,
1766+
false,
1767+
)
1768+
.unwrap(),
1769+
);
1770+
1771+
let join_schema = join.schema();
1772+
1773+
let build_id_filter = col_lit_predicate("id", "aa", &join_schema);
1774+
let probe_val_filter = col_lit_predicate("probe_val", "x", &join_schema);
1775+
1776+
let filter =
1777+
Arc::new(FilterExec::try_new(build_id_filter, Arc::clone(&join) as _).unwrap());
1778+
let plan = Arc::new(FilterExec::try_new(probe_val_filter, filter).unwrap())
1779+
as Arc<dyn ExecutionPlan>;
1780+
1781+
insta::assert_snapshot!(
1782+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
1783+
@r"
1784+
OptimizationTest:
1785+
input:
1786+
- FilterExec: probe_val@3 = x
1787+
- FilterExec: id@0 = aa
1788+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
1789+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true
1790+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true
1791+
output:
1792+
Ok:
1793+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
1794+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true, predicate=id@0 = aa
1795+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true, predicate=probe_val@1 = x
1796+
"
1797+
);
1798+
}
1799+
1800+
#[test]
1801+
fn test_hashjoin_parent_filter_pushdown_mark_join() {
1802+
use datafusion_common::JoinType;
1803+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
1804+
1805+
let left_schema = Arc::new(Schema::new(vec![
1806+
Field::new("id", DataType::Utf8, false),
1807+
Field::new("val", DataType::Utf8, false),
1808+
]));
1809+
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
1810+
.with_support(true)
1811+
.build();
1812+
1813+
let right_schema =
1814+
Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
1815+
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
1816+
.with_support(true)
1817+
.build();
1818+
1819+
let on = vec![(
1820+
col("id", &left_schema).unwrap(),
1821+
col("id", &right_schema).unwrap(),
1822+
)];
1823+
let join = Arc::new(
1824+
HashJoinExec::try_new(
1825+
left_scan,
1826+
right_scan,
1827+
on,
1828+
None,
1829+
&JoinType::LeftMark,
1830+
None,
1831+
PartitionMode::Partitioned,
1832+
datafusion_common::NullEquality::NullEqualsNothing,
1833+
false,
1834+
)
1835+
.unwrap(),
1836+
);
1837+
1838+
let join_schema = join.schema();
1839+
1840+
let left_filter = col_lit_predicate("val", "x", &join_schema);
1841+
let mark_filter = col_lit_predicate("mark", true, &join_schema);
1842+
1843+
let filter =
1844+
Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
1845+
let plan = Arc::new(FilterExec::try_new(mark_filter, filter).unwrap())
1846+
as Arc<dyn ExecutionPlan>;
1847+
1848+
insta::assert_snapshot!(
1849+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
1850+
@r"
1851+
OptimizationTest:
1852+
input:
1853+
- FilterExec: mark@2 = true
1854+
- FilterExec: val@1 = x
1855+
- HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)]
1856+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true
1857+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
1858+
output:
1859+
Ok:
1860+
- FilterExec: mark@2 = true
1861+
- HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)]
1862+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true, predicate=val@1 = x
1863+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
1864+
"
1865+
);
1866+
}
1867+
1868+
/// Test that filters on join key columns are pushed to both sides of semi/anti joins.
1869+
/// For LeftSemi/LeftAnti, the output only contains left columns, but filters on
1870+
/// join key columns can also be pushed to the right (non-preserved) side because
1871+
/// the equijoin condition guarantees the key values match.
1872+
#[test]
1873+
fn test_hashjoin_parent_filter_pushdown_semi_anti_join() {
1874+
use datafusion_common::JoinType;
1875+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
1876+
1877+
let left_schema = Arc::new(Schema::new(vec![
1878+
Field::new("k", DataType::Utf8, false),
1879+
Field::new("v", DataType::Utf8, false),
1880+
]));
1881+
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
1882+
.with_support(true)
1883+
.build();
1884+
1885+
let right_schema = Arc::new(Schema::new(vec![
1886+
Field::new("k", DataType::Utf8, false),
1887+
Field::new("w", DataType::Utf8, false),
1888+
]));
1889+
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
1890+
.with_support(true)
1891+
.build();
1892+
1893+
let on = vec![(
1894+
col("k", &left_schema).unwrap(),
1895+
col("k", &right_schema).unwrap(),
1896+
)];
1897+
1898+
let join = Arc::new(
1899+
HashJoinExec::try_new(
1900+
left_scan,
1901+
right_scan,
1902+
on,
1903+
None,
1904+
&JoinType::LeftSemi,
1905+
None,
1906+
PartitionMode::Partitioned,
1907+
datafusion_common::NullEquality::NullEqualsNothing,
1908+
false,
1909+
)
1910+
.unwrap(),
1911+
);
1912+
1913+
let join_schema = join.schema();
1914+
// Filter on join key column: k = 'x' — should be pushed to BOTH sides
1915+
let key_filter = col_lit_predicate("k", "x", &join_schema);
1916+
// Filter on non-key column: v = 'y' — should only be pushed to the left side
1917+
let val_filter = col_lit_predicate("v", "y", &join_schema);
1918+
1919+
let filter =
1920+
Arc::new(FilterExec::try_new(key_filter, Arc::clone(&join) as _).unwrap());
1921+
let plan = Arc::new(FilterExec::try_new(val_filter, filter).unwrap())
1922+
as Arc<dyn ExecutionPlan>;
1923+
1924+
insta::assert_snapshot!(
1925+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
1926+
@r"
1927+
OptimizationTest:
1928+
input:
1929+
- FilterExec: v@1 = y
1930+
- FilterExec: k@0 = x
1931+
- HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)]
1932+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true
1933+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true
1934+
output:
1935+
Ok:
1936+
- HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)]
1937+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true, predicate=k@0 = x AND v@1 = y
1938+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true, predicate=k@0 = x
1939+
"
1940+
);
1941+
}
1942+
17251943
/// Integration test for dynamic filter pushdown with TopK.
17261944
/// We use an integration test because there are complex interactions in the optimizer rules
17271945
/// that the unit tests applying a single optimizer rule do not cover.

0 commit comments

Comments
 (0)