Skip to content

Commit 0b8c4c5

Browse files
kumarUjjawal and kosiew authored
fix(datasource): keep stats absent when collect_stats is false (#21149)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #2976 - Part of #8227 ## Rationale for this change `get_statistics_with_limit` was copying first-file row/byte stats even when`collect_stats=false`. This could return `Exact(0)` instead of `Absent` (unknown stats). <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? Summary stats are only initialized and merged when `collect_stats=true`. Also fixes first-file column `byte_size` initialization in the `collect_stats=true` path, and adds regression tests for both paths and limit behavior. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Siew Kam Onn <kosiew@gmail.com>
1 parent a4b945e commit 0b8c4c5

1 file changed

Lines changed: 278 additions & 54 deletions

File tree

datafusion/datasource/src/statistics.rs

Lines changed: 278 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,80 @@ fn sort_columns_from_physical_sort_exprs(
284284
.collect()
285285
}
286286

287+
/// Initialize the running summary from the first file's statistics.
///
/// Row count and total byte size are copied directly. For each column,
/// null count, min/max, and byte size are copied, and the sum is cast to
/// the column's sum type via `cast_to_sum_type()`. `distinct_count` is
/// intentionally left at its default (not seeded from the file).
fn seed_summary_statistics(summary_statistics: &mut Statistics, file_stats: &Statistics) {
    summary_statistics.num_rows = file_stats.num_rows;
    summary_statistics.total_byte_size = file_stats.total_byte_size;

    for (summary_col_stats, file_col_stats) in summary_statistics
        .column_statistics
        .iter_mut()
        .zip(file_stats.column_statistics.iter())
    {
        summary_col_stats.null_count = file_col_stats.null_count;
        summary_col_stats.max_value = file_col_stats.max_value.clone();
        summary_col_stats.min_value = file_col_stats.min_value.clone();
        summary_col_stats.sum_value = file_col_stats.sum_value.cast_to_sum_type();
        summary_col_stats.byte_size = file_col_stats.byte_size;
    }
}
303+
304+
/// Fold one more file's statistics into the running summary.
///
/// Uses `Precision` arithmetic (`add`, `max`, `min`, `add_for_sum`): if any
/// file provides no information or an inexact value, the merged statistic is
/// demoted to inexact/absent accordingly. `distinct_count` is deliberately
/// ignored (destructured as `_`) — distinct counts cannot be summed across files.
fn merge_summary_statistics(
    summary_statistics: &mut Statistics,
    file_stats: &Statistics,
) {
    summary_statistics.num_rows = summary_statistics.num_rows.add(&file_stats.num_rows);
    summary_statistics.total_byte_size = summary_statistics
        .total_byte_size
        .add(&file_stats.total_byte_size);

    for (summary_col_stats, file_col_stats) in summary_statistics
        .column_statistics
        .iter_mut()
        .zip(file_stats.column_statistics.iter())
    {
        let ColumnStatistics {
            null_count: file_nc,
            max_value: file_max,
            min_value: file_min,
            sum_value: file_sum,
            distinct_count: _,
            byte_size: file_sbs,
        } = file_col_stats;

        summary_col_stats.null_count = summary_col_stats.null_count.add(file_nc);
        summary_col_stats.max_value = summary_col_stats.max_value.max(file_max);
        summary_col_stats.min_value = summary_col_stats.min_value.min(file_min);
        summary_col_stats.sum_value = summary_col_stats.sum_value.add_for_sum(file_sum);
        summary_col_stats.byte_size = summary_col_stats.byte_size.add(file_sbs);
    }
}
334+
335+
/// Record the first file's statistics.
///
/// The file's row count is always captured into `limit_num_rows` so that
/// limit-based early termination works even when statistics collection is
/// disabled. The returned summary is only seeded when `collect_stats` is
/// true; otherwise it stays `Absent` (the fix for returning `Exact(0)`
/// instead of unknown stats when `collect_stats=false`).
fn seed_first_file_statistics(
    limit_num_rows: &mut Precision<usize>,
    summary_statistics: &mut Statistics,
    file_stats: &Statistics,
    collect_stats: bool,
) {
    *limit_num_rows = file_stats.num_rows;

    if collect_stats {
        seed_summary_statistics(summary_statistics, file_stats);
    }
}
347+
348+
/// Accumulate a subsequent file's statistics.
///
/// Row counts are always accumulated into `limit_num_rows` (for limit
/// pruning, regardless of `collect_stats`); the summary statistics are
/// merged only when `collect_stats` is true.
fn merge_file_statistics(
    limit_num_rows: &mut Precision<usize>,
    summary_statistics: &mut Statistics,
    file_stats: &Statistics,
    collect_stats: bool,
) {
    *limit_num_rows = limit_num_rows.add(&file_stats.num_rows);

    if collect_stats {
        merge_summary_statistics(summary_statistics, file_stats);
    }
}
360+
287361
/// Get all files as well as the file level summary statistics (no statistic for partition columns).
288362
/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
289363
/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
@@ -308,9 +382,14 @@ pub async fn get_statistics_with_limit(
308382
// - zero for summations, and
309383
// - neutral element for extreme points.
310384
let size = file_schema.fields().len();
311-
let mut col_stats_set = vec![ColumnStatistics::default(); size];
312-
let mut num_rows = Precision::<usize>::Absent;
313-
let mut total_byte_size = Precision::<usize>::Absent;
385+
let mut summary_statistics = Statistics {
386+
num_rows: Precision::Absent,
387+
total_byte_size: Precision::Absent,
388+
column_statistics: vec![ColumnStatistics::default(); size],
389+
};
390+
// Keep limit pruning separate from the returned summary so `collect_stats=false`
391+
// can still stop early using known file row counts.
392+
let mut limit_num_rows = Precision::<usize>::Absent;
314393

315394
// Fusing the stream allows us to call next safely even once it is finished.
316395
let mut all_files = Box::pin(all_files.fuse());
@@ -320,23 +399,18 @@ pub async fn get_statistics_with_limit(
320399
file.statistics = Some(Arc::clone(&file_stats));
321400
result_files.push(file);
322401

323-
// First file, we set them directly from the file statistics.
324-
num_rows = file_stats.num_rows;
325-
total_byte_size = file_stats.total_byte_size;
326-
for (index, file_column) in
327-
file_stats.column_statistics.clone().into_iter().enumerate()
328-
{
329-
col_stats_set[index].null_count = file_column.null_count;
330-
col_stats_set[index].max_value = file_column.max_value;
331-
col_stats_set[index].min_value = file_column.min_value;
332-
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
333-
}
402+
seed_first_file_statistics(
403+
&mut limit_num_rows,
404+
&mut summary_statistics,
405+
&file_stats,
406+
collect_stats,
407+
);
334408

335409
// If the number of rows exceeds the limit, we can stop processing
336410
// files. This only applies when we know the number of rows. It also
337411
// currently ignores tables that have no statistics regarding the
338412
// number of rows.
339-
let conservative_num_rows = match num_rows {
413+
let conservative_num_rows = match limit_num_rows {
340414
Precision::Exact(nr) => nr,
341415
_ => usize::MIN,
342416
};
@@ -345,44 +419,18 @@ pub async fn get_statistics_with_limit(
345419
let (mut file, file_stats) = current?;
346420
file.statistics = Some(Arc::clone(&file_stats));
347421
result_files.push(file);
348-
if !collect_stats {
349-
continue;
350-
}
351-
352-
// We accumulate the number of rows, total byte size and null
353-
// counts across all the files in question. If any file does not
354-
// provide any information or provides an inexact value, we demote
355-
// the statistic precision to inexact.
356-
num_rows = num_rows.add(&file_stats.num_rows);
357-
358-
total_byte_size = total_byte_size.add(&file_stats.total_byte_size);
359-
360-
for (file_col_stats, col_stats) in file_stats
361-
.column_statistics
362-
.iter()
363-
.zip(col_stats_set.iter_mut())
364-
{
365-
let ColumnStatistics {
366-
null_count: file_nc,
367-
max_value: file_max,
368-
min_value: file_min,
369-
sum_value: file_sum,
370-
distinct_count: _,
371-
byte_size: file_sbs,
372-
} = file_col_stats;
373-
374-
col_stats.null_count = col_stats.null_count.add(file_nc);
375-
col_stats.max_value = col_stats.max_value.max(file_max);
376-
col_stats.min_value = col_stats.min_value.min(file_min);
377-
col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum);
378-
col_stats.byte_size = col_stats.byte_size.add(file_sbs);
379-
}
422+
merge_file_statistics(
423+
&mut limit_num_rows,
424+
&mut summary_statistics,
425+
&file_stats,
426+
collect_stats,
427+
);
380428

381429
// If the number of rows exceeds the limit, we can stop processing
382430
// files. This only applies when we know the number of rows. It also
383431
// currently ignores tables that have no statistics regarding the
384432
// number of rows.
385-
if num_rows.get_value().unwrap_or(&usize::MIN)
433+
if limit_num_rows.get_value().unwrap_or(&usize::MIN)
386434
> &limit.unwrap_or(usize::MAX)
387435
{
388436
break;
@@ -391,11 +439,7 @@ pub async fn get_statistics_with_limit(
391439
}
392440
};
393441

394-
let mut statistics = Statistics {
395-
num_rows,
396-
total_byte_size,
397-
column_statistics: col_stats_set,
398-
};
442+
let mut statistics = summary_statistics;
399443
if all_files.next().await.is_some() {
400444
// If we still have files in the stream, it means that the limit kicked
401445
// in, and the statistic could have been different had we processed the
@@ -520,6 +564,39 @@ mod tests {
520564
}
521565
}
522566

567+
fn test_schema() -> SchemaRef {
568+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]))
569+
}
570+
571+
fn make_file_stats(
572+
num_rows: usize,
573+
total_byte_size: usize,
574+
col_stats: ColumnStatistics,
575+
) -> Arc<Statistics> {
576+
Arc::new(Statistics {
577+
num_rows: Precision::Exact(num_rows),
578+
total_byte_size: Precision::Exact(total_byte_size),
579+
column_statistics: vec![col_stats],
580+
})
581+
}
582+
583+
fn rich_col_stats(
584+
null_count: usize,
585+
min: i64,
586+
max: i64,
587+
sum: i64,
588+
byte_size: usize,
589+
) -> ColumnStatistics {
590+
ColumnStatistics {
591+
null_count: Precision::Exact(null_count),
592+
max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
593+
min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
594+
distinct_count: Precision::Absent,
595+
sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
596+
byte_size: Precision::Exact(byte_size),
597+
}
598+
}
599+
523600
#[tokio::test]
524601
#[expect(deprecated)]
525602
async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
@@ -533,7 +610,7 @@ mod tests {
533610
))]);
534611

