Skip to content

Commit 8614308

Browse files
authored
perf: Optimize logical optimizer's OptimizeProjections pass (#21726)
## Which issue does this PR close? - Closes #21724. ## Rationale for this change Some profiling suggested that `OptimizeProjections` was among the most heavyweight of the logical optimizer passes for TPC-DS. This PR implements two distinct optimizations: 1. In `RequiredIndices::add_expr`, the previous implementation created a `HashSet` and walked the expression tree twice, adding reference columns to the `HashSet`. Finally, members of the `HashSet` were converted to indices. It is faster to just walk the expression tree once ourselves and convert column references to indices. This saves the HashSet allocation and insertions, plus one redundant tree walk. 2. In `optimize_projections`, we computed the minimal required set of `GROUP BY` columns, based on functional dependencies. This was relatively expensive; when there are no functional dependencies (common), this was still quite expensive but will always be a no-op. Add a short-circuit to skip the redundant computation in this scenario. Results on a newly added `optimize_projections` microbenchmark: ``` - tpch_q3: 14.6 µs → 11.9 µs (−18.5%) - tpch_q5: 17.4 µs → 14.0 µs (−19.4%) - clickbench_groupby: 10.3 µs → 6.8 µs (−34.1%) - tpcds_subquery: 11.2 µs → 8.7 µs (−22.1%) - small_schema: 1.87 µs → 1.68 µs (−10.3%) ``` ## What changes are included in this PR? * Add microbenchmark for `optimize_projections` * Implement two optimizations ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent 90a8117 commit 8614308

File tree

4 files changed

+310
-95
lines changed

4 files changed

+310
-95
lines changed

