Skip to content

Commit a4b945e

Browse files
feat: Set NDV to Exact(1) for numeric equality filter predicates (#21077)
## Which issue does this PR close? - Part of #20766 Related: #20789 (uses NDV for equality filter selectivity, complementary - this PR improves the NDV output stats, that PR consumes them) ## Rationale for this change When a filter predicate collapses a column interval to a single value (e.g. `d_qoy = 1`), the output column can only have one distinct value. Currently `distinct_count` is always demoted to `Inexact`, losing this information. This matters for downstream optimizers that rely on `distinct_count`, such as join cardinality estimation in `estimate_inner_join_cardinality`. ## What changes are included in this PR? In `collect_new_statistics` (filter.rs), when the post-filter interval has `lower == upper` (both non-null), set `distinct_count` to `Precision::Exact(1)` instead of demoting the input NDV to `Inexact`. ## Are these changes tested? Yes, 4 unit tests: - Equality predicate (`a = 42`) -> NDV becomes `Exact(1)` - OR predicate (`a = 42 OR a = 22`) -> interval does not collapse, NDV stays `Inexact` - AND with mixed predicates (`a = 42 AND b > 10 AND c = 7`) -> `a` and `c` get `Exact(1)`, `b` stays `Inexact` - Equality with absent bounds (`a = 42`, no min/max) -> interval analysis still resolves to `Exact(1)` ## Are there any user-facing changes? No breaking changes. Statistics consumers will now see `Exact(1)` for `distinct_count` on columns constrained to a single value by filter predicates. Disclaimer: I used AI to assist in the code generation, I have manually reviewed the output and it matches my intention and understanding. --------- Co-authored-by: xudong.w <wxd963996380@gmail.com>
1 parent cdfade5 commit a4b945e

2 files changed

Lines changed: 407 additions & 7 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 335 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -807,15 +807,23 @@ fn collect_new_statistics(
807807
};
808808
};
809809
let (lower, upper) = interval.into_bounds();
810-
let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
811-
let min_value = interval_bound_to_precision(lower, is_exact);
812-
let max_value = interval_bound_to_precision(upper, is_exact);
810+
let is_single_value =
811+
!lower.is_null() && !upper.is_null() && lower == upper;
812+
let min_value = interval_bound_to_precision(lower, is_single_value);
813+
let max_value = interval_bound_to_precision(upper, is_single_value);
814+
// When the interval collapses to a single value (equality
815+
// predicate), the column has exactly 1 distinct value
816+
let capped_distinct_count = if is_single_value {
817+
Precision::Exact(1)
818+
} else {
819+
distinct_count.to_inexact()
820+
};
813821
ColumnStatistics {
814822
null_count: input_column_stats[idx].null_count.to_inexact(),
815823
max_value,
816824
min_value,
817825
sum_value: Precision::Absent,
818-
distinct_count: distinct_count.to_inexact(),
826+
distinct_count: capped_distinct_count,
819827
byte_size: input_column_stats[idx].byte_size,
820828
}
821829
},
@@ -2241,4 +2249,327 @@ mod tests {
22412249

22422250
Ok(())
22432251
}
2252+
2253+
#[tokio::test]
2254+
async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> {
2255+
// a: min=1, max=100, ndv=80
2256+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2257+
let input = Arc::new(StatisticsExec::new(
2258+
Statistics {
2259+
num_rows: Precision::Inexact(100),
2260+
total_byte_size: Precision::Inexact(400),
2261+
column_statistics: vec![ColumnStatistics {
2262+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2263+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2264+
distinct_count: Precision::Inexact(80),
2265+
..Default::default()
2266+
}],
2267+
},
2268+
schema.clone(),
2269+
));
2270+
2271+
// a = 42 collapses interval to a single value
2272+
let predicate = Arc::new(BinaryExpr::new(
2273+
Arc::new(Column::new("a", 0)),
2274+
Operator::Eq,
2275+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2276+
));
2277+
let filter: Arc<dyn ExecutionPlan> =
2278+
Arc::new(FilterExec::try_new(predicate, input)?);
2279+
let statistics = filter.partition_statistics(None)?;
2280+
assert_eq!(
2281+
statistics.column_statistics[0].distinct_count,
2282+
Precision::Exact(1)
2283+
);
2284+
Ok(())
2285+
}
2286+
2287+
#[tokio::test]
2288+
async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> {
2289+
// a: min=1, max=100, ndv=80
2290+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2291+
let input = Arc::new(StatisticsExec::new(
2292+
Statistics {
2293+
num_rows: Precision::Inexact(100),
2294+
total_byte_size: Precision::Inexact(400),
2295+
column_statistics: vec![ColumnStatistics {
2296+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2297+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2298+
distinct_count: Precision::Inexact(80),
2299+
..Default::default()
2300+
}],
2301+
},
2302+
schema.clone(),
2303+
));
2304+
2305+
// a = 42 OR a = 22: interval stays [1, 100], not a single value
2306+
let predicate = Arc::new(BinaryExpr::new(
2307+
Arc::new(BinaryExpr::new(
2308+
Arc::new(Column::new("a", 0)),
2309+
Operator::Eq,
2310+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2311+
)),
2312+
Operator::Or,
2313+
Arc::new(BinaryExpr::new(
2314+
Arc::new(Column::new("a", 0)),
2315+
Operator::Eq,
2316+
Arc::new(Literal::new(ScalarValue::Int32(Some(22)))),
2317+
)),
2318+
));
2319+
let filter: Arc<dyn ExecutionPlan> =
2320+
Arc::new(FilterExec::try_new(predicate, input)?);
2321+
let statistics = filter.partition_statistics(None)?;
2322+
assert_eq!(
2323+
statistics.column_statistics[0].distinct_count,
2324+
Precision::Inexact(80)
2325+
);
2326+
Ok(())
2327+
}
2328+
2329+
#[tokio::test]
2330+
async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
2331+
// a: min=1, max=100, ndv=80
2332+
// b: min=1, max=50, ndv=40
2333+
// c: min=1, max=200, ndv=150
2334+
let schema = Schema::new(vec![
2335+
Field::new("a", DataType::Int32, false),
2336+
Field::new("b", DataType::Int32, false),
2337+
Field::new("c", DataType::Int32, false),
2338+
]);
2339+
let input = Arc::new(StatisticsExec::new(
2340+
Statistics {
2341+
num_rows: Precision::Inexact(100),
2342+
total_byte_size: Precision::Inexact(1200),
2343+
column_statistics: vec![
2344+
ColumnStatistics {
2345+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2346+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2347+
distinct_count: Precision::Inexact(80),
2348+
..Default::default()
2349+
},
2350+
ColumnStatistics {
2351+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2352+
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2353+
distinct_count: Precision::Inexact(40),
2354+
..Default::default()
2355+
},
2356+
ColumnStatistics {
2357+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2358+
max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2359+
distinct_count: Precision::Inexact(150),
2360+
..Default::default()
2361+
},
2362+
],
2363+
},
2364+
schema.clone(),
2365+
));
2366+
2367+
// a = 42 AND b > 10 AND c = 7
2368+
let predicate = Arc::new(BinaryExpr::new(
2369+
Arc::new(BinaryExpr::new(
2370+
Arc::new(BinaryExpr::new(
2371+
Arc::new(Column::new("a", 0)),
2372+
Operator::Eq,
2373+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2374+
)),
2375+
Operator::And,
2376+
Arc::new(BinaryExpr::new(
2377+
Arc::new(Column::new("b", 1)),
2378+
Operator::Gt,
2379+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2380+
)),
2381+
)),
2382+
Operator::And,
2383+
Arc::new(BinaryExpr::new(
2384+
Arc::new(Column::new("c", 2)),
2385+
Operator::Eq,
2386+
Arc::new(Literal::new(ScalarValue::Int32(Some(7)))),
2387+
)),
2388+
));
2389+
let filter: Arc<dyn ExecutionPlan> =
2390+
Arc::new(FilterExec::try_new(predicate, input)?);
2391+
let statistics = filter.partition_statistics(None)?;
2392+
// a = 42 collapses to single value
2393+
assert_eq!(
2394+
statistics.column_statistics[0].distinct_count,
2395+
Precision::Exact(1)
2396+
);
2397+
// b > 10 narrows to [11, 50] but doesn't collapse
2398+
assert_eq!(
2399+
statistics.column_statistics[1].distinct_count,
2400+
Precision::Inexact(40)
2401+
);
2402+
// c = 7 collapses to single value
2403+
assert_eq!(
2404+
statistics.column_statistics[2].distinct_count,
2405+
Precision::Exact(1)
2406+
);
2407+
Ok(())
2408+
}
2409+
2410+
#[tokio::test]
2411+
async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
2412+
// a: ndv=80, no min/max
2413+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2414+
let input = Arc::new(StatisticsExec::new(
2415+
Statistics {
2416+
num_rows: Precision::Inexact(100),
2417+
total_byte_size: Precision::Inexact(400),
2418+
column_statistics: vec![ColumnStatistics {
2419+
distinct_count: Precision::Inexact(80),
2420+
..Default::default()
2421+
}],
2422+
},
2423+
schema.clone(),
2424+
));
2425+
2426+
// a = 42: even without known bounds, interval analysis resolves
2427+
// the equality to [42, 42], so NDV is correctly set to Exact(1)
2428+
let predicate = Arc::new(BinaryExpr::new(
2429+
Arc::new(Column::new("a", 0)),
2430+
Operator::Eq,
2431+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2432+
));
2433+
let filter: Arc<dyn ExecutionPlan> =
2434+
Arc::new(FilterExec::try_new(predicate, input)?);
2435+
let statistics = filter.partition_statistics(None)?;
2436+
assert_eq!(
2437+
statistics.column_statistics[0].distinct_count,
2438+
Precision::Exact(1)
2439+
);
2440+
Ok(())
2441+
}
2442+
2443+
#[tokio::test]
2444+
async fn test_filter_statistics_equality_int8_ndv() -> Result<()> {
2445+
// a: min=-100, max=100, ndv=50
2446+
let schema = Schema::new(vec![Field::new("a", DataType::Int8, false)]);
2447+
let input = Arc::new(StatisticsExec::new(
2448+
Statistics {
2449+
num_rows: Precision::Inexact(100),
2450+
total_byte_size: Precision::Inexact(100),
2451+
column_statistics: vec![ColumnStatistics {
2452+
min_value: Precision::Inexact(ScalarValue::Int8(Some(-100))),
2453+
max_value: Precision::Inexact(ScalarValue::Int8(Some(100))),
2454+
distinct_count: Precision::Inexact(50),
2455+
..Default::default()
2456+
}],
2457+
},
2458+
schema.clone(),
2459+
));
2460+
2461+
let predicate = Arc::new(BinaryExpr::new(
2462+
Arc::new(Column::new("a", 0)),
2463+
Operator::Eq,
2464+
Arc::new(Literal::new(ScalarValue::Int8(Some(42)))),
2465+
));
2466+
let filter: Arc<dyn ExecutionPlan> =
2467+
Arc::new(FilterExec::try_new(predicate, input)?);
2468+
let statistics = filter.partition_statistics(None)?;
2469+
assert_eq!(
2470+
statistics.column_statistics[0].distinct_count,
2471+
Precision::Exact(1)
2472+
);
2473+
Ok(())
2474+
}
2475+
2476+
#[tokio::test]
2477+
async fn test_filter_statistics_equality_int64_ndv() -> Result<()> {
2478+
// a: min=0, max=1_000_000, ndv=100_000
2479+
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
2480+
let input = Arc::new(StatisticsExec::new(
2481+
Statistics {
2482+
num_rows: Precision::Inexact(100_000),
2483+
total_byte_size: Precision::Inexact(800_000),
2484+
column_statistics: vec![ColumnStatistics {
2485+
min_value: Precision::Inexact(ScalarValue::Int64(Some(0))),
2486+
max_value: Precision::Inexact(ScalarValue::Int64(Some(1_000_000))),
2487+
distinct_count: Precision::Inexact(100_000),
2488+
..Default::default()
2489+
}],
2490+
},
2491+
schema.clone(),
2492+
));
2493+
2494+
let predicate = Arc::new(BinaryExpr::new(
2495+
Arc::new(Column::new("a", 0)),
2496+
Operator::Eq,
2497+
Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
2498+
));
2499+
let filter: Arc<dyn ExecutionPlan> =
2500+
Arc::new(FilterExec::try_new(predicate, input)?);
2501+
let statistics = filter.partition_statistics(None)?;
2502+
assert_eq!(
2503+
statistics.column_statistics[0].distinct_count,
2504+
Precision::Exact(1)
2505+
);
2506+
Ok(())
2507+
}
2508+
2509+
#[tokio::test]
2510+
async fn test_filter_statistics_equality_float32_ndv() -> Result<()> {
2511+
// a: min=0.0, max=100.0, ndv=50
2512+
let schema = Schema::new(vec![Field::new("a", DataType::Float32, false)]);
2513+
let input = Arc::new(StatisticsExec::new(
2514+
Statistics {
2515+
num_rows: Precision::Inexact(100),
2516+
total_byte_size: Precision::Inexact(400),
2517+
column_statistics: vec![ColumnStatistics {
2518+
min_value: Precision::Inexact(ScalarValue::Float32(Some(0.0))),
2519+
max_value: Precision::Inexact(ScalarValue::Float32(Some(100.0))),
2520+
distinct_count: Precision::Inexact(50),
2521+
..Default::default()
2522+
}],
2523+
},
2524+
schema.clone(),
2525+
));
2526+
2527+
let predicate = Arc::new(BinaryExpr::new(
2528+
Arc::new(Column::new("a", 0)),
2529+
Operator::Eq,
2530+
Arc::new(Literal::new(ScalarValue::Float32(Some(42.5)))),
2531+
));
2532+
let filter: Arc<dyn ExecutionPlan> =
2533+
Arc::new(FilterExec::try_new(predicate, input)?);
2534+
let statistics = filter.partition_statistics(None)?;
2535+
assert_eq!(
2536+
statistics.column_statistics[0].distinct_count,
2537+
Precision::Exact(1)
2538+
);
2539+
Ok(())
2540+
}
2541+
2542+
#[tokio::test]
2543+
async fn test_filter_statistics_equality_reversed_ndv() -> Result<()> {
2544+
// a: min=1, max=100, ndv=80
2545+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2546+
let input = Arc::new(StatisticsExec::new(
2547+
Statistics {
2548+
num_rows: Precision::Inexact(100),
2549+
total_byte_size: Precision::Inexact(400),
2550+
column_statistics: vec![ColumnStatistics {
2551+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2552+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2553+
distinct_count: Precision::Inexact(80),
2554+
..Default::default()
2555+
}],
2556+
},
2557+
schema.clone(),
2558+
));
2559+
2560+
// 42 = a (literal on the left)
2561+
let predicate = Arc::new(BinaryExpr::new(
2562+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2563+
Operator::Eq,
2564+
Arc::new(Column::new("a", 0)),
2565+
));
2566+
let filter: Arc<dyn ExecutionPlan> =
2567+
Arc::new(FilterExec::try_new(predicate, input)?);
2568+
let statistics = filter.partition_statistics(None)?;
2569+
assert_eq!(
2570+
statistics.column_statistics[0].distinct_count,
2571+
Precision::Exact(1)
2572+
);
2573+
Ok(())
2574+
}
22442575
}

0 commit comments

Comments
 (0)