Skip to content

Commit fed07d5

Browse files
committed
Add ExpressionAnalyzer for pluggable expression-level statistics
Introduce ExpressionAnalyzer, a chain-of-responsibility framework for expression-level statistics estimation (NDV, selectivity, min/max). Framework: - ExpressionAnalyzer trait with registry parameter for chain delegation - ExpressionAnalyzerRegistry to chain analyzers (first Computed wins) - DefaultExpressionAnalyzer: Selinger-style estimation for columns, literals, binary expressions, NOT, boolean predicates Integration: - ExpressionAnalyzerRegistry stored in SessionState, initialized once - ProjectionExprs stores optional registry (non-breaking, no signature changes to project_statistics) - ProjectionExec sets registry via Projector, injected by planner - FilterExec uses registry for selectivity when interval analysis cannot handle the predicate - Custom nodes get builtin analyzer as fallback when registry is absent
1 parent 4084a18 commit fed07d5

10 files changed

Lines changed: 1131 additions & 13 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,13 @@ config_namespace! {
963963
/// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
964964
pub enable_dynamic_filter_pushdown: bool, default = true
965965

966+
/// When set to true, the physical planner will use the ExpressionAnalyzer
967+
/// framework for expression-level statistics estimation (NDV, selectivity,
968+
/// min/max, null fraction) in projections and filters. When false, projections
969+
/// return unknown statistics for non-column expressions and filters use the
970+
/// default selectivity heuristic.
971+
pub enable_expression_analyzer: bool, default = false
972+
966973
/// When set to true, the optimizer will insert filters before a join between
967974
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
968975
/// filter can add additional overhead when the file format does not fully support

datafusion/core/src/execution/session_state.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use datafusion_optimizer::{
6464
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
6565
};
6666
use datafusion_physical_expr::create_physical_expr;
67+
use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry;
6768
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
6869
use datafusion_physical_optimizer::PhysicalOptimizerRule;
6970
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
@@ -184,6 +185,8 @@ pub struct SessionState {
184185
///
185186
/// It will be invoked on `CREATE FUNCTION` statements.
186187
/// thus, changing dialect o PostgreSql is required
188+
/// Registry for expression-level statistics analyzers (NDV, selectivity, etc.)
189+
expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
187190
function_factory: Option<Arc<dyn FunctionFactory>>,
188191
cache_factory: Option<Arc<dyn CacheFactory>>,
189192
/// Cache logical plans of prepared statements for later execution.
@@ -202,6 +205,10 @@ impl Debug for SessionState {
202205
.field("runtime_env", &self.runtime_env)
203206
.field("catalog_list", &self.catalog_list)
204207
.field("serializer_registry", &self.serializer_registry)
208+
.field(
209+
"expression_analyzer_registry",
210+
&self.expression_analyzer_registry,
211+
)
205212
.field("file_formats", &self.file_formats)
206213
.field("execution_props", &self.execution_props)
207214
.field("table_options", &self.table_options)
@@ -909,6 +916,11 @@ impl SessionState {
909916
&self.serializer_registry
910917
}
911918

919+
/// Return the [`ExpressionAnalyzerRegistry`] for expression-level statistics
920+
pub fn expression_analyzer_registry(&self) -> &Arc<ExpressionAnalyzerRegistry> {
921+
&self.expression_analyzer_registry
922+
}
923+
912924
/// Return version of the cargo package that produced this query
913925
pub fn version(&self) -> &str {
914926
env!("CARGO_PKG_VERSION")
@@ -988,6 +1000,7 @@ pub struct SessionStateBuilder {
9881000
aggregate_functions: Option<Vec<Arc<AggregateUDF>>>,
9891001
window_functions: Option<Vec<Arc<WindowUDF>>>,
9901002
serializer_registry: Option<Arc<dyn SerializerRegistry>>,
1003+
expression_analyzer_registry: Option<Arc<ExpressionAnalyzerRegistry>>,
9911004
file_formats: Option<Vec<Arc<dyn FileFormatFactory>>>,
9921005
config: Option<SessionConfig>,
9931006
table_options: Option<TableOptions>,
@@ -1028,6 +1041,7 @@ impl SessionStateBuilder {
10281041
aggregate_functions: None,
10291042
window_functions: None,
10301043
serializer_registry: None,
1044+
expression_analyzer_registry: None,
10311045
file_formats: None,
10321046
table_options: None,
10331047
config: None,
@@ -1083,6 +1097,7 @@ impl SessionStateBuilder {
10831097
),
10841098
window_functions: Some(existing.window_functions.into_values().collect_vec()),
10851099
serializer_registry: Some(existing.serializer_registry),
1100+
expression_analyzer_registry: Some(existing.expression_analyzer_registry),
10861101
file_formats: Some(existing.file_formats.into_values().collect_vec()),
10871102
config: Some(new_config),
10881103
table_options: Some(existing.table_options),
@@ -1326,6 +1341,15 @@ impl SessionStateBuilder {
13261341
self
13271342
}
13281343

1344+
/// Set the [`ExpressionAnalyzerRegistry`] for expression-level statistics
1345+
pub fn with_expression_analyzer_registry(
1346+
mut self,
1347+
expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
1348+
) -> Self {
1349+
self.expression_analyzer_registry = Some(expression_analyzer_registry);
1350+
self
1351+
}
1352+
13291353
/// Set the map of [`FileFormatFactory`]s
13301354
pub fn with_file_formats(
13311355
mut self,
@@ -1456,6 +1480,7 @@ impl SessionStateBuilder {
14561480
aggregate_functions,
14571481
window_functions,
14581482
serializer_registry,
1483+
expression_analyzer_registry,
14591484
file_formats,
14601485
table_options,
14611486
config,
@@ -1493,6 +1518,8 @@ impl SessionStateBuilder {
14931518
window_functions: HashMap::new(),
14941519
serializer_registry: serializer_registry
14951520
.unwrap_or_else(|| Arc::new(EmptySerializerRegistry)),
1521+
expression_analyzer_registry: expression_analyzer_registry
1522+
.unwrap_or_else(|| Arc::new(ExpressionAnalyzerRegistry::new())),
14961523
file_formats: HashMap::new(),
14971524
table_options: table_options.unwrap_or_else(|| {
14981525
TableOptions::default_from_session_config(config.options())
@@ -1675,6 +1702,13 @@ impl SessionStateBuilder {
16751702
&mut self.serializer_registry
16761703
}
16771704

1705+
/// Returns the current expression_analyzer_registry value
1706+
pub fn expression_analyzer_registry(
1707+
&mut self,
1708+
) -> &mut Option<Arc<ExpressionAnalyzerRegistry>> {
1709+
&mut self.expression_analyzer_registry
1710+
}
1711+
16781712
/// Returns the current file_formats value
16791713
pub fn file_formats(&mut self) -> &mut Option<Vec<Arc<dyn FileFormatFactory>>> {
16801714
&mut self.file_formats
@@ -1750,6 +1784,10 @@ impl Debug for SessionStateBuilder {
17501784
.field("runtime_env", &self.runtime_env)
17511785
.field("catalog_list", &self.catalog_list)
17521786
.field("serializer_registry", &self.serializer_registry)
1787+
.field(
1788+
"expression_analyzer_registry",
1789+
&self.expression_analyzer_registry,
1790+
)
17531791
.field("file_formats", &self.file_formats)
17541792
.field("execution_props", &self.execution_props)
17551793
.field("table_options", &self.table_options)

datafusion/core/src/physical_planner.rs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,12 +1106,23 @@ impl DefaultPhysicalPlanner {
11061106
input_schema.as_arrow(),
11071107
)? {
11081108
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
1109-
FilterExecBuilder::new(
1109+
let builder = FilterExecBuilder::new(
11101110
Arc::clone(&runtime_expr[0]),
11111111
physical_input,
11121112
)
1113-
.with_batch_size(session_state.config().batch_size())
1114-
.build()?
1113+
.with_batch_size(session_state.config().batch_size());
1114+
let builder = if session_state
1115+
.config_options()
1116+
.optimizer
1117+
.enable_expression_analyzer
1118+
{
1119+
builder.with_expression_analyzer_registry(Arc::clone(
1120+
session_state.expression_analyzer_registry(),
1121+
))
1122+
} else {
1123+
builder
1124+
};
1125+
builder.build()?
11151126
}
11161127
PlanAsyncExpr::Async(
11171128
async_map,
@@ -1121,7 +1132,7 @@ impl DefaultPhysicalPlanner {
11211132
async_map.async_exprs,
11221133
physical_input,
11231134
)?;
1124-
FilterExecBuilder::new(
1135+
let builder = FilterExecBuilder::new(
11251136
Arc::clone(&runtime_expr[0]),
11261137
Arc::new(async_exec),
11271138
)
@@ -1130,8 +1141,19 @@ impl DefaultPhysicalPlanner {
11301141
.apply_projection(Some(
11311142
(0..input.schema().fields().len()).collect::<Vec<_>>(),
11321143
))?
1133-
.with_batch_size(session_state.config().batch_size())
1134-
.build()?
1144+
.with_batch_size(session_state.config().batch_size());
1145+
let builder = if session_state
1146+
.config_options()
1147+
.optimizer
1148+
.enable_expression_analyzer
1149+
{
1150+
builder.with_expression_analyzer_registry(Arc::clone(
1151+
session_state.expression_analyzer_registry(),
1152+
))
1153+
} else {
1154+
builder
1155+
};
1156+
builder.build()?
11351157
}
11361158
_ => {
11371159
return internal_err!(
@@ -2892,7 +2914,17 @@ impl DefaultPhysicalPlanner {
28922914
.into_iter()
28932915
.map(|(expr, alias)| ProjectionExpr { expr, alias })
28942916
.collect();
2895-
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
2917+
let mut proj_exec = ProjectionExec::try_new(proj_exprs, input_exec)?;
2918+
if session_state
2919+
.config_options()
2920+
.optimizer
2921+
.enable_expression_analyzer
2922+
{
2923+
proj_exec = proj_exec.with_expression_analyzer_registry(Arc::clone(
2924+
session_state.expression_analyzer_registry(),
2925+
));
2926+
}
2927+
Ok(Arc::new(proj_exec))
28962928
}
28972929
PlanAsyncExpr::Async(
28982930
async_map,
@@ -2904,8 +2936,17 @@ impl DefaultPhysicalPlanner {
29042936
.into_iter()
29052937
.map(|(expr, alias)| ProjectionExpr { expr, alias })
29062938
.collect();
2907-
let new_proj_exec =
2939+
let mut new_proj_exec =
29082940
ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
2941+
if session_state
2942+
.config_options()
2943+
.optimizer
2944+
.enable_expression_analyzer
2945+
{
2946+
new_proj_exec = new_proj_exec.with_expression_analyzer_registry(
2947+
Arc::clone(session_state.expression_analyzer_registry()),
2948+
);
2949+
}
29092950
Ok(Arc::new(new_proj_exec))
29102951
}
29112952
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),

0 commit comments

Comments
 (0)