Skip to content

Commit 5d1ba59

Browse files
committed
Inject ExpressionAnalyzerRegistry into exec nodes and re-inject via optimizer loop
Add trait methods on ExecutionPlan for expression-level statistics injection (uses_expression_level_statistics, with_expression_analyzer_registry, expression_analyzer_registry). The physical planner injects the registry after plan creation and re-injects after each optimizer rule that modifies the plan, gated by the use_expression_analyzer config flag.
1 parent 9b91988 commit 5d1ba59

16 files changed

Lines changed: 452 additions & 163 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,11 +1113,9 @@ config_namespace! {
11131113
/// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
11141114
pub enable_dynamic_filter_pushdown: bool, default = true
11151115

1116-
/// When set to true, the physical planner uses the ExpressionAnalyzer
1117-
/// framework for expression-level statistics estimation (NDV, selectivity,
1118-
/// min/max, null fraction). When `use_statistics_registry` is also enabled,
1119-
/// the registry providers (filters, projections) also use it.
1120-
/// When false, existing behavior is unchanged.
1116+
/// When set to true, the pluggable `ExpressionAnalyzerRegistry` from
1117+
/// `SessionState` is used for expression-level statistics estimation
1118+
/// (NDV, selectivity, min/max, null fraction) in physical plan operators.
11211119
pub use_expression_analyzer: bool, default = false
11221120

11231121
/// When set to true, the optimizer will insert filters before a join between
@@ -1252,9 +1250,11 @@ config_namespace! {
12521250
pub join_reordering: bool, default = true
12531251

12541252
/// When set to true, the physical plan optimizer uses the pluggable
1255-
/// `StatisticsRegistry` for statistics propagation across operators.
1256-
/// This enables more accurate cardinality estimates compared to each
1257-
/// operator's built-in `partition_statistics`.
1253+
/// `StatisticsRegistry` for a bottom-up statistics walk across operators,
1254+
/// enabling more accurate cardinality estimates. Enabling
1255+
/// `use_expression_analyzer` alongside this flag gives built-in
1256+
/// providers access to custom expression-level analyzers (NDV,
1257+
/// selectivity) for the operators they process.
12581258
pub use_statistics_registry: bool, default = false
12591259

12601260
/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.

datafusion/core/src/execution/session_state.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,6 @@ pub struct SessionState {
190190
table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
191191
/// Runtime environment
192192
runtime_env: Arc<RuntimeEnv>,
193-
/// Registry for expression-level statistics analyzers (NDV, selectivity, etc.)
194-
expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
195193
/// [FunctionFactory] to support pluggable user defined function handler.
196194
///
197195
/// It will be invoked on `CREATE FUNCTION` statements.
@@ -217,6 +215,13 @@ impl PhysicalOptimizerContext for SessionState {
217215
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
218216
self.statistics_registry.as_ref()
219217
}
218+
219+
fn expression_analyzer_registry(&self) -> Option<&Arc<ExpressionAnalyzerRegistry>> {
220+
self.config_options()
221+
.optimizer
222+
.use_expression_analyzer
223+
.then_some(&self.expression_analyzer_registry)
224+
}
220225
}
221226

222227
impl Debug for SessionState {

datafusion/core/src/physical_planner.rs

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,11 +1111,6 @@ impl DefaultPhysicalPlanner {
11111111
physical_input,
11121112
)
11131113
.with_batch_size(session_state.config().batch_size());
1114-
let builder =
1115-
match Self::expression_analyzer_registry(session_state) {
1116-
Some(r) => builder.with_expression_analyzer_registry(r),
1117-
None => builder,
1118-
};
11191114
builder.build()?
11201115
}
11211116
PlanAsyncExpr::Async(
@@ -1126,7 +1121,7 @@ impl DefaultPhysicalPlanner {
11261121
async_map.async_exprs,
11271122
physical_input,
11281123
)?;
1129-
let builder = FilterExecBuilder::new(
1124+
FilterExecBuilder::new(
11301125
Arc::clone(&runtime_expr[0]),
11311126
Arc::new(async_exec),
11321127
)
@@ -1135,13 +1130,8 @@ impl DefaultPhysicalPlanner {
11351130
.apply_projection(Some(
11361131
(0..input.schema().fields().len()).collect::<Vec<_>>(),
11371132
))?
1138-
.with_batch_size(session_state.config().batch_size());
1139-
let builder =
1140-
match Self::expression_analyzer_registry(session_state) {
1141-
Some(r) => builder.with_expression_analyzer_registry(r),
1142-
None => builder,
1143-
};
1144-
builder.build()?
1133+
.with_batch_size(session_state.config().batch_size())
1134+
.build()?
11451135
}
11461136
_ => {
11471137
return internal_err!(
@@ -1664,15 +1654,16 @@ impl DefaultPhysicalPlanner {
16641654
{
16651655
// Use SortMergeJoin if hash join is not preferred
16661656
let join_on_len = join_on.len();
1667-
Arc::new(SortMergeJoinExec::try_new(
1657+
let exec = SortMergeJoinExec::try_new(
16681658
physical_left,
16691659
physical_right,
16701660
join_on,
16711661
join_filter,
16721662
*join_type,
16731663
vec![SortOptions::default(); join_on_len],
16741664
*null_equality,
1675-
)?)
1665+
)?;
1666+
Arc::new(exec)
16761667
} else if session_state.config().target_partitions() > 1
16771668
&& session_state.config().repartition_joins()
16781669
&& prefer_hash_join
@@ -2780,15 +2771,30 @@ impl DefaultPhysicalPlanner {
27802771
// to verify that the plan fulfills the base requirements.
27812772
InvariantChecker(InvariantLevel::Always).check(&plan)?;
27822773

2774+
let use_expression_analyzer = session_state
2775+
.config_options()
2776+
.optimizer
2777+
.use_expression_analyzer;
27832778
let mut new_plan = Arc::clone(&plan);
27842779
for optimizer in optimizers {
27852780
let before_schema = new_plan.schema();
2781+
let plan_before_rule = Arc::clone(&new_plan);
27862782
new_plan = optimizer
27872783
.optimize_with_context(new_plan, session_state)
27882784
.map_err(|e| {
27892785
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
27902786
})?;
27912787

2788+
// Re-inject ExpressionAnalyzer registry into any exec nodes created or replaced by
2789+
// this rule. Skip if the rule returned the very same `Arc` (pointer-equal plan) to
2790+
// avoid an O(nodes) walk for no-op rules.
2791+
if use_expression_analyzer && !Arc::ptr_eq(&plan_before_rule, &new_plan) {
2792+
new_plan = Self::inject_expression_analyzer(
2793+
new_plan,
2794+
session_state.expression_analyzer_registry(),
2795+
)?;
2796+
}
2797+
27922798
// This only checks the schema in release build, and performs additional checks in debug mode.
27932799
OptimizationInvariantChecker::new(optimizer)
27942800
.check(&new_plan, &before_schema)?;
@@ -2858,15 +2864,22 @@ impl DefaultPhysicalPlanner {
28582864
Ok(mem_exec)
28592865
}
28602866

2861-
/// Returns the expression analyzer registry if the config option is enabled.
2862-
fn expression_analyzer_registry(
2863-
session_state: &SessionState,
2864-
) -> Option<Arc<ExpressionAnalyzerRegistry>> {
2865-
session_state
2866-
.config_options()
2867-
.optimizer
2868-
.use_expression_analyzer
2869-
.then(|| Arc::clone(session_state.expression_analyzer_registry()))
2867+
/// Walks `plan` and injects `registry` into every exec node that accepts it,
2868+
/// skipping nodes that already have a registry set; any node for which `with_expression_analyzer_registry` returns `None` is left unchanged.
2869+
fn inject_expression_analyzer(
2870+
plan: Arc<dyn ExecutionPlan>,
2871+
registry: &Arc<ExpressionAnalyzerRegistry>,
2872+
) -> Result<Arc<dyn ExecutionPlan>> {
2873+
use datafusion_common::tree_node::{Transformed, TreeNode};
2874+
2875+
plan.transform_up(|node| {
2876+
if let Some(updated) = node.with_expression_analyzer_registry(registry) {
2877+
Ok(Transformed::yes(updated))
2878+
} else {
2879+
Ok(Transformed::no(node))
2880+
}
2881+
})
2882+
.map(|t| t.data)
28702883
}
28712884

28722885
fn create_project_physical_exec(
@@ -2928,11 +2941,7 @@ impl DefaultPhysicalPlanner {
29282941
.into_iter()
29292942
.map(|(expr, alias)| ProjectionExpr { expr, alias })
29302943
.collect();
2931-
let mut proj_exec = ProjectionExec::try_new(proj_exprs, input_exec)?;
2932-
if let Some(r) = Self::expression_analyzer_registry(session_state) {
2933-
proj_exec = proj_exec.with_expression_analyzer_registry(r);
2934-
}
2935-
Ok(Arc::new(proj_exec))
2944+
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
29362945
}
29372946
PlanAsyncExpr::Async(
29382947
async_map,
@@ -2944,18 +2953,10 @@ impl DefaultPhysicalPlanner {
29442953
.into_iter()
29452954
.map(|(expr, alias)| ProjectionExpr { expr, alias })
29462955
.collect();
2947-
let mut new_proj_exec =
2948-
ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
2949-
if session_state
2950-
.config_options()
2951-
.optimizer
2952-
.use_expression_analyzer
2953-
{
2954-
new_proj_exec = new_proj_exec.with_expression_analyzer_registry(
2955-
Arc::clone(session_state.expression_analyzer_registry()),
2956-
);
2957-
}
2958-
Ok(Arc::new(new_proj_exec))
2956+
Ok(Arc::new(ProjectionExec::try_new(
2957+
proj_exprs,
2958+
Arc::new(async_exec),
2959+
)?))
29592960
}
29602961
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
29612962
}
datafusion/core/tests/physical_optimizer/expression_analyzer_injection.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Tests that ExpressionAnalyzerRegistry is preserved through optimizer rules.
19+
//!
20+
//! Optimizer rules like CombinePartialFinalAggregate, EnforceDistribution, and
21+
//! ProjectionPushdown rebuild exec nodes internally. The post-rule injection loop
22+
//! in the physical planner must re-inject the ExpressionAnalyzer registry into those new nodes.
23+
24+
#[cfg(test)]
25+
mod test {
26+
use std::sync::Arc;
27+
28+
use arrow::array::{Int64Array, RecordBatch};
29+
use arrow_schema::{DataType, Field, Schema};
30+
use datafusion::prelude::SessionContext;
31+
use datafusion_common::Result;
32+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
33+
use datafusion_execution::config::SessionConfig;
34+
use datafusion_physical_plan::ExecutionPlan;
35+
36+
/// Creates a SessionContext with `use_expression_analyzer = true`.
37+
fn make_ctx() -> SessionContext {
38+
let mut config = SessionConfig::new();
39+
config.options_mut().optimizer.use_expression_analyzer = true;
40+
SessionContext::new_with_config(config)
41+
}
42+
43+
/// Registers an in-memory table with a single int64 column.
44+
async fn register_table(ctx: &SessionContext, name: &str) -> Result<()> {
45+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
46+
let batch = RecordBatch::try_new(
47+
Arc::clone(&schema),
48+
vec![Arc::new(Int64Array::from(vec![1i64, 2, 3]))],
49+
)?;
50+
let table = datafusion::datasource::MemTable::try_new(schema, vec![vec![batch]])?;
51+
ctx.register_table(name, Arc::new(table))?;
52+
Ok(())
53+
}
54+
55+
/// Asserts that every node in `plan` that declares
56+
/// `uses_expression_level_statistics() == true` has the registry set.
57+
fn assert_expression_analyzer_injected(plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
58+
plan.clone().apply(|node| {
59+
if node.uses_expression_level_statistics() {
60+
assert!(
61+
node.expression_analyzer_registry().is_some(),
62+
"{} declares uses_expression_level_statistics but has no registry after optimization",
63+
node.name()
64+
);
65+
}
66+
Ok(TreeNodeRecursion::Continue)
67+
})?;
68+
Ok(())
69+
}
70+
71+
/// Filter and aggregate go through CombinePartialFinalAggregate which rebuilds
72+
/// the partial/final AggregateExec pair; both must end up with the ExpressionAnalyzer registry injected.
73+
#[tokio::test]
74+
async fn test_expression_analyzer_injected_through_aggregate_rewrite() -> Result<()> {
75+
let ctx = make_ctx();
76+
register_table(&ctx, "t").await?;
77+
78+
let df = ctx
79+
.sql("SELECT a, COUNT(*) FROM t WHERE a > 1 GROUP BY a")
80+
.await?;
81+
let plan = df.create_physical_plan().await?;
82+
83+
assert_expression_analyzer_injected(&plan)
84+
}
85+
86+
/// A simple filter query - FilterExec must have the registry even without complex rewrites.
87+
#[tokio::test]
88+
async fn test_expression_analyzer_injected_filter() -> Result<()> {
89+
let ctx = make_ctx();
90+
register_table(&ctx, "t").await?;
91+
92+
let df = ctx.sql("SELECT a FROM t WHERE a > 1").await?;
93+
let plan = df.create_physical_plan().await?;
94+
95+
assert_expression_analyzer_injected(&plan)
96+
}
97+
98+
/// A join query - HashJoinExec must have the registry after join selection rewrites it.
99+
#[tokio::test]
100+
async fn test_expression_analyzer_injected_join() -> Result<()> {
101+
let ctx = make_ctx();
102+
register_table(&ctx, "t1").await?;
103+
register_table(&ctx, "t2").await?;
104+
105+
let df = ctx
106+
.sql("SELECT t1.a FROM t1 JOIN t2 ON t1.a = t2.a WHERE t1.a > 0")
107+
.await?;
108+
let plan = df.create_physical_plan().await?;
109+
110+
assert_expression_analyzer_injected(&plan)
111+
}
112+
113+
/// Disabled by default: verify that the flag gates the injection walk.
114+
/// When `use_expression_analyzer = false`, exec nodes must NOT have the registry set.
115+
#[tokio::test]
116+
async fn test_expression_analyzer_not_injected_when_disabled() -> Result<()> {
117+
let ctx = SessionContext::new(); // default config, ExpressionAnalyzer disabled
118+
119+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
120+
let batch = RecordBatch::try_new(
121+
Arc::clone(&schema),
122+
vec![Arc::new(Int64Array::from(vec![1i64, 2, 3]))],
123+
)?;
124+
let table = datafusion::datasource::MemTable::try_new(schema, vec![vec![batch]])?;
125+
ctx.register_table("t", Arc::new(table))?;
126+
127+
let df = ctx.sql("SELECT a FROM t WHERE a > 1").await?;
128+
let plan = df.create_physical_plan().await?;
129+
130+
plan.clone().apply(|node| {
131+
if node.uses_expression_level_statistics() {
132+
assert!(
133+
node.expression_analyzer_registry().is_none(),
134+
"{} should NOT have the registry when use_expression_analyzer=false",
135+
node.name()
136+
);
137+
}
138+
Ok(TreeNodeRecursion::Continue)
139+
})?;
140+
Ok(())
141+
}
142+
}

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,5 @@ mod sanity_checker;
3838
mod test_utils;
3939
mod window_optimize;
4040

41+
mod expression_analyzer_injection;
4142
mod pushdown_utils;

0 commit comments

Comments
 (0)