diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs
index e5a1e4613b3d4..5835e47bbadaa 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -284,6 +284,80 @@ fn sort_columns_from_physical_sort_exprs(
         .collect()
 }
 
+// Initialize the running summary from the first file's statistics: copy row and
+// byte counts directly and seed each column's aggregates (sum is cast to the
+// accumulator's sum type).
+fn seed_summary_statistics(summary_statistics: &mut Statistics, file_stats: &Statistics) {
+    summary_statistics.num_rows = file_stats.num_rows;
+    summary_statistics.total_byte_size = file_stats.total_byte_size;
+
+    for (summary_col_stats, file_col_stats) in summary_statistics
+        .column_statistics
+        .iter_mut()
+        .zip(file_stats.column_statistics.iter())
+    {
+        summary_col_stats.null_count = file_col_stats.null_count;
+        summary_col_stats.max_value = file_col_stats.max_value.clone();
+        summary_col_stats.min_value = file_col_stats.min_value.clone();
+        summary_col_stats.sum_value = file_col_stats.sum_value.cast_to_sum_type();
+        summary_col_stats.byte_size = file_col_stats.byte_size;
+    }
+}
+
+// Fold one more file's statistics into the running summary. Precision-aware
+// `add`/`max`/`min` demote the result to inexact when either side is inexact
+// or absent.
+fn merge_summary_statistics(
+    summary_statistics: &mut Statistics,
+    file_stats: &Statistics,
+) {
+    summary_statistics.num_rows = summary_statistics.num_rows.add(&file_stats.num_rows);
+    summary_statistics.total_byte_size = summary_statistics
+        .total_byte_size
+        .add(&file_stats.total_byte_size);
+
+    for (summary_col_stats, file_col_stats) in summary_statistics
+        .column_statistics
+        .iter_mut()
+        .zip(file_stats.column_statistics.iter())
+    {
+        let ColumnStatistics {
+            null_count: file_nc,
+            max_value: file_max,
+            min_value: file_min,
+            sum_value: file_sum,
+            distinct_count: _,
+            byte_size: file_sbs,
+        } = file_col_stats;
+
+        summary_col_stats.null_count = summary_col_stats.null_count.add(file_nc);
+        summary_col_stats.max_value = summary_col_stats.max_value.max(file_max);
+        summary_col_stats.min_value = summary_col_stats.min_value.min(file_min);
+        summary_col_stats.sum_value = summary_col_stats.sum_value.add_for_sum(file_sum);
+        summary_col_stats.byte_size = summary_col_stats.byte_size.add(file_sbs);
+    }
+}
+
+// First file: the limit-pruning row count is always tracked, the summary is
+// seeded only when `collect_stats` is enabled.
+fn seed_first_file_statistics(
+    limit_num_rows: &mut Precision<usize>,
+    summary_statistics: &mut Statistics,
+    file_stats: &Statistics,
+    collect_stats: bool,
+) {
+    *limit_num_rows = file_stats.num_rows;
+
+    if collect_stats {
+        seed_summary_statistics(summary_statistics, file_stats);
+    }
+}
+
+// Subsequent files: accumulate the limit-pruning row count unconditionally,
+// and merge full statistics only when `collect_stats` is enabled.
+fn merge_file_statistics(
+    limit_num_rows: &mut Precision<usize>,
+    summary_statistics: &mut Statistics,
+    file_stats: &Statistics,
+    collect_stats: bool,
+) {
+    *limit_num_rows = limit_num_rows.add(&file_stats.num_rows);
+
+    if collect_stats {
+        merge_summary_statistics(summary_statistics, file_stats);
+    }
+}
+
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
 /// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