535612
let (_group, stats) =
536-
get_statistics_with_limit(files, schema, None, false).await?;
613+
get_statistics_with_limit(files, schema, None, true).await?;
537614

538615
assert_eq!(
539616
stats.column_statistics[0].sum_value,
@@ -571,4 +648,151 @@ mod tests {
571648

572649
Ok(())
573650
}
651+
652+
/// Regression test: with `collect_stats=false`, every returned statistic
/// must stay `Absent` — never `Exact(0)` copied from the first file.
#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
        )),
    ]);

    let (_files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), None, false)
            .await
            .unwrap();

    // Nothing was collected, so the summary must be entirely absent.
    assert_eq!(statistics.num_rows, Precision::Absent);
    assert_eq!(statistics.total_byte_size, Precision::Absent);
    assert_eq!(statistics.column_statistics.len(), 1);
    assert_eq!(
        statistics.column_statistics[0].null_count,
        Precision::Absent
    );
    assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
    assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
    assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
    assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
}
683+
684+
/// With `collect_stats=false`, file row counts must still drive limit
/// pruning: two files cover limit 4 (3 + 3 rows), so the third is skipped,
/// while the returned summary stays absent.
#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)),
        )),
        Ok((
            PartitionedFile::new("third.parquet", 30),
            make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)),
        )),
    ]);

    let (files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), Some(4), false)
            .await
            .unwrap();

    // Early termination happened after the second file.
    assert_eq!(files.len(), 2);
    // Summary stays absent despite limit pruning using row counts.
    assert_eq!(statistics.num_rows, Precision::Absent);
    assert_eq!(statistics.total_byte_size, Precision::Absent);
}
711+
712+
/// With `collect_stats=true` and no limit, statistics from both files are
/// aggregated exactly: rows 5+10, bytes 50+100, nulls 1+2, min/max across
/// files, sums 15+300, column byte sizes 64+128.
#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
        )),
    ]);

    let (_files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), None, true)
            .await
            .unwrap();

    assert_eq!(statistics.num_rows, Precision::Exact(15));
    assert_eq!(statistics.total_byte_size, Precision::Exact(150));
    assert_eq!(
        statistics.column_statistics[0].null_count,
        Precision::Exact(3)
    );
    assert_eq!(
        statistics.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::Int64(Some(1)))
    );
    assert_eq!(
        statistics.column_statistics[0].max_value,
        Precision::Exact(ScalarValue::Int64(Some(99)))
    );
    assert_eq!(
        statistics.column_statistics[0].sum_value,
        Precision::Exact(ScalarValue::Int64(Some(315)))
    );
    // Column byte_size is also aggregated (64 + 128) — part of this fix.
    assert_eq!(
        statistics.column_statistics[0].byte_size,
        Precision::Exact(192)
    );
}
754+
755+
/// With `collect_stats=true` and a limit that stops before the last file,
/// the aggregated statistics cover only the processed files (5+5 rows,
/// 50+50 bytes, etc.) and are demoted to `Inexact`, since unseen files
/// could have changed them.
#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
        )),
        Ok((
            PartitionedFile::new("third.parquet", 20),
            make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
        )),
    ]);

    let (files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), Some(8), true)
            .await
            .unwrap();

    // Limit 8 is exceeded after two files (10 rows); the third is skipped.
    assert_eq!(files.len(), 2);
    assert_eq!(statistics.num_rows, Precision::Inexact(10));
    assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
    assert_eq!(
        statistics.column_statistics[0].min_value,
        Precision::Inexact(ScalarValue::Int64(Some(1)))
    );
    assert_eq!(
        statistics.column_statistics[0].max_value,
        Precision::Inexact(ScalarValue::Int64(Some(10)))
    );
    assert_eq!(
        statistics.column_statistics[0].sum_value,
        Precision::Inexact(ScalarValue::Int64(Some(55)))
    );
    assert_eq!(
        statistics.column_statistics[0].byte_size,
        Precision::Inexact(128)
    );
}
574798
}

0 commit comments

Comments
 (0)