datafusion/optimizer/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,7 @@ insta = { workspace = true }
7979
[[bench]]
8080
name = "projection_unnecessary"
8181
harness = false
82+
83+
[[bench]]
84+
name = "optimize_projections"
85+
harness = false
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Micro-benchmarks for the `OptimizeProjections` logical optimizer rule.
19+
//!
20+
//! Each case models a plan shape typical of TPC-H, TPC-DS, or ClickBench.
21+
//! Schemas use realistic widths and the rule operates on a fresh
22+
//! `LogicalPlan` per iteration (construction is in the criterion setup
23+
//! closure and excluded from measurement).
24+
25+
use std::hint::black_box;
26+
27+
use arrow::datatypes::{DataType, Field, Schema};
28+
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
29+
use datafusion_expr::{
30+
JoinType, LogicalPlan, LogicalPlanBuilder, col, lit, logical_plan::table_scan,
31+
};
32+
use datafusion_functions_aggregate::expr_fn::sum;
33+
use datafusion_optimizer::optimize_projections::OptimizeProjections;
34+
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
35+
36+
fn table(name: &str, cols: usize) -> LogicalPlan {
37+
let fields: Vec<Field> = (0..cols)
38+
.map(|i| Field::new(format!("c{i}"), DataType::Int32, true))
39+
.collect();
40+
table_scan(Some(name), &Schema::new(fields), None)
41+
.unwrap()
42+
.build()
43+
.unwrap()
44+
}
45+
46+
fn scan_with_filter(name: &str, cols: usize, filter_col: usize) -> LogicalPlan {
47+
LogicalPlanBuilder::from(table(name, cols))
48+
.filter(col(format!("{name}.c{filter_col}")).gt(lit(0i32)))
49+
.unwrap()
50+
.build()
51+
.unwrap()
52+
}
53+
54+
/// TPC-H Q3-like: customer ⨝ orders ⨝ lineitem with filters above each scan,
55+
/// GROUP BY 3 keys, 1 SUM aggregate. Models the canonical filter→join→aggregate
56+
/// analytical shape after PushDownFilter.
57+
fn plan_tpch_q3() -> LogicalPlan {
58+
let customer = scan_with_filter("customer", 8, 6);
59+
let orders = scan_with_filter("orders", 9, 4);
60+
let lineitem = scan_with_filter("lineitem", 16, 10);
61+
62+
LogicalPlanBuilder::from(customer)
63+
.join_on(
64+
orders,
65+
JoinType::Inner,
66+
vec![col("customer.c0").eq(col("orders.c1"))],
67+
)
68+
.unwrap()
69+
.join_on(
70+
lineitem,
71+
JoinType::Inner,
72+
vec![col("lineitem.c0").eq(col("orders.c0"))],
73+
)
74+
.unwrap()
75+
.aggregate(
76+
vec![col("lineitem.c0"), col("orders.c4"), col("orders.c7")],
77+
vec![sum(col("lineitem.c5") - col("lineitem.c6"))],
78+
)
79+
.unwrap()
80+
.build()
81+
.unwrap()
82+
}
83+
84+
/// TPC-H Q5-like: 6-way join through region→nation→customer→orders→lineitem
85+
/// →supplier, GROUP BY 1 key, 1 SUM. Exercises nested-join pruning depth.
86+
fn plan_tpch_q5() -> LogicalPlan {
87+
let region = scan_with_filter("region", 3, 1);
88+
let nation = table("nation", 4);
89+
let customer = table("customer", 8);
90+
let orders = table("orders", 9);
91+
let lineitem = table("lineitem", 16);
92+
let supplier = table("supplier", 7);
93+
94+
LogicalPlanBuilder::from(region)
95+
.join_on(
96+
nation,
97+
JoinType::Inner,
98+
vec![col("region.c0").eq(col("nation.c2"))],
99+
)
100+
.unwrap()
101+
.join_on(
102+
customer,
103+
JoinType::Inner,
104+
vec![col("nation.c0").eq(col("customer.c3"))],
105+
)
106+
.unwrap()
107+
.join_on(
108+
orders,
109+
JoinType::Inner,
110+
vec![col("customer.c0").eq(col("orders.c1"))],
111+
)
112+
.unwrap()
113+
.join_on(
114+
lineitem,
115+
JoinType::Inner,
116+
vec![col("lineitem.c0").eq(col("orders.c0"))],
117+
)
118+
.unwrap()
119+
.join_on(
120+
supplier,
121+
JoinType::Inner,
122+
vec![col("lineitem.c2").eq(col("supplier.c0"))],
123+
)
124+
.unwrap()
125+
.aggregate(
126+
vec![col("nation.c1")],
127+
vec![sum(col("lineitem.c5") - col("lineitem.c6"))],
128+
)
129+
.unwrap()
130+
.build()
131+
.unwrap()
132+
}
133+
134+
/// ClickBench-style: single wide `hits` table (100 cols), conjunctive filter,
135+
/// GROUP BY 2 keys, 2 SUM aggregates. Stresses wide-schema column lookup.
136+
fn plan_clickbench_groupby() -> LogicalPlan {
137+
let hits = table("hits", 100);
138+
let predicate = col("hits.c5")
139+
.gt(lit(100i32))
140+
.and(col("hits.c12").lt(lit(1000i32)));
141+
LogicalPlanBuilder::from(hits)
142+
.filter(predicate)
143+
.unwrap()
144+
.aggregate(
145+
vec![col("hits.c3"), col("hits.c7")],
146+
vec![sum(col("hits.c42")), sum(col("hits.c60"))],
147+
)
148+
.unwrap()
149+
.build()
150+
.unwrap()
151+
}
152+
153+
/// TPC-DS-style CTE shape: a SubqueryAlias wrapping a filter+projection over
154+
/// a wide fact table, joined back on two dimension tables and aggregated.
155+
fn plan_tpcds_subquery() -> LogicalPlan {
156+
let store_sales = table("store_sales", 23);
157+
let customer = table("customer", 18);
158+
let item = table("item", 22);
159+
160+
let sub = LogicalPlanBuilder::from(store_sales)
161+
.filter(col("store_sales.c5").gt(lit(0i32)))
162+
.unwrap()
163+
.project(vec![
164+
col("store_sales.c0"),
165+
col("store_sales.c3"),
166+
col("store_sales.c13"),
167+
])
168+
.unwrap()
169+
.alias("sub")
170+
.unwrap()
171+
.build()
172+
.unwrap();
173+
174+
LogicalPlanBuilder::from(customer)
175+
.join_on(
176+
sub,
177+
JoinType::Inner,
178+
vec![col("customer.c0").eq(col("sub.c3"))],
179+
)
180+
.unwrap()
181+
.join_on(
182+
item,
183+
JoinType::Inner,
184+
vec![col("item.c0").eq(col("sub.c0"))],
185+
)
186+
.unwrap()
187+
.aggregate(vec![col("customer.c2")], vec![sum(col("sub.c13"))])
188+
.unwrap()
189+
.build()
190+
.unwrap()
191+
}
192+
193+
/// Narrow 10-column table, single filter, project 3 cols. Guards against
194+
/// regressions on the common small-schema case where a lookup-map fix for
195+
/// wide schemas might hurt by adding hashing overhead.
196+
fn plan_small_schema() -> LogicalPlan {
197+
LogicalPlanBuilder::from(table("t", 10))
198+
.filter(col("t.c3").gt(lit(0i32)))
199+
.unwrap()
200+
.project(vec![col("t.c0"), col("t.c1"), col("t.c5")])
201+
.unwrap()
202+
.build()
203+
.unwrap()
204+
}
205+
206+
type BenchCase = (&'static str, fn() -> LogicalPlan);
207+
208+
fn bench_optimize_projections(c: &mut Criterion) {
209+
let rule = OptimizeProjections::new();
210+
let config = OptimizerContext::new();
211+
let mut group = c.benchmark_group("optimize_projections");
212+
213+
let cases: &[BenchCase] = &[
214+
("tpch_q3", plan_tpch_q3),
215+
("tpch_q5", plan_tpch_q5),
216+
("clickbench_groupby", plan_clickbench_groupby),
217+
("tpcds_subquery", plan_tpcds_subquery),
218+
("small_schema", plan_small_schema),
219+
];
220+
221+
for (name, build) in cases {
222+
group.bench_function(*name, |b| {
223+
b.iter_batched(
224+
build,
225+
|plan| black_box(rule.rewrite(plan, &config).unwrap()),
226+
BatchSize::SmallInput,
227+
);
228+
});
229+
}
230+
231+
group.finish();
232+
}
233+
234+
criterion_group!(benches, bench_optimize_projections);
235+
criterion_main!(benches);

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 32 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ mod required_indices;
2121

2222
use crate::optimizer::ApplyOrder;
2323
use crate::{OptimizerConfig, OptimizerRule};
24-
use std::collections::HashSet;
2524
use std::sync::Arc;
2625

2726
use datafusion_common::{
@@ -147,26 +146,39 @@ fn optimize_projections(
147146
// `aggregate.aggr_expr`:
148147
let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);
149148

150-
// Get absolutely necessary GROUP BY fields:
151-
let group_by_expr_existing = aggregate
152-
.group_expr
153-
.iter()
154-
.map(|group_by_expr| group_by_expr.schema_name().to_string())
155-
.collect::<Vec<_>>();
156-
157-
let new_group_bys = if let Some(simplest_groupby_indices) =
158-
get_required_group_by_exprs_indices(
159-
aggregate.input.schema(),
160-
&group_by_expr_existing,
161-
) {
162-
// Some of the fields in the GROUP BY may be required by the
163-
// parent even if these fields are unnecessary in terms of
164-
// functional dependency.
165-
group_by_reqs
166-
.append(&simplest_groupby_indices)
167-
.get_at_indices(&aggregate.group_expr)
168-
} else {
149+
// Get absolutely necessary GROUP BY fields.
150+
//
151+
// When the input has no functional dependencies, we can
152+
// short-circuit this analysis.
153+
let new_group_bys = if aggregate
154+
.input
155+
.schema()
156+
.functional_dependencies()
157+
.is_empty()
158+
{
169159
aggregate.group_expr
160+
} else {
161+
let group_by_expr_existing = aggregate
162+
.group_expr
163+
.iter()
164+
.map(|group_by_expr| group_by_expr.schema_name().to_string())
165+
.collect::<Vec<_>>();
166+
167+
if let Some(simplest_groupby_indices) =
168+
get_required_group_by_exprs_indices(
169+
aggregate.input.schema(),
170+
&group_by_expr_existing,
171+
)
172+
{
173+
// Some of the fields in the GROUP BY may be required by
174+
// the parent even if these fields are unnecessary in
175+
// terms of functional dependency.
176+
group_by_reqs
177+
.append(&simplest_groupby_indices)
178+
.get_at_indices(&aggregate.group_expr)
179+
} else {
180+
aggregate.group_expr
181+
}
170182
};
171183

172184
// Only use the absolutely necessary aggregate expressions required
@@ -682,56 +694,6 @@ fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
682694
})
683695
}
684696