@@ -308,9 +382,14 @@ pub async fn get_statistics_with_limit(
     // - zero for summations, and
     // - neutral element for extreme points.
     let size = file_schema.fields().len();
-    let mut col_stats_set = vec![ColumnStatistics::default(); size];
-    let mut num_rows = Precision::<usize>::Absent;
-    let mut total_byte_size = Precision::<usize>::Absent;
+    let mut summary_statistics = Statistics {
+        num_rows: Precision::Absent,
+        total_byte_size: Precision::Absent,
+        column_statistics: vec![ColumnStatistics::default(); size],
+    };
+    // Keep limit pruning separate from the returned summary so `collect_stats=false`
+    // can still stop early using known file row counts.
+    let mut limit_num_rows = Precision::<usize>::Absent;
 
     // Fusing the stream allows us to call next safely even once it is finished.
     let mut all_files = Box::pin(all_files.fuse());
@@ -320,23 +399,18 @@ pub async fn get_statistics_with_limit(
         file.statistics = Some(Arc::clone(&file_stats));
         result_files.push(file);
 
-        // First file, we set them directly from the file statistics.
-        num_rows = file_stats.num_rows;
-        total_byte_size = file_stats.total_byte_size;
-        for (index, file_column) in
-            file_stats.column_statistics.clone().into_iter().enumerate()
-        {
-            col_stats_set[index].null_count = file_column.null_count;
-            col_stats_set[index].max_value = file_column.max_value;
-            col_stats_set[index].min_value = file_column.min_value;
-            col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
-        }
+        seed_first_file_statistics(
+            &mut limit_num_rows,
+            &mut summary_statistics,
+            &file_stats,
+            collect_stats,
+        );
 
         // If the number of rows exceeds the limit, we can stop processing
         // files. This only applies when we know the number of rows. It also
         // currently ignores tables that have no statistics regarding the
        // number of rows.
-        let conservative_num_rows = match num_rows {
+        let conservative_num_rows = match limit_num_rows {
             Precision::Exact(nr) => nr,
             _ => usize::MIN,
         };
@@ -345,44 +419,18 @@ pub async fn get_statistics_with_limit(
                 let (mut file, file_stats) = current?;
                 file.statistics = Some(Arc::clone(&file_stats));
                 result_files.push(file);
-                if !collect_stats {
-                    continue;
-                }
-
-                // We accumulate the number of rows, total byte size and null
-                // counts across all the files in question. If any file does not
-                // provide any information or provides an inexact value, we demote
-                // the statistic precision to inexact.
-                num_rows = num_rows.add(&file_stats.num_rows);
-
-                total_byte_size = total_byte_size.add(&file_stats.total_byte_size);
-
-                for (file_col_stats, col_stats) in file_stats
-                    .column_statistics
-                    .iter()
-                    .zip(col_stats_set.iter_mut())
-                {
-                    let ColumnStatistics {
-                        null_count: file_nc,
-                        max_value: file_max,
-                        min_value: file_min,
-                        sum_value: file_sum,
-                        distinct_count: _,
-                        byte_size: file_sbs,
-                    } = file_col_stats;
-
-                    col_stats.null_count = col_stats.null_count.add(file_nc);
-                    col_stats.max_value = col_stats.max_value.max(file_max);
-                    col_stats.min_value = col_stats.min_value.min(file_min);
-                    col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum);
-                    col_stats.byte_size = col_stats.byte_size.add(file_sbs);
-                }
+                merge_file_statistics(
+                    &mut limit_num_rows,
+                    &mut summary_statistics,
+                    &file_stats,
+                    collect_stats,
+                );
 
                 // If the number of rows exceeds the limit, we can stop processing
                 // files. This only applies when we know the number of rows. It also
                 // currently ignores tables that have no statistics regarding the
                 // number of rows.
-                if num_rows.get_value().unwrap_or(&usize::MIN)
+                if limit_num_rows.get_value().unwrap_or(&usize::MIN)
                     > &limit.unwrap_or(usize::MAX)
                 {
                     break;
@@ -391,11 +439,7 @@ pub async fn get_statistics_with_limit(
         }
     };
 
-    let mut statistics = Statistics {
-        num_rows,
-        total_byte_size,
-        column_statistics: col_stats_set,
-    };
+    let mut statistics = summary_statistics;
     if all_files.next().await.is_some() {
         // If we still have files in the stream, it means that the limit kicked
         // in, and the statistic could have been different had we processed the
@@ -520,6 +564,39 @@ mod tests {
         }
     }
 
+    fn test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]))
+    }
+
+    fn make_file_stats(
+        num_rows: usize,
+        total_byte_size: usize,
+        col_stats: ColumnStatistics,
+    ) -> Arc<Statistics> {
+        Arc::new(Statistics {
+            num_rows: Precision::Exact(num_rows),
+            total_byte_size: Precision::Exact(total_byte_size),
+            column_statistics: vec![col_stats],
+        })
+    }
+
+    fn rich_col_stats(
+        null_count: usize,
+        min: i64,
+        max: i64,
+        sum: i64,
+        byte_size: usize,
+    ) -> ColumnStatistics {
+        ColumnStatistics {
+            null_count: Precision::Exact(null_count),
+            max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
+            min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
+            distinct_count: Precision::Absent,
+            sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
+            byte_size: Precision::Exact(byte_size),
+        }
+    }
+
     #[tokio::test]
     #[expect(deprecated)]
     async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
@@ -533,7 +610,7 @@ mod tests {
         ))]);
 
         let (_group, stats) =
