Skip to content

Commit c62a5cb

Browse files
committed
Snapshot column stats for analyzer to avoid ordering dependency
1 parent f6f27ac commit c62a5cb

1 file changed

Lines changed: 120 additions & 4 deletions

File tree

datafusion/physical-expr/src/projection.rs

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,15 @@ impl ProjectionExprs {
688688
mut stats: Statistics,
689689
output_schema: &Schema,
690690
) -> Result<Statistics> {
691+
// Snapshot the input stats (only when an analyzer registry is set) so analyzer lookups do not depend on projection order
692+
let original_stats =
693+
self.expression_analyzer_registry
694+
.as_ref()
695+
.map(|_| Statistics {
696+
num_rows: stats.num_rows,
697+
total_byte_size: stats.total_byte_size,
698+
column_statistics: stats.column_statistics.clone(),
699+
});
691700
let mut column_statistics = vec![];
692701

693702
for proj_expr in self.exprs.iter() {
@@ -747,18 +756,19 @@ impl ProjectionExprs {
747756
}
748757
} else if let Some(registry) = &self.expression_analyzer_registry {
749758
// Use ExpressionAnalyzer to estimate statistics for arbitrary expressions
759+
let original_stats = original_stats.as_ref().unwrap();
750760
let distinct_count = registry
751-
.get_distinct_count(expr, &stats)
761+
.get_distinct_count(expr, original_stats)
752762
.map(Precision::Inexact)
753763
.unwrap_or(Precision::Absent);
754764
let (min_value, max_value) = registry
755-
.get_min_max(expr, &stats)
765+
.get_min_max(expr, original_stats)
756766
.map(|(min, max)| (Precision::Inexact(min), Precision::Inexact(max)))
757767
.unwrap_or((Precision::Absent, Precision::Absent));
758768
let null_count = registry
759-
.get_null_fraction(expr, &stats)
769+
.get_null_fraction(expr, original_stats)
760770
.and_then(|frac| {
761-
stats
771+
original_stats
762772
.num_rows
763773
.get_value()
764774
.map(|&rows| (rows as f64 * frac).ceil() as usize)
@@ -3147,4 +3157,110 @@ pub(crate) mod tests {
31473157

31483158
Ok(())
31493159
}
3160+
3161+
#[test]
3162+
fn test_project_statistics_column_then_expression() -> Result<()> {
3163+
// Projection `SELECT a, a + 1`: the bare column `a` comes first, followed by an expression over the same column.
3164+
let input_stats = Statistics {
3165+
num_rows: Precision::Exact(1000),
3166+
total_byte_size: Precision::Absent,
3167+
column_statistics: vec![ColumnStatistics {
3168+
distinct_count: Precision::Exact(100),
3169+
null_count: Precision::Exact(0),
3170+
min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
3171+
max_value: Precision::Exact(ScalarValue::Int64(Some(1000))),
3172+
sum_value: Precision::Absent,
3173+
byte_size: Precision::Absent,
3174+
}],
3175+
};
3176+
let input_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
3177+
3178+
let projection = ProjectionExprs::new(vec![
3179+
ProjectionExpr {
3180+
expr: Arc::new(Column::new("a", 0)),
3181+
alias: "a".to_string(),
3182+
},
3183+
ProjectionExpr {
3184+
expr: Arc::new(BinaryExpr::new(
3185+
Arc::new(Column::new("a", 0)),
3186+
Operator::Plus,
3187+
Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
3188+
)),
3189+
alias: "a_plus_1".to_string(),
3190+
},
3191+
])
3192+
.with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new()));
3193+
3194+
let output_stats = projection.project_statistics(
3195+
input_stats,
3196+
&projection.project_schema(&input_schema)?,
3197+
)?;
3198+
3199+
// Output 0 is the bare column `a`: its exact distinct count (100) must pass through unchanged.
3200+
assert_eq!(
3201+
output_stats.column_statistics[0].distinct_count,
3202+
Precision::Exact(100)
3203+
);
3204+
3205+
// Output 1 is `a + 1`: the analyzer must still see `a`'s distinct count (NDV) even though `a` was projected first, giving an inexact estimate of 100.
3206+
assert_eq!(
3207+
output_stats.column_statistics[1].distinct_count,
3208+
Precision::Inexact(100)
3209+
);
3210+
3211+
Ok(())
3212+
}
3213+
3214+
#[test]
3215+
fn test_project_statistics_expression_then_column() -> Result<()> {
3216+
// Projection `SELECT a + 1, a`: the expression over `a` comes first, followed by the bare column `a`.
3217+
let input_stats = Statistics {
3218+
num_rows: Precision::Exact(1000),
3219+
total_byte_size: Precision::Absent,
3220+
column_statistics: vec![ColumnStatistics {
3221+
distinct_count: Precision::Exact(100),
3222+
null_count: Precision::Exact(0),
3223+
min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
3224+
max_value: Precision::Exact(ScalarValue::Int64(Some(1000))),
3225+
sum_value: Precision::Absent,
3226+
byte_size: Precision::Absent,
3227+
}],
3228+
};
3229+
let input_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
3230+
3231+
let projection = ProjectionExprs::new(vec![
3232+
ProjectionExpr {
3233+
expr: Arc::new(BinaryExpr::new(
3234+
Arc::new(Column::new("a", 0)),
3235+
Operator::Plus,
3236+
Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
3237+
)),
3238+
alias: "a_plus_1".to_string(),
3239+
},
3240+
ProjectionExpr {
3241+
expr: Arc::new(Column::new("a", 0)),
3242+
alias: "a".to_string(),
3243+
},
3244+
])
3245+
.with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new()));
3246+
3247+
let output_stats = projection.project_statistics(
3248+
input_stats,
3249+
&projection.project_schema(&input_schema)?,
3250+
)?;
3251+
3252+
// Output 0 is `a + 1`: the analyzer sees `a`'s distinct count (NDV), giving an inexact estimate of 100 (presumably because the bare-column projection has not yet taken `a`'s stats at this point; confirm against project_statistics).
3253+
assert_eq!(
3254+
output_stats.column_statistics[0].distinct_count,
3255+
Precision::Inexact(100)
3256+
);
3257+
3258+
// Output 1 is the bare column `a`: its exact distinct count (100) must pass through unchanged.
3259+
assert_eq!(
3260+
output_stats.column_statistics[1].distinct_count,
3261+
Precision::Exact(100)
3262+
);
3263+
3264+
Ok(())
3265+
}
31503266
}

0 commit comments

Comments
 (0)