685-
/// Accumulates outer-referenced columns by the
686-
/// given expression, `expr`.
687-
///
688-
/// # Parameters
689-
///
690-
/// * `expr` - The expression to analyze for outer-referenced columns.
691-
/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
692-
/// columns are collected.
693-
fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) {
694-
// inspect_expr_pre doesn't handle subquery references, so find them explicitly
695-
expr.apply(|expr| {
696-
match expr {
697-
Expr::OuterReferenceColumn(_, col) => {
698-
columns.insert(col);
699-
}
700-
Expr::ScalarSubquery(subquery) => {
701-
outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
702-
}
703-
Expr::Exists(exists) => {
704-
outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
705-
}
706-
Expr::InSubquery(insubquery) => {
707-
outer_columns_helper_multi(
708-
&insubquery.subquery.outer_ref_columns,
709-
columns,
710-
);
711-
}
712-
_ => {}
713-
};
714-
Ok(TreeNodeRecursion::Continue)
715-
})
716-
// unwrap: closure above never returns Err, so can not be Err here
717-
.unwrap();
718-
}
719-
720-
/// A recursive subroutine that accumulates outer-referenced columns by the
721-
/// given expressions (`exprs`).
722-
///
723-
/// # Parameters
724-
///
725-
/// * `exprs` - The expressions to analyze for outer-referenced columns.
726-
/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
727-
/// columns are collected.
728-
fn outer_columns_helper_multi<'a, 'b>(
729-
exprs: impl IntoIterator<Item = &'a Expr>,
730-
columns: &'b mut HashSet<&'a Column>,
731-
) {
732-
exprs.into_iter().for_each(|e| outer_columns(e, columns));
733-
}
734-
735697
/// Splits requirement indices for a join into left and right children based on
736698
/// the join type.
737699
///

0 commit comments

Comments
 (0)