-            get_statistics_with_limit(files, schema, None, false).await?;
+            get_statistics_with_limit(files, schema, None, true).await?;
 
         assert_eq!(
             stats.column_statistics[0].sum_value,
@@ -571,4 +648,151 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    #[expect(deprecated)]
+    async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
+        let all_files = stream::iter(vec![
+            Ok((
+                PartitionedFile::new("first.parquet", 10),
+                make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
+            )),
+            Ok((
+                PartitionedFile::new("second.parquet", 20),
+                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
+            )),
+        ]);
+
+        let (_files, statistics) =
+            get_statistics_with_limit(all_files, test_schema(), None, false)
+                .await
+                .unwrap();
+
+        assert_eq!(statistics.num_rows, Precision::Absent);
+        assert_eq!(statistics.total_byte_size, Precision::Absent);
+        assert_eq!(statistics.column_statistics.len(), 1);
+        assert_eq!(
+            statistics.column_statistics[0].null_count,
+            Precision::Absent
+        );
+        assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
+        assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
+        assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
+        assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
+    }
+
+    #[tokio::test]
+    #[expect(deprecated)]
+    async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() {
+        let all_files = stream::iter(vec![
+            Ok((
+                PartitionedFile::new("first.parquet", 10),
+                make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)),
+            )),
+            Ok((
+                PartitionedFile::new("second.parquet", 20),
+                make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)),
+            )),
+            Ok((
+                PartitionedFile::new("third.parquet", 30),
+                make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)),
+            )),
+        ]);
+
+        let (files, statistics) =
+            get_statistics_with_limit(all_files, test_schema(), Some(4), false)
+                .await
+                .unwrap();
+
+        assert_eq!(files.len(), 2);
+        assert_eq!(statistics.num_rows, Precision::Absent);
+        assert_eq!(statistics.total_byte_size, Precision::Absent);
+    }
+
+    #[tokio::test]
+    #[expect(deprecated)]
+    async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
+        let all_files = stream::iter(vec![
+            Ok((
+                PartitionedFile::new("first.parquet", 10),
+                make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
+            )),
+            Ok((
+                PartitionedFile::new("second.parquet", 20),
+                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
+            )),
+        ]);
+
+        let (_files, statistics) =
+            get_statistics_with_limit(all_files, test_schema(), None, true)
+                .await
+                .unwrap();
+
+        assert_eq!(statistics.num_rows, Precision::Exact(15));
+        assert_eq!(statistics.total_byte_size, Precision::Exact(150));
+        assert_eq!(
+            statistics.column_statistics[0].null_count,
+            Precision::Exact(3)
+        );
+        assert_eq!(
+            statistics.column_statistics[0].min_value,
+            Precision::Exact(ScalarValue::Int64(Some(1)))
+        );
+        assert_eq!(
+            statistics.column_statistics[0].max_value,
+            Precision::Exact(ScalarValue::Int64(Some(99)))
+        );
+        assert_eq!(
+            statistics.column_statistics[0].sum_value,
+            Precision::Exact(ScalarValue::Int64(Some(315)))
+        );
+        assert_eq!(
+            statistics.column_statistics[0].byte_size,
+            Precision::Exact(192)
+        );
+    }
+
+    #[tokio::test]
+    #[expect(deprecated)]
+    async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
+        let all_files = stream::iter(vec![
+            Ok((
+                PartitionedFile::new("first.parquet", 10),
+                make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
+            )),
+            Ok((
+                PartitionedFile::new("second.parquet", 20),
+                make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
+            )),
+            Ok((
+                PartitionedFile::new("third.parquet", 20),
+                make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
+            )),
+        ]);
+
+        let (files, statistics) =
+            get_statistics_with_limit(all_files, test_schema(), Some(8), true)
+                .await
+                .unwrap();
+
+        assert_eq!(files.len(), 2);
+        assert_eq!(statistics.num_rows, Precision::Inexact(10));
+        assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
+        assert_eq!(
+            statistics.column_statistics[0].min_value,
+            Precision::Inexact(ScalarValue::Int64(Some(1)))
+        );
+        assert_eq!(
+            statistics.column_statistics[0].max_value,
+            Precision::Inexact(ScalarValue::Int64(Some(10)))
+        );
+        assert_eq!(
+            statistics.column_statistics[0].sum_value,
+            Precision::Inexact(ScalarValue::Int64(Some(55)))
+        );
+        assert_eq!(
+            statistics.column_statistics[0].byte_size,
+            Precision::Inexact(128)
+        );
+    }
 }