Skip to content

Commit daa8f52

Browse files
authored
fix: interval analysis error when two FilterExecs are stacked and the inner filter proves zero selectivity (#20743)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20742 ## Rationale for this change - See #20742 for details. ## What changes are included in this PR? In `collect_new_statistics`, when a filter proves that no rows can match, use a typed null (e.g., `ScalarValue::Int32(None)`) instead of the untyped `ScalarValue::Null` for column min/max/sum values. The column's data type is looked up from the schema so that downstream interval analysis can still intersect intervals of the same type. ## Are these changes tested? Yes — one test case was added. ## Are there any user-facing changes? No. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 1f87930 commit daa8f52

2 files changed

Lines changed: 93 additions & 22 deletions

File tree

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,17 +390,17 @@ mod test {
390390
column_statistics: vec![
391391
ColumnStatistics {
392392
null_count: Precision::Exact(0),
393-
max_value: Precision::Exact(ScalarValue::Null),
394-
min_value: Precision::Exact(ScalarValue::Null),
395-
sum_value: Precision::Exact(ScalarValue::Null),
393+
max_value: Precision::Exact(ScalarValue::Int32(None)),
394+
min_value: Precision::Exact(ScalarValue::Int32(None)),
395+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
396396
distinct_count: Precision::Exact(0),
397397
byte_size: Precision::Exact(16),
398398
},
399399
ColumnStatistics {
400400
null_count: Precision::Exact(0),
401-
max_value: Precision::Exact(ScalarValue::Null),
402-
min_value: Precision::Exact(ScalarValue::Null),
403-
sum_value: Precision::Exact(ScalarValue::Null),
401+
max_value: Precision::Exact(ScalarValue::Date32(None)),
402+
min_value: Precision::Exact(ScalarValue::Date32(None)),
403+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
404404
distinct_count: Precision::Exact(0),
405405
byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
406406
},
@@ -419,17 +419,17 @@ mod test {
419419
column_statistics: vec![
420420
ColumnStatistics {
421421
null_count: Precision::Exact(0),
422-
max_value: Precision::Exact(ScalarValue::Null),
423-
min_value: Precision::Exact(ScalarValue::Null),
424-
sum_value: Precision::Exact(ScalarValue::Null),
422+
max_value: Precision::Exact(ScalarValue::Int32(None)),
423+
min_value: Precision::Exact(ScalarValue::Int32(None)),
424+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
425425
distinct_count: Precision::Exact(0),
426426
byte_size: Precision::Exact(8),
427427
},
428428
ColumnStatistics {
429429
null_count: Precision::Exact(0),
430-
max_value: Precision::Exact(ScalarValue::Null),
431-
min_value: Precision::Exact(ScalarValue::Null),
432-
sum_value: Precision::Exact(ScalarValue::Null),
430+
max_value: Precision::Exact(ScalarValue::Date32(None)),
431+
min_value: Precision::Exact(ScalarValue::Date32(None)),
432+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
433433
distinct_count: Precision::Exact(0),
434434
byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
435435
},

datafusion/physical-plan/src/filter.rs

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ impl FilterExec {
338338
let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
339339

340340
let column_statistics = collect_new_statistics(
341+
schema,
341342
&input_stats.column_statistics,
342343
analysis_ctx.boundaries,
343344
);
@@ -785,6 +786,7 @@ fn interval_bound_to_precision(
785786
/// is adjusted by using the next/previous value for its data type to convert
786787
/// it into a closed bound.
787788
fn collect_new_statistics(
789+
schema: &SchemaRef,
788790
input_column_stats: &[ColumnStatistics],
789791
analysis_boundaries: Vec<ExprBoundaries>,
790792
) -> Vec<ColumnStatistics> {
@@ -801,12 +803,17 @@ fn collect_new_statistics(
801803
},
802804
)| {
803805
let Some(interval) = interval else {
804-
// If the interval is `None`, we can say that there are no rows:
806+
// If the interval is `None`, we can say that there are no rows.
807+
// Use a typed null to preserve the column's data type, so that
808+
// downstream interval analysis can still intersect intervals
809+
// of the same type.
810+
let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
811+
.unwrap_or(ScalarValue::Null);
805812
return ColumnStatistics {
806813
null_count: Precision::Exact(0),
807-
max_value: Precision::Exact(ScalarValue::Null),
808-
min_value: Precision::Exact(ScalarValue::Null),
809-
sum_value: Precision::Exact(ScalarValue::Null),
814+
max_value: Precision::Exact(typed_null.clone()),
815+
min_value: Precision::Exact(typed_null.clone()),
816+
sum_value: Precision::Exact(typed_null),
810817
distinct_count: Precision::Exact(0),
811818
byte_size: input_column_stats[idx].byte_size,
812819
};
@@ -1497,17 +1504,17 @@ mod tests {
14971504
statistics.column_statistics,
14981505
vec![
14991506
ColumnStatistics {
1500-
min_value: Precision::Exact(ScalarValue::Null),
1501-
max_value: Precision::Exact(ScalarValue::Null),
1502-
sum_value: Precision::Exact(ScalarValue::Null),
1507+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1508+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1509+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
15031510
distinct_count: Precision::Exact(0),
15041511
null_count: Precision::Exact(0),
15051512
byte_size: Precision::Absent,
15061513
},
15071514
ColumnStatistics {
1508-
min_value: Precision::Exact(ScalarValue::Null),
1509-
max_value: Precision::Exact(ScalarValue::Null),
1510-
sum_value: Precision::Exact(ScalarValue::Null),
1515+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1516+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1517+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
15111518
distinct_count: Precision::Exact(0),
15121519
null_count: Precision::Exact(0),
15131520
byte_size: Precision::Absent,
@@ -1518,6 +1525,70 @@ mod tests {
15181525
Ok(())
15191526
}
15201527

1528+
/// Regression test: stacking two FilterExecs where the inner filter
1529+
/// proves zero selectivity should not panic with a type mismatch
1530+
/// during interval intersection.
1531+
///
1532+
/// Previously, when a filter proved no rows could match, the column
1533+
/// statistics used untyped `ScalarValue::Null` (data type `Null`).
1534+
/// If an outer FilterExec then tried to analyze its own predicate
1535+
/// against those statistics, `Interval::intersect` would fail with:
1536+
/// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
1537+
#[tokio::test]
1538+
async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1539+
// Inner table: a: [1, 100], b: [1, 3]
1540+
let schema = Schema::new(vec![
1541+
Field::new("a", DataType::Int32, false),
1542+
Field::new("b", DataType::Int32, false),
1543+
]);
1544+
let input = Arc::new(StatisticsExec::new(
1545+
Statistics {
1546+
num_rows: Precision::Inexact(1000),
1547+
total_byte_size: Precision::Inexact(4000),
1548+
column_statistics: vec![
1549+
ColumnStatistics {
1550+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1551+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1552+
..Default::default()
1553+
},
1554+
ColumnStatistics {
1555+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1556+
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1557+
..Default::default()
1558+
},
1559+
],
1560+
},
1561+
schema,
1562+
));
1563+
1564+
// Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
1565+
let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1566+
Arc::new(Column::new("a", 0)),
1567+
Operator::Gt,
1568+
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1569+
));
1570+
let inner_filter: Arc<dyn ExecutionPlan> =
1571+
Arc::new(FilterExec::try_new(inner_predicate, input)?);
1572+
1573+
// Outer filter: a = 50
1574+
// Before the fix, this would panic because the inner filter's
1575+
// zero-selectivity statistics produced Null-typed intervals for
1576+
// column `a`, which couldn't intersect with the Int32 literal.
1577+
let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1578+
Arc::new(Column::new("a", 0)),
1579+
Operator::Eq,
1580+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1581+
));
1582+
let outer_filter: Arc<dyn ExecutionPlan> =
1583+
Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1584+
1585+
// Should succeed without error
1586+
let statistics = outer_filter.partition_statistics(None)?;
1587+
assert_eq!(statistics.num_rows, Precision::Inexact(0));
1588+
1589+
Ok(())
1590+
}
1591+
15211592
#[tokio::test]
15221593
async fn test_filter_statistics_more_inputs() -> Result<()> {
15231594
let schema = Schema::new(vec![

0 commit comments

Comments
 (0)