Skip to content

Commit dc0c21b

Browse files
authored
feat: Pushdown filters through UnionExec nodes (#20145)
## Which issue does this PR close? - Closes #20144 ## Rationale for this change see #20144 ## What changes are included in this PR? This PR implements `handle_child_pushdown_result` for `UnionExec`; in every case, filters are always pushed down through `UnionExec`. UnionExec needs specialized filter pushdown handling when children have heterogeneous pushdown support. Without this, when some children support pushdown and others don't, the default behavior would leave FilterExec above UnionExec, re-applying filters to outputs of all children—including those that already applied the filters via pushdown. This specialized implementation adds FilterExec only to children that don't support pushdown, avoiding redundant filtering and improving performance. ``` Example: Given Child1 (no pushdown support) and Child2 (has pushdown support) Default behavior: This implementation: FilterExec UnionExec UnionExec FilterExec Child1 Child1 Child2(filter) Child2(filter) ``` ## Are these changes tested? Yes, two test cases are added. ## Are there any user-facing changes?
1 parent aebd100 commit dc0c21b

3 files changed

Lines changed: 209 additions & 2 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() {
18081808
);
18091809
}
18101810

1811+
#[test]
1812+
fn test_filter_pushdown_through_union_mixed_support() {
1813+
// Test case where one child supports filter pushdown and one doesn't
1814+
let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
1815+
let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
1816+
1817+
let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
1818+
1819+
let predicate = col_lit_predicate("a", "foo", &schema());
1820+
let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
1821+
1822+
insta::assert_snapshot!(
1823+
OptimizationTest::new(plan, FilterPushdown::new(), true),
1824+
@r"
1825+
OptimizationTest:
1826+
input:
1827+
- FilterExec: a@0 = foo
1828+
- UnionExec
1829+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1830+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
1831+
output:
1832+
Ok:
1833+
- UnionExec
1834+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
1835+
- FilterExec: a@0 = foo
1836+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
1837+
"
1838+
);
1839+
}
1840+
1841+
#[test]
1842+
fn test_filter_pushdown_through_union_does_not_support() {
1843+
// Test case where neither child supports filter pushdown
1844+
let scan1 = TestScanBuilder::new(schema()).with_support(false).build();
1845+
let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
1846+
1847+
let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
1848+
1849+
let predicate = col_lit_predicate("a", "foo", &schema());
1850+
let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
1851+
1852+
insta::assert_snapshot!(
1853+
OptimizationTest::new(plan, FilterPushdown::new(), true),
1854+
@"
1855+
OptimizationTest:
1856+
input:
1857+
- FilterExec: a@0 = foo
1858+
- UnionExec
1859+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
1860+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
1861+
output:
1862+
Ok:
1863+
- UnionExec
1864+
- FilterExec: a@0 = foo
1865+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
1866+
- FilterExec: a@0 = foo
1867+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
1868+
"
1869+
);
1870+
}
1871+
18111872
/// Schema:
18121873
/// a: String
18131874
/// b: String

datafusion/physical-plan/src/union.rs

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ use crate::execution_plan::{
3636
InvariantLevel, boundedness_from_children, check_default_invariants,
3737
emission_type_from_children,
3838
};
39-
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
39+
use crate::filter::FilterExec;
40+
use crate::filter_pushdown::{
41+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
42+
FilterPushdownPropagation, PushedDown,
43+
};
4044
use crate::metrics::BaselineMetrics;
4145
use crate::projection::{ProjectionExec, make_with_child};
4246
use crate::stream::ObservedStream;
@@ -49,7 +53,9 @@ use datafusion_common::{
4953
Result, assert_or_internal_err, exec_err, internal_datafusion_err,
5054
};
5155
use datafusion_execution::TaskContext;
52-
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, calculate_union};
56+
use datafusion_physical_expr::{
57+
EquivalenceProperties, PhysicalExpr, calculate_union, conjunction,
58+
};
5359

