-
Notifications
You must be signed in to change notification settings - Fork 2.1k
fix(datasource): keep stats absent when collect_stats is false #21149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -320,16 +320,19 @@ pub async fn get_statistics_with_limit( | |
| file.statistics = Some(Arc::clone(&file_stats)); | ||
| result_files.push(file); | ||
|
|
||
| // First file, we set them directly from the file statistics. | ||
| num_rows = file_stats.num_rows; | ||
| total_byte_size = file_stats.total_byte_size; | ||
| for (index, file_column) in | ||
| file_stats.column_statistics.clone().into_iter().enumerate() | ||
| { | ||
| col_stats_set[index].null_count = file_column.null_count; | ||
| col_stats_set[index].max_value = file_column.max_value; | ||
| col_stats_set[index].min_value = file_column.min_value; | ||
| col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); | ||
| if collect_stats { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Small refactor suggestion: it may be worth extracting the "seed stats from first file" logic into a helper that can optionally populate the summary accumulator. This block and the later merge loop now both need to stay in sync on fields like `byte_size`. |
||
| // First file, we set them directly from the file statistics. | ||
| num_rows = file_stats.num_rows; | ||
| total_byte_size = file_stats.total_byte_size; | ||
| for (index, file_column) in | ||
| file_stats.column_statistics.clone().into_iter().enumerate() | ||
| { | ||
| col_stats_set[index].null_count = file_column.null_count; | ||
| col_stats_set[index].max_value = file_column.max_value; | ||
| col_stats_set[index].min_value = file_column.min_value; | ||
| col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); | ||
| col_stats_set[index].byte_size = file_column.byte_size; | ||
| } | ||
| } | ||
|
|
||
| // If the number of rows exceeds the limit, we can stop processing | ||
|
|
@@ -520,6 +523,39 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| fn test_schema() -> SchemaRef { | ||
| Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])) | ||
| } | ||
|
|
||
| fn make_file_stats( | ||
| num_rows: usize, | ||
| total_byte_size: usize, | ||
| col_stats: ColumnStatistics, | ||
| ) -> Arc<Statistics> { | ||
| Arc::new(Statistics { | ||
| num_rows: Precision::Exact(num_rows), | ||
| total_byte_size: Precision::Exact(total_byte_size), | ||
| column_statistics: vec![col_stats], | ||
| }) | ||
| } | ||
|
|
||
| fn rich_col_stats( | ||
| null_count: usize, | ||
| min: i64, | ||
| max: i64, | ||
| sum: i64, | ||
| byte_size: usize, | ||
| ) -> ColumnStatistics { | ||
| ColumnStatistics { | ||
| null_count: Precision::Exact(null_count), | ||
| max_value: Precision::Exact(ScalarValue::Int64(Some(max))), | ||
| min_value: Precision::Exact(ScalarValue::Int64(Some(min))), | ||
| distinct_count: Precision::Absent, | ||
| sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))), | ||
| byte_size: Precision::Exact(byte_size), | ||
| } | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| #[expect(deprecated)] | ||
| async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type() | ||
|
|
@@ -533,7 +569,7 @@ mod tests { | |
| ))]); | ||
|
|
||
| let (_group, stats) = | ||
| get_statistics_with_limit(files, schema, None, false).await?; | ||
| get_statistics_with_limit(files, schema, None, true).await?; | ||
|
|
||
| assert_eq!( | ||
| stats.column_statistics[0].sum_value, | ||
|
|
@@ -571,4 +607,123 @@ mod tests { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| #[expect(deprecated)] | ||
| async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() { | ||
| let all_files = stream::iter(vec![ | ||
| Ok(( | ||
| PartitionedFile::new("first.parquet", 10), | ||
| make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)), | ||
| )), | ||
| Ok(( | ||
| PartitionedFile::new("second.parquet", 20), | ||
| make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), | ||
| )), | ||
| ]); | ||
|
|
||
| let (_files, statistics) = | ||
| get_statistics_with_limit(all_files, test_schema(), None, false) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(statistics.num_rows, Precision::Absent); | ||
| assert_eq!(statistics.total_byte_size, Precision::Absent); | ||
| assert_eq!(statistics.column_statistics.len(), 1); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].null_count, | ||
| Precision::Absent | ||
| ); | ||
| assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent); | ||
| assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent); | ||
| assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent); | ||
| assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| #[expect(deprecated)] | ||
| async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() { | ||
| let all_files = stream::iter(vec![ | ||
| Ok(( | ||
| PartitionedFile::new("first.parquet", 10), | ||
| make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)), | ||
| )), | ||
| Ok(( | ||
| PartitionedFile::new("second.parquet", 20), | ||
| make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), | ||
| )), | ||
| ]); | ||
|
|
||
| let (_files, statistics) = | ||
| get_statistics_with_limit(all_files, test_schema(), None, true) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(statistics.num_rows, Precision::Exact(15)); | ||
| assert_eq!(statistics.total_byte_size, Precision::Exact(150)); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].null_count, | ||
| Precision::Exact(3) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].min_value, | ||
| Precision::Exact(ScalarValue::Int64(Some(1))) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].max_value, | ||
| Precision::Exact(ScalarValue::Int64(Some(99))) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].sum_value, | ||
| Precision::Exact(ScalarValue::Int64(Some(315))) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].byte_size, | ||
| Precision::Exact(192) | ||
| ); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| #[expect(deprecated)] | ||
| async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() { | ||
| let all_files = stream::iter(vec![ | ||
| Ok(( | ||
| PartitionedFile::new("first.parquet", 10), | ||
| make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)), | ||
| )), | ||
| Ok(( | ||
| PartitionedFile::new("second.parquet", 20), | ||
| make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)), | ||
| )), | ||
| Ok(( | ||
| PartitionedFile::new("third.parquet", 20), | ||
| make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)), | ||
| )), | ||
| ]); | ||
|
|
||
| let (files, statistics) = | ||
| get_statistics_with_limit(all_files, test_schema(), Some(8), true) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(files.len(), 2); | ||
| assert_eq!(statistics.num_rows, Precision::Inexact(10)); | ||
| assert_eq!(statistics.total_byte_size, Precision::Inexact(100)); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].min_value, | ||
| Precision::Inexact(ScalarValue::Int64(Some(1))) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].max_value, | ||
| Precision::Inexact(ScalarValue::Int64(Some(10))) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].sum_value, | ||
| Precision::Inexact(ScalarValue::Int64(Some(55))) | ||
| ); | ||
| assert_eq!( | ||
| statistics.column_statistics[0].byte_size, | ||
| Precision::Inexact(128) | ||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing the returned summary here. I think this introduces a separate issue in the `limit` path though. `get_statistics_with_limit` still relies on `num_rows` to decide whether it can stop early. With `collect_stats = false`, `num_rows` now stays `Absent`, so `conservative_num_rows` falls back to `usize::MIN` and we keep scanning the stream instead of stopping early.

Before this change, a known first-file row count could still let us stop after the first file. With this patch, even something like `limit = 0` can continue reading later files.

Could we preserve a separate row-count accumulator, or at least keep the first-file row count around for the limit check, while still returning a bare `Statistics` value when `collect_stats` is false? I think we should also add a regression test for `collect_stats = false` with a limit so this behavior is locked in.