Skip to content

Commit bf17571

Browse files
committed
Add configurable UNION DISTINCT to FILTER rewrite optimization
1 parent 8a45d02 commit bf17571

11 files changed

Lines changed: 949 additions & 0 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,13 @@ config_namespace! {
13481348
/// closer to the leaf table scans, and push those projections down
13491349
/// towards the leaf nodes.
13501350
pub enable_leaf_expression_pushdown: bool, default = true
1351+
1352+
/// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that
1353+
/// read from the same source and differ only by filter predicates into a single branch
1354+
/// with a combined filter. This optimization is conservative and only applies when the
1355+
/// branches share the same source and compatible wrapper nodes such as identical
1356+
/// projections or aliases.
1357+
pub enable_unions_to_filter: bool, default = false
13511358
}
13521359
}
13531360

datafusion/optimizer/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,7 @@ harness = false
8383
[[bench]]
8484
name = "optimize_projections"
8585
harness = false
86+
87+
[[bench]]
88+
name = "unions_to_filter"
89+
harness = false
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Microbenchmarks for the [`UnionsToFilter`] optimizer rule.
19+
//!
20+
//! Three scenarios are covered:
21+
//!
22+
//! 1. **merge** – N branches over the *same* table, each with a simple
23+
//! equality filter. All branches should be merged into a single
24+
//! `DISTINCT(Filter(OR …))` plan.
25+
//!
26+
//! 2. **no_merge** – N branches over *different* tables. The rule must
27+
//! recognise that no merge is possible and leave the plan unchanged.
28+
//! This exercises the "cold path" without any rewrite work.
29+
//!
30+
//! 3. **merge_with_projection** – N branches over the same table but each
31+
//! branch wraps the filter in a `Projection`. This exercises the wrapper-
32+
//! peeling and re-wrapping paths in addition to the core merge logic.
33+
//!
34+
//! To generate a flamegraph (requires `cargo-flamegraph`):
35+
//! ```text
36+
//! cargo flamegraph -p datafusion-optimizer --bench unions_to_filter \
37+
//! --flamechart --root --profile profiling --freq 1000 -- --bench
38+
//! ```
39+
40+
use arrow::datatypes::{DataType, Field, Schema};
41+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
42+
use datafusion_common::config::ConfigOptions;
43+
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan};
44+
use datafusion_expr::{col, lit};
45+
use datafusion_optimizer::OptimizerContext;
46+
use datafusion_optimizer::unions_to_filter::UnionsToFilter;
47+
use datafusion_optimizer::{Optimizer, OptimizerRule};
48+
use std::hint::black_box;
49+
use std::sync::Arc;
50+
51+
// ---------------------------------------------------------------------------
52+
// Helpers
53+
// ---------------------------------------------------------------------------
54+
55+
/// Build a three-column table scan for `name`.
56+
fn scan(name: &str) -> LogicalPlan {
57+
let schema = Schema::new(vec![
58+
Field::new("a", DataType::Int32, false),
59+
Field::new("b", DataType::Int32, false),
60+
Field::new("c", DataType::Int32, false),
61+
]);
62+
table_scan(Some(name), &schema, None)
63+
.unwrap()
64+
.build()
65+
.unwrap()
66+
}
67+
68+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches all filter over
69+
/// the *same* table (`t`), so the rule can merge them.
70+
fn build_merge_plan(n: usize) -> LogicalPlan {
71+
assert!(n >= 2);
72+
let mut builder: Option<LogicalPlanBuilder> = None;
73+
for i in 0..n {
74+
let branch = LogicalPlanBuilder::from(scan("t"))
75+
.filter(col("a").eq(lit(i as i32)))
76+
.unwrap()
77+
.build()
78+
.unwrap();
79+
builder = Some(match builder {
80+
None => LogicalPlanBuilder::from(branch),
81+
Some(b) => b.union(branch).unwrap(),
82+
});
83+
}
84+
builder.unwrap().distinct().unwrap().build().unwrap()
85+
}
86+
87+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each filter over a
88+
/// *different* table, so no merge is possible.
89+
fn build_no_merge_plan(n: usize) -> LogicalPlan {
90+
assert!(n >= 2);
91+
let mut builder: Option<LogicalPlanBuilder> = None;
92+
for i in 0..n {
93+
let branch = LogicalPlanBuilder::from(scan(&format!("t{i}")))
94+
.filter(col("a").eq(lit(i as i32)))
95+
.unwrap()
96+
.build()
97+
.unwrap();
98+
builder = Some(match builder {
99+
None => LogicalPlanBuilder::from(branch),
100+
Some(b) => b.union(branch).unwrap(),
101+
});
102+
}
103+
builder.unwrap().distinct().unwrap().build().unwrap()
104+
}
105+
106+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each wrap the
107+
/// filter inside a `Projection` over the *same* table.
108+
fn build_merge_with_projection_plan(n: usize) -> LogicalPlan {
109+
assert!(n >= 2);
110+
let mut builder: Option<LogicalPlanBuilder> = None;
111+
for i in 0..n {
112+
let branch = LogicalPlanBuilder::from(scan("t"))
113+
.filter(col("a").eq(lit(i as i32)))
114+
.unwrap()
115+
.project(vec![col("a"), col("b")])
116+
.unwrap()
117+
.build()
118+
.unwrap();
119+
builder = Some(match builder {
120+
None => LogicalPlanBuilder::from(branch),
121+
Some(b) => b.union(branch).unwrap(),
122+
});
123+
}
124+
builder.unwrap().distinct().unwrap().build().unwrap()
125+
}
126+
127+
/// Run the [`UnionsToFilter`] rule through the full [`Optimizer`] pipeline
128+
/// (single pass, feature flag enabled).
129+
fn run_optimizer(plan: &LogicalPlan, ctx: &OptimizerContext) -> LogicalPlan {
130+
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> =
131+
vec![Arc::new(UnionsToFilter::new())];
132+
Optimizer::with_rules(rules)
133+
.optimize(plan.clone(), ctx, |_, _| {})
134+
.unwrap()
135+
}
136+
137+
// ---------------------------------------------------------------------------
138+
// Benchmark functions
139+
// ---------------------------------------------------------------------------
140+
141+
fn bench_merge(c: &mut Criterion) {
142+
let mut options = ConfigOptions::default();
143+
options.optimizer.enable_unions_to_filter = true;
144+
let ctx =
145+
OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1);
146+
147+
let mut group = c.benchmark_group("unions_to_filter/merge");
148+
for n in [2, 8, 32, 128] {
149+
let plan = build_merge_plan(n);
150+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
151+
b.iter(|| black_box(run_optimizer(p, &ctx)));
152+
});
153+
}
154+
group.finish();
155+
}
156+
157+
fn bench_no_merge(c: &mut Criterion) {
158+
let mut options = ConfigOptions::default();
159+
options.optimizer.enable_unions_to_filter = true;
160+
let ctx =
161+
OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1);
162+
163+
let mut group = c.benchmark_group("unions_to_filter/no_merge");
164+
for n in [2, 8, 32, 128] {
165+
let plan = build_no_merge_plan(n);
166+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
167+
b.iter(|| black_box(run_optimizer(p, &ctx)));
168+
});
169+
}
170+
group.finish();
171+
}
172+
173+
fn bench_merge_with_projection(c: &mut Criterion) {
174+
let mut options = ConfigOptions::default();
175+
options.optimizer.enable_unions_to_filter = true;
176+
let ctx =
177+
OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1);
178+
179+
let mut group = c.benchmark_group("unions_to_filter/merge_with_projection");
180+
for n in [2, 8, 32, 128] {
181+
let plan = build_merge_with_projection_plan(n);
182+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
183+
b.iter(|| black_box(run_optimizer(p, &ctx)));
184+
});
185+
}
186+
group.finish();
187+
}
188+
189+
criterion_group!(
190+
benches,
191+
bench_merge,
192+
bench_no_merge,
193+
bench_merge_with_projection
194+
);
195+
criterion_main!(benches);

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ pub mod rewrite_set_comparison;
7070
pub mod scalar_subquery_to_join;
7171
pub mod simplify_expressions;
7272
pub mod single_distinct_to_groupby;
73+
pub mod unions_to_filter;
7374
pub mod utils;
7475

7576
#[cfg(test)]

datafusion/optimizer/src/optimizer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use crate::rewrite_set_comparison::RewriteSetComparison;
5656
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
5757
use crate::simplify_expressions::SimplifyExpressions;
5858
use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
59+
use crate::unions_to_filter::UnionsToFilter;
5960
use crate::utils::log_plan;
6061

6162
/// Transforms one [`LogicalPlan`] into another which computes the same results,
@@ -280,6 +281,7 @@ impl Optimizer {
280281
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
281282
Arc::new(RewriteSetComparison::new()),
282283
Arc::new(OptimizeUnions::new()),
284+
Arc::new(UnionsToFilter::new()),
283285
Arc::new(SimplifyExpressions::new()),
284286
Arc::new(ReplaceDistinctWithAggregate::new()),
285287
Arc::new(EliminateJoin::new()),

0 commit comments

Comments
 (0)