Skip to content

Commit 4be9245

Browse files
committed
test(expression_analyzer): add StatisticsTable and end-to-end SLT for OR selectivity
Add a reusable StatisticsTable (TableProvider + ExecutionPlan with user-supplied statistics) to the sqllogictest harness, and use it in expression_analyzer.slt
1 parent 2c26e96 commit 4be9245

2 files changed

Lines changed: 284 additions & 0 deletions

File tree

datafusion/sqllogictest/src/test_context.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ impl TestContext {
177177
info!("Registering dummy async udf");
178178
register_async_abs_udf(test_ctx.session_ctx())
179179
}
180+
"expression_analyzer.slt" => {
181+
info!("Registering tables with controlled statistics");
182+
statistics_table::register_statistics_tables(test_ctx.session_ctx());
183+
}
180184
_ => {
181185
info!("Using default SessionContext");
182186
}
@@ -615,3 +619,165 @@ fn register_async_abs_udf(ctx: &SessionContext) {
615619
let udf = AsyncScalarUDF::new(Arc::new(async_abs));
616620
ctx.register_udf(udf.into_scalar_udf());
617621
}
622+
623+
/// A table provider with fully controlled statistics for testing
624+
/// statistics-dependent optimizer and planner behaviors.
625+
///
626+
/// Unlike [`MemTable`] (which derives statistics from actual data), this
627+
/// provider returns whatever [`Statistics`] you supply, letting tests exercise
628+
/// code paths that depend on specific column NDV, min/max, or row-count values
629+
/// without needing real data files.
630+
pub mod statistics_table {
631+
use std::sync::Arc;
632+
633+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
634+
use async_trait::async_trait;
635+
use datafusion::catalog::Session;
636+
use datafusion::common::tree_node::TreeNodeRecursion;
637+
use datafusion::common::{Result, stats::Precision};
638+
use datafusion::datasource::{TableProvider, TableType};
639+
use datafusion::execution::TaskContext;
640+
use datafusion::logical_expr::Expr;
641+
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr};
642+
use datafusion::physical_plan::{
643+
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
644+
PlanProperties, SendableRecordBatchStream, Statistics,
645+
execution_plan::{Boundedness, EmissionType},
646+
};
647+
use datafusion::prelude::SessionContext;
648+
649+
/// A [`TableProvider`] and [`ExecutionPlan`] that returns user-supplied
650+
/// statistics. Useful for testing code paths that depend on specific column
651+
/// NDV, min/max, or row counts without requiring real data files.
652+
#[derive(Debug, Clone)]
653+
pub struct StatisticsTable {
654+
/// Schema returned by the `TableProvider::schema` implementation.
schema: SchemaRef,
655+
/// Statistics handed back, as an unmodified copy, by `partition_statistics`.
stats: Statistics,
656+
/// Plan properties built once in `new` and returned by `properties`.
cache: Arc<PlanProperties>,
657+
}
658+
659+
impl StatisticsTable {
660+
/// Creates a table that reports `stats` for `schema`.
///
/// The plan properties are fixed at construction time: unknown
/// partitioning with a single partition, incremental emission, and a
/// bounded input.
///
/// # Panics
///
/// Panics if `stats.column_statistics` does not contain exactly one entry
/// per field of `schema`.
pub fn new(schema: SchemaRef, stats: Statistics) -> Self {
661+
// Reject mismatched inputs up front: one `ColumnStatistics` per field.
assert_eq!(
662+
schema.fields().len(),
663+
stats.column_statistics.len(),
664+
"column_statistics length must match schema field count"
665+
);
666+
// Computed once here; `properties` only hands out a reference to it.
let cache = Arc::new(PlanProperties::new(
667+
EquivalenceProperties::new(Arc::clone(&schema)),
668+
Partitioning::UnknownPartitioning(1),
669+
EmissionType::Incremental,
670+
Boundedness::Bounded,
671+
));
672+
Self {
673+
schema,
674+
stats,
675+
cache,
676+
}
677+
}
678+
}
679+
680+
impl DisplayAs for StatisticsTable {
681+
/// Writes the fixed label `StatisticsTable`.
///
/// The requested display format `_t` is ignored, so every format renders
/// the same text.
fn fmt_as(
682+
&self,
683+
_t: DisplayFormatType,
684+
f: &mut std::fmt::Formatter,
685+
) -> std::fmt::Result {
686+
write!(f, "StatisticsTable")
687+
}
688+
}
689+
690+
impl ExecutionPlan for StatisticsTable {
691+
/// Returns the fixed node name `"StatisticsTable"`.
fn name(&self) -> &'static str {
692+
"StatisticsTable"
693+
}
694+
695+
/// Returns the plan properties that were computed in `StatisticsTable::new`.
fn properties(&self) -> &Arc<PlanProperties> {
696+
&self.cache
697+
}
698+
699+
/// Returns an empty list: this node has no child plans.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
700+
vec![]
701+
}
702+
703+
/// Reports `Continue` without ever invoking `_f`; this node exposes no
/// physical expressions to visit.
fn apply_expressions(
704+
&self,
705+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
706+
) -> Result<TreeNodeRecursion> {
707+
Ok(TreeNodeRecursion::Continue)
708+
}
709+
710+
/// Returns `self` unchanged; `_children` is ignored because this node
/// never has children.
fn with_new_children(
711+
self: Arc<Self>,
712+
_children: Vec<Arc<dyn ExecutionPlan>>,
713+
) -> Result<Arc<dyn ExecutionPlan>> {
714+
Ok(self)
715+
}
716+
717+
/// Always fails: this plan only carries statistics and produces no data.
///
/// # Errors
///
/// Returns the error built by `not_impl_err!` for every call, regardless
/// of `_partition` or `_context`.
fn execute(
718+
&self,
719+
_partition: usize,
720+
_context: Arc<TaskContext>,
721+
) -> Result<SendableRecordBatchStream> {
722+
datafusion::common::not_impl_err!(
723+
"StatisticsTable is for statistics testing only"
724+
)
725+
}
726+
727+
/// Returns a copy of the user-supplied statistics.
///
/// `_partition` is ignored, so the same statistics are returned whether
/// a specific partition or the whole plan is requested.
fn partition_statistics(
728+
&self,
729+
_partition: Option<usize>,
730+
) -> Result<Arc<Statistics>> {
731+
Ok(Arc::new(self.stats.clone()))
732+
}
733+
}
734+
735+
#[async_trait]
736+
impl TableProvider for StatisticsTable {
737+
/// Returns a shared handle to the table schema.
fn schema(&self) -> SchemaRef {
738+
Arc::clone(&self.schema)
739+
}
740+
741+
/// Reports this provider as a base table.
fn table_type(&self) -> TableType {
742+
TableType::Base
743+
}
744+
745+
/// Builds a new `StatisticsTable` plan node restricted to `projection`.
///
/// The statistics are projected alongside the schema before both are
/// passed to `StatisticsTable::new`, which asserts that their column
/// counts agree. `_state`, `_filters`, and `_limit` are ignored.
///
/// # Errors
///
/// Propagates any error returned by `project_schema`.
async fn scan(
746+
&self,
747+
_state: &dyn Session,
748+
projection: Option<&Vec<usize>>,
749+
_filters: &[Expr],
750+
_limit: Option<usize>,
751+
) -> Result<Arc<dyn ExecutionPlan>> {
752+
let schema = datafusion::common::project_schema(&self.schema, projection)?;
753+
let stats = self.stats.clone().project(projection);
754+
Ok(Arc::new(StatisticsTable::new(schema, stats)))
755+
}
756+
}
757+
758+
/// Registers named [`StatisticsTable`] instances needed by SLT tests
759+
/// that require controlled statistics (NDV, row count, min/max).
///
/// Currently registers a single table, `t_ndv`, on `ctx`.
///
/// # Panics
///
/// Panics if `ctx.register_table` returns an error for `t_ndv`.
760+
pub fn register_statistics_tables(ctx: &SessionContext) {
761+
// t_ndv: 1000 rows, column a (Int64, NDV=10), column b (Int64, NDV=5).
762+
let schema = Arc::new(Schema::new(vec![
763+
Field::new("a", DataType::Int64, false),
764+
Field::new("b", DataType::Int64, false),
765+
]));
766+
// Only the row count and per-column distinct counts are set (all as
// inexact values); the byte size is absent and every other column
// statistic keeps its `Default` value.
let stats = Statistics {
767+
num_rows: Precision::Inexact(1000),
768+
total_byte_size: Precision::Absent,
769+
column_statistics: vec![
770+
ColumnStatistics {
771+
distinct_count: Precision::Inexact(10),
772+
..Default::default()
773+
},
774+
ColumnStatistics {
775+
distinct_count: Precision::Inexact(5),
776+
..Default::default()
777+
},
778+
],
779+
};
780+
ctx.register_table("t_ndv", Arc::new(StatisticsTable::new(schema, stats)))
781+
.expect("registering t_ndv should succeed");
782+
}
783+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Tests for ExpressionAnalyzerRegistry end-to-end integration.
19+
#
20+
# t_ndv is a StatisticsTable with controlled statistics:
21+
# 1000 rows, column a (Int64, NDV=10), column b (Int64, NDV=5).
22+
#
23+
# OR predicates are not expressible as a single interval, so the built-in
24+
# interval-arithmetic path always falls back to the default selectivity (20%).
25+
# ExpressionAnalyzerRegistry instead applies inclusion-exclusion, treating the two equality predicates as independent with selectivity 1/NDV each:
26+
# P(a=42 OR b=5) = P(a=42) + P(b=5) - P(a=42)*P(b=5)
27+
# = 1/10 + 1/5 - 1/50
28+
# = 0.1 + 0.2 - 0.02 = 0.28
29+
# Expected rows = round(1000 * 0.28) = 280.
30+
31+
statement ok
32+
SET datafusion.execution.target_partitions = 1;
33+
34+
statement ok
35+
SET datafusion.explain.show_statistics = true;
36+
37+
statement ok
38+
SET datafusion.explain.physical_plan_only = true;
39+
40+
# Without ExpressionAnalyzerRegistry: OR predicate falls back to 20% default selectivity
41+
# → FilterExec estimated rows = 1000 * 0.20 = 200
42+
query TT
43+
EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5;
44+
----
45+
physical_plan
46+
01)FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(200), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
47+
02)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
48+
03)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
49+
50+
# Enable ExpressionAnalyzerRegistry so inclusion-exclusion applies to OR predicates
51+
statement ok
52+
SET datafusion.optimizer.use_expression_analyzer = true;
53+
54+
# With ExpressionAnalyzerRegistry: OR uses inclusion-exclusion
55+
# P(a=42 OR b=5) = 1/10 + 1/5 - (1/10 * 1/5) = 0.28 → 280 rows
56+
query TT
57+
EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5;
58+
----
59+
physical_plan
60+
01)FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
61+
02)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
62+
03)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
63+
64+
# Verify the registry survives physical optimizer rules: ORDER BY + LIMIT triggers the
65+
# TopK sort rule which rewrites the plan above FilterExec. The FilterExec row estimate
66+
# must still reflect inclusion-exclusion (280), not the 20% default.
67+
query TT
68+
EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5 ORDER BY a LIMIT 100;
69+
----
70+
physical_plan
71+
01)SortExec: TopK(fetch=100), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Inexact(100), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
72+
02)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
73+
03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
74+
04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
75+
76+
# Verify the registry reaches FilterExec nodes created by optimizer rules: the
77+
# filter_pushdown rule running on UnionExec creates fresh FilterExec nodes (one per
78+
# branch) that never existed when the registry was first injected. Both must show
79+
# 280 rows, confirming re-injection after each rule reaches newly created nodes.
80+
# The UnionExec row count (560 = 2 * 280) and doubled NDVs (20 = 2 * 10, 10 = 2 * 5) also
81+
# confirm that distinct-count propagation through UnionExec is correct.
82+
query TT
83+
EXPLAIN SELECT * FROM (SELECT * FROM t_ndv UNION ALL SELECT * FROM t_ndv) WHERE a = 42 OR b = 5;
84+
----
85+
physical_plan
86+
01)UnionExec, statistics=[Rows=Inexact(560), Bytes=Absent, [(Col[0]: Distinct=Inexact(20)),(Col[1]: Distinct=Inexact(10))]]
87+
02)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
88+
03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
89+
04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
90+
05)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
91+
06)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
92+
07)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
93+
94+
# Verify the registry reaches a FilterExec pushed through a join: filter_pushdown
95+
# moves the WHERE clause filter to the left side of the HashJoinExec, creating a
96+
# FilterExec that was not present in the plan at initial injection time.
97+
query TT
98+
EXPLAIN SELECT l.a, r.b FROM t_ndv l JOIN t_ndv r ON l.a = r.a WHERE l.a = 42 OR l.b = 5;
99+
----
100+
physical_plan
101+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, b@2], statistics=[Rows=Inexact(28000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
102+
02)--FilterExec: a@0 = 42 OR b@1 = 5, projection=[a@0], statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10))]]
103+
03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
104+
04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
105+
05)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
106+
06)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
107+
108+
statement ok
109+
SET datafusion.optimizer.use_expression_analyzer = false;
110+
111+
statement ok
112+
SET datafusion.explain.show_statistics = false;
113+
114+
statement ok
115+
SET datafusion.explain.physical_plan_only = false;
116+
117+
statement ok
118+
SET datafusion.execution.target_partitions = 4;

0 commit comments

Comments
 (0)