5460
use futures::Stream;
5561
use itertools::Itertools;
@@ -370,6 +376,83 @@ impl ExecutionPlan for UnionExec {
370376
) -> Result<FilterDescription> {
371377
FilterDescription::from_children(parent_filters, &self.children())
372378
}
379+
380+
fn handle_child_pushdown_result(
381+
&self,
382+
phase: FilterPushdownPhase,
383+
child_pushdown_result: ChildPushdownResult,
384+
_config: &ConfigOptions,
385+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
386+
// Pre phase: handle heterogeneous pushdown by wrapping individual
387+
// children with FilterExec and reporting all filters as handled.
388+
// Post phase: use default behavior to let the filter creator decide how to handle
389+
// filters that weren't fully pushed down.
390+
if !matches!(phase, FilterPushdownPhase::Pre) {
391+
return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
392+
}
393+
394+
// UnionExec needs specialized filter pushdown handling when children have
395+
// heterogeneous pushdown support. Without this, when some children support
396+
// pushdown and others don't, the default behavior would leave FilterExec
397+
// above UnionExec, re-applying filters to outputs of all children—including
398+
// those that already applied the filters via pushdown. This specialized
399+
// implementation adds FilterExec only to children that don't support
400+
// pushdown, avoiding redundant filtering and improving performance.
401+
//
402+
// Example: Given Child1 (no pushdown support) and Child2 (has pushdown support)
403+
// Default behavior: This implementation:
404+
// FilterExec UnionExec
405+
// UnionExec FilterExec
406+
// Child1 Child1
407+
// Child2(filter) Child2(filter)
408+
409+
// Collect unsupported filters for each child
410+
let mut unsupported_filters_per_child = vec![Vec::new(); self.inputs.len()];
411+
for parent_filter_result in child_pushdown_result.parent_filters.iter() {
412+
for (child_idx, &child_result) in
413+
parent_filter_result.child_results.iter().enumerate()
414+
{
415+
if matches!(child_result, PushedDown::No) {
416+
unsupported_filters_per_child[child_idx]
417+
.push(Arc::clone(&parent_filter_result.filter));
418+
}
419+
}
420+
}
421+
422+
// Wrap children that have unsupported filters with FilterExec
423+
let mut new_children = self.inputs.clone();
424+
for (child_idx, unsupported_filters) in
425+
unsupported_filters_per_child.iter().enumerate()
426+
{
427+
if !unsupported_filters.is_empty() {
428+
let combined_filter = conjunction(unsupported_filters.clone());
429+
new_children[child_idx] = Arc::new(FilterExec::try_new(
430+
combined_filter,
431+
Arc::clone(&self.inputs[child_idx]),
432+
)?);
433+
}
434+
}
435+
436+
// Check if any children were modified
437+
let children_modified = new_children
438+
.iter()
439+
.zip(self.inputs.iter())
440+
.any(|(new, old)| !Arc::ptr_eq(new, old));
441+
442+
let all_filters_pushed =
443+
vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()];
444+
let propagation = if children_modified {
445+
let updated_node = UnionExec::try_new(new_children)?;
446+
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
447+
.with_updated_node(updated_node)
448+
} else {
449+
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
450+
};
451+
452+
// Report all parent filters as supported since we've ensured they're applied
453+
// on all children (either pushed down or via FilterExec)
454+
Ok(propagation)
455+
}
373456
}
374457

375458
/// Combines multiple input streams by interleaving them.

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,3 +674,66 @@ logical_plan
674674
physical_plan
675675
01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
676676
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]
677+
678+
###
679+
# Test filter pushdown through UNION with mixed support
680+
# This tests the case where one child supports filter pushdown (parquet) and one doesn't (memory table)
681+
###
682+
683+
# enable filter pushdown
684+
statement ok
685+
set datafusion.execution.parquet.pushdown_filters = true;
686+
687+
statement ok
688+
set datafusion.optimizer.max_passes = 0;
689+
690+
# Create memory table with matching schema (a: VARCHAR, b: BIGINT)
691+
statement ok
692+
CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 5);
693+
694+
# Create parquet table with matching schema
695+
statement ok
696+
CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET
697+
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet';
698+
699+
# Query results combining memory table and Parquet with filter
700+
query I rowsort
701+
SELECT b FROM (
702+
SELECT a, b FROM t_union_mem
703+
UNION ALL
704+
SELECT a, b FROM t_union_parquet
705+
) WHERE b > 2;
706+
----
707+
3
708+
4
709+
5
710+
50
711+
712+
# Explain the union query - filter should be pushed to parquet but not memory table
713+
query TT
714+
EXPLAIN SELECT b FROM (
715+
SELECT a, b FROM t_union_mem
716+
UNION ALL
717+
SELECT a, b FROM t_union_parquet
718+
) WHERE b > 2;
719+
----
720+
logical_plan
721+
01)Projection: b
722+
02)--Filter: b > Int64(2)
723+
03)----Union
724+
04)------Projection: t_union_mem.a, t_union_mem.b
725+
05)--------TableScan: t_union_mem
726+
06)------Projection: t_union_parquet.a, t_union_parquet.b
727+
07)--------TableScan: t_union_parquet
728+
physical_plan
729+
01)UnionExec
730+
02)--FilterExec: b@0 > 2
731+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
732+
04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]}, projection=[b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
733+
734+
# Clean up union test tables
735+
statement ok
736+
DROP TABLE t_union_mem;
737+
738+
statement ok
739+
DROP TABLE t_union_parquet;

0 commit comments

Comments
 (0)