Skip to content

Commit a89b527

Browse files
jonathanc-n and alamb authored
feat: Use NDV for equality filter selectivity calculation (#20789)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Part of #20766.

## Rationale for this change

Use NDV (the number of distinct values) for the equality filter selectivity calculation.

<!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. -->

## What changes are included in this PR?

Check whether the filter is an equality filter; if it is, the selectivity is calculated as 1/NDV.

<!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. -->

## Are these changes tested?

<!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

## Are there any user-facing changes?

<!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. -->

<!-- If there are any breaking changes to public APIs, please add the `api change` label. -->

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 5b0938d commit a89b527

2 files changed

Lines changed: 144 additions & 6 deletions

File tree

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ async fn sql_filter() -> Result<()> {
272272

273273
let physical_plan = df.create_physical_plan().await.unwrap();
274274
let stats = physical_plan.partition_statistics(None)?;
275-
assert_eq!(stats.num_rows, Precision::Inexact(1));
275+
assert_eq!(stats.num_rows, Precision::Inexact(7));
276276

277277
Ok(())
278278
}

datafusion/physical-expr/src/analysis.rs

Lines changed: 143 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ pub fn analyze(
167167
schema: &Schema,
168168
) -> Result<AnalysisContext> {
169169
let initial_boundaries = &context.boundaries;
170+
170171
if initial_boundaries
171172
.iter()
172173
.all(|bound| bound.interval.is_none())
@@ -259,6 +260,44 @@ fn shrink_boundaries(
259260
Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
260261
}
261262

263+
/// Returns `Some(1.0 / distinct_count)` when the filter demonstrably collapsed
264+
/// a non-singleton interval down to a single point, i.e. an equality predicate
265+
/// was applied. Returns `None` in all other cases, signalling that the caller
266+
/// should fall back to [`cardinality_ratio`].
267+
///
268+
/// The `initial_interval` guard prevents double-counting selectivity when the
269+
/// column statistics already described a singleton before any filter was
270+
/// applied: if the initial interval was already the same single point, no
271+
/// additional selectivity has been gained and the `1 / NDV` shortcut must not
272+
/// fire.
273+
fn singleton_selectivity(
274+
initial_interval: &Interval,
275+
target_interval: &Interval,
276+
distinct_count: usize,
277+
) -> Option<f64> {
278+
// The target must have collapsed to a single non-null value.
279+
if distinct_count == 0
280+
|| target_interval.lower().is_null()
281+
|| target_interval.lower() != target_interval.upper()
282+
{
283+
return None;
284+
}
285+
286+
// Only treat this as a newly-applied equality filter when the initial
287+
// interval was not already that same singleton. If it was, the stats
288+
// already encoded this restriction and applying 1/NDV again would
289+
// under-estimate the row count.
290+
let initial_is_same_singleton = !initial_interval.lower().is_null()
291+
&& initial_interval.lower() == initial_interval.upper()
292+
&& initial_interval.lower() == target_interval.lower();
293+
294+
if initial_is_same_singleton {
295+
return None;
296+
}
297+
298+
Some(1.0 / distinct_count as f64)
299+
}
300+
262301
/// This function calculates the filter predicate's selectivity by comparing
263302
/// the initial and pruned column boundaries. Selectivity is defined as the
264303
/// ratio of rows in a table that satisfy the filter's predicate.
@@ -277,8 +316,19 @@ fn calculate_selectivity(
277316
let mut acc: f64 = 1.0;
278317
for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
279318
match (initial.interval.as_ref(), target.interval.as_ref()) {
280-
(Some(initial), Some(target)) => {
281-
acc *= cardinality_ratio(initial, target);
319+
(Some(initial_interval), Some(target_interval)) => {
320+
if let Precision::Exact(distinct_count)
321+
| Precision::Inexact(distinct_count) = target.distinct_count
322+
&& let Some(s) = singleton_selectivity(
323+
initial_interval,
324+
target_interval,
325+
distinct_count,
326+
)
327+
{
328+
acc *= s;
329+
continue;
330+
}
331+
acc *= cardinality_ratio(initial_interval, target_interval);
282332
}
283333
(None, Some(_)) => {
284334
return internal_err!(
@@ -297,14 +347,14 @@ mod tests {
297347
use std::sync::Arc;
298348

299349
use arrow::datatypes::{DataType, Field, Schema};
300-
use datafusion_common::{DFSchema, assert_contains};
350+
use datafusion_common::{DFSchema, ScalarValue, assert_contains, stats::Precision};
301351
use datafusion_expr::{
302352
Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit,
303353
};
304354

305-
use crate::{AnalysisContext, create_physical_expr};
355+
use crate::{AnalysisContext, create_physical_expr, expressions::Column};
306356

307-
use super::{ExprBoundaries, analyze};
357+
use super::{ExprBoundaries, analyze, calculate_selectivity, singleton_selectivity};
308358

309359
fn make_field(name: &str, data_type: DataType) -> Field {
310360
let nullable = false;
@@ -435,4 +485,92 @@ mod tests {
435485
.unwrap_err();
436486
assert_contains!(analysis_error.to_string(), expected_error);
437487
}
488+
489+
// ---------------------------------------------------------------------------
490+
// Unit tests for singleton_selectivity and calculate_selectivity
491+
// ---------------------------------------------------------------------------
492+
493+
fn make_boundary(lower: i32, upper: i32, distinct_count: usize) -> ExprBoundaries {
494+
ExprBoundaries {
495+
column: Column::new("a", 0),
496+
interval: Some(
497+
Interval::try_new(
498+
ScalarValue::Int32(Some(lower)),
499+
ScalarValue::Int32(Some(upper)),
500+
)
501+
.unwrap(),
502+
),
503+
distinct_count: Precision::Exact(distinct_count),
504+
}
505+
}
506+
507+
/// If the interval was already pinned to the target's single value before
/// filtering, `singleton_selectivity` has to yield `None`; otherwise the
/// `1 / NDV` factor would be applied twice.
#[test]
fn test_singleton_selectivity_skipped_when_initial_is_same_singleton() {
    let five = ScalarValue::Int32(Some(5));
    let point = Interval::try_new(five.clone(), five).unwrap();

    // Initial and target are both [5, 5], so no new equality filter narrowed it.
    let estimate = singleton_selectivity(&point, &point, 10);

    assert_eq!(
        estimate, None,
        "shortcut must not fire when initial interval was already the same singleton"
    );
}
522+
523+
/// A wide initial range that the filter narrows to one value is the
/// equality-predicate case: `singleton_selectivity` must report
/// `Some(1 / NDV)`.
#[test]
fn test_singleton_selectivity_applied_when_range_collapses() {
    // Small local constructor for closed Int32 intervals.
    let int32_range = |lo: i32, hi: i32| {
        Interval::try_new(ScalarValue::Int32(Some(lo)), ScalarValue::Int32(Some(hi)))
            .unwrap()
    };

    let before_filter = int32_range(1, 100);
    let after_filter = int32_range(5, 5);

    assert_eq!(
        singleton_selectivity(&before_filter, &after_filter, 10),
        Some(0.1),
        "shortcut must return 1/NDV when a range collapses to a singleton"
    );
}
540+
541+
/// Regression test: `calculate_selectivity` must not apply the `1/NDV`
/// shortcut when the column statistics already describe a singleton interval
/// (i.e. before the filter, the column only ever held one value). In that
/// case the target and initial intervals are the same singleton, so the
/// overall selectivity should remain 1.0.
///
/// Three scenarios are checked; each call passes the target boundaries first
/// and the initial boundaries second:
///
/// 1. `[7, 7]` with NDV 1 on both sides: selectivity stays 1.0.
/// 2. `[1, 100]` narrowed to `[7, 7]` with NDV 50: selectivity is 1/50.
/// 3. `[7, 7]` with NDV 50 on both sides: selectivity stays 1.0 even though
///    the NDV alone would suggest 1/50.
#[test]
fn test_calculate_selectivity_already_singleton_initial_interval() {
    // Scenario 1: the statistics already pin the column to a single value.
    // This is the case the test is named after, so it is asserted explicitly
    // rather than computed and discarded.
    let already_singleton = make_boundary(7, 7, 1);
    let selectivity = calculate_selectivity(
        std::slice::from_ref(&already_singleton),
        std::slice::from_ref(&already_singleton),
    )
    .unwrap();
    assert!(
        (selectivity - 1.0).abs() < 1e-10,
        "expected selectivity 1.0 when the column was already a singleton, got {selectivity}"
    );

    // Scenario 2: a genuine equality filter collapses a wide range.
    let wide_initial = make_boundary(1, 100, 50);
    let same_singleton_target = make_boundary(7, 7, 50);
    let selectivity_new =
        calculate_selectivity(&[same_singleton_target], &[wide_initial]).unwrap();
    assert!(
        (selectivity_new - 0.02).abs() < 1e-10,
        "expected selectivity 1/NDV = 0.02, got {selectivity_new}"
    );

    // Scenario 3: same singleton before and after, but with a large NDV.
    // The shortcut must still be skipped.
    let singleton_initial = make_boundary(7, 7, 50);
    let singleton_target = make_boundary(7, 7, 50);
    let selectivity_no_new_filter =
        calculate_selectivity(&[singleton_target], &[singleton_initial]).unwrap();
    assert!(
        (selectivity_no_new_filter - 1.0).abs() < 1e-10,
        "expected selectivity 1.0 when initial was already the same singleton, got {selectivity_no_new_filter}"
    );
}
438576
}

0 commit comments

Comments
 (0)