Skip to content

Commit 23fa3f6

Browse files
committed
fix(datasource): keep stats absent when collect_stats is false
1 parent 6e0dde0 commit 23fa3f6

1 file changed

Lines changed: 166 additions & 11 deletions

File tree

datafusion/datasource/src/statistics.rs

Lines changed: 166 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -320,16 +320,19 @@ pub async fn get_statistics_with_limit(
320320
file.statistics = Some(Arc::clone(&file_stats));
321321
result_files.push(file);
322322

323-
// First file, we set them directly from the file statistics.
324-
num_rows = file_stats.num_rows;
325-
total_byte_size = file_stats.total_byte_size;
326-
for (index, file_column) in
327-
file_stats.column_statistics.clone().into_iter().enumerate()
328-
{
329-
col_stats_set[index].null_count = file_column.null_count;
330-
col_stats_set[index].max_value = file_column.max_value;
331-
col_stats_set[index].min_value = file_column.min_value;
332-
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
323+
if collect_stats {
324+
// First file, we set them directly from the file statistics.
325+
num_rows = file_stats.num_rows;
326+
total_byte_size = file_stats.total_byte_size;
327+
for (index, file_column) in
328+
file_stats.column_statistics.clone().into_iter().enumerate()
329+
{
330+
col_stats_set[index].null_count = file_column.null_count;
331+
col_stats_set[index].max_value = file_column.max_value;
332+
col_stats_set[index].min_value = file_column.min_value;
333+
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
334+
col_stats_set[index].byte_size = file_column.byte_size;
335+
}
333336
}
334337

335338
// If the number of rows exceeds the limit, we can stop processing
@@ -520,6 +523,39 @@ mod tests {
520523
}
521524
}
522525

526+
/// Test helper: returns a schema with exactly one field, an `Int64`
/// column named `"a"` (created with the nullable flag set to `true`).
fn test_schema() -> SchemaRef {
527+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]))
528+
}
529+
530+
/// Test helper: builds per-file [`Statistics`] for a single-column file.
///
/// `num_rows` and `total_byte_size` are recorded as `Precision::Exact`, and
/// `col_stats` becomes the only entry of `column_statistics`. The result is
/// wrapped in an `Arc`.
fn make_file_stats(
531+
num_rows: usize,
532+
total_byte_size: usize,
533+
col_stats: ColumnStatistics,
534+
) -> Arc<Statistics> {
535+
Arc::new(Statistics {
536+
num_rows: Precision::Exact(num_rows),
537+
total_byte_size: Precision::Exact(total_byte_size),
538+
column_statistics: vec![col_stats],
539+
})
540+
}
541+
542+
/// Test helper: builds fully populated [`ColumnStatistics`] for an `Int64`
/// column.
///
/// `null_count`, `min`, `max`, `sum` and `byte_size` are all stored as
/// `Precision::Exact`; `distinct_count` is left `Precision::Absent`.
/// Note the parameter order is `min` then `max`, while the struct literal
/// below assigns `max_value` before `min_value`.
fn rich_col_stats(
543+
null_count: usize,
544+
min: i64,
545+
max: i64,
546+
sum: i64,
547+
byte_size: usize,
548+
) -> ColumnStatistics {
549+
ColumnStatistics {
550+
null_count: Precision::Exact(null_count),
551+
max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
552+
min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
553+
distinct_count: Precision::Absent,
554+
sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
555+
byte_size: Precision::Exact(byte_size),
556+
}
557+
}
558+
523559
#[tokio::test]
524560
#[expect(deprecated)]
525561
async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
@@ -533,7 +569,7 @@ mod tests {
533569
))]);
534570

535571
let (_group, stats) =
536-
get_statistics_with_limit(files, schema, None, false).await?;
572+
get_statistics_with_limit(files, schema, None, true).await?;
537573

538574
assert_eq!(
539575
stats.column_statistics[0].sum_value,
@@ -571,4 +607,123 @@ mod tests {
571607

572608
Ok(())
573609
}
610+
611+
/// With `collect_stats = false` and no limit, the summary statistics returned
/// by `get_statistics_with_limit` must stay `Precision::Absent` for the row
/// count, the total byte size and every checked per-column field, even though
/// both input files carry exact statistics.
// NOTE(review): `#[expect(deprecated)]` suggests `get_statistics_with_limit`
// is deprecated — confirm against its declaration.
#[tokio::test]
612+
#[expect(deprecated)]
613+
async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
614+
// Two files, each with exact file-level and column-level statistics.
let all_files = stream::iter(vec![
615+
Ok((
616+
PartitionedFile::new("first.parquet", 10),
617+
make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
618+
)),
619+
Ok((
620+
PartitionedFile::new("second.parquet", 20),
621+
make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
622+
)),
623+
]);
624+
625+
let (_files, statistics) =
626+
get_statistics_with_limit(all_files, test_schema(), None, false)
627+
.await
628+
.unwrap();
629+
630+
// Nothing from the per-file statistics may leak into the summary.
assert_eq!(statistics.num_rows, Precision::Absent);
631+
assert_eq!(statistics.total_byte_size, Precision::Absent);
632+
// One column-statistics entry is still expected for the one-field schema.
assert_eq!(statistics.column_statistics.len(), 1);
633+
assert_eq!(
634+
statistics.column_statistics[0].null_count,
635+
Precision::Absent
636+
);
637+
assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
638+
assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
639+
assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
640+
assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
641+
}
642+
643+
/// With `collect_stats = true` and no limit, the summary statistics must be
/// the exact aggregate of both files: counts, byte sizes and sums are added,
/// `min_value` is the smaller minimum and `max_value` the larger maximum.
// NOTE(review): `#[expect(deprecated)]` suggests `get_statistics_with_limit`
// is deprecated — confirm against its declaration.
#[tokio::test]
644+
#[expect(deprecated)]
645+
async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
646+
let all_files = stream::iter(vec![
647+
Ok((
648+
PartitionedFile::new("first.parquet", 10),
649+
make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
650+
)),
651+
Ok((
652+
PartitionedFile::new("second.parquet", 20),
653+
make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
654+
)),
655+
]);
656+
657+
let (_files, statistics) =
658+
get_statistics_with_limit(all_files, test_schema(), None, true)
659+
.await
660+
.unwrap();
661+
662+
// 5 + 10 rows and 50 + 100 bytes.
assert_eq!(statistics.num_rows, Precision::Exact(15));
663+
assert_eq!(statistics.total_byte_size, Precision::Exact(150));
664+
// 1 + 2 nulls.
assert_eq!(
665+
statistics.column_statistics[0].null_count,
666+
Precision::Exact(3)
667+
);
668+
// Minimum of the two file minimums (1 and 10).
assert_eq!(
669+
statistics.column_statistics[0].min_value,
670+
Precision::Exact(ScalarValue::Int64(Some(1)))
671+
);
672+
// Maximum of the two file maximums (9 and 99).
assert_eq!(
673+
statistics.column_statistics[0].max_value,
674+
Precision::Exact(ScalarValue::Int64(Some(99)))
675+
);
676+
// 15 + 300.
assert_eq!(
677+
statistics.column_statistics[0].sum_value,
678+
Precision::Exact(ScalarValue::Int64(Some(315)))
679+
);
680+
// 64 + 128 bytes.
assert_eq!(
681+
statistics.column_statistics[0].byte_size,
682+
Precision::Exact(192)
683+
);
684+
}
685+
686+
/// With `collect_stats = true` and a row limit of 8 over three 5-row files,
/// only the first two files are expected back and every aggregated statistic
/// is expected to be `Precision::Inexact`.
// NOTE(review): presumably the third file is never folded in because the
// running row count (10) already exceeds the limit after the second file,
// and the leftover input is what downgrades the results to inexact — confirm
// against the body of `get_statistics_with_limit`.
// NOTE(review): `#[expect(deprecated)]` suggests `get_statistics_with_limit`
// is deprecated — confirm against its declaration.
#[tokio::test]
687+
#[expect(deprecated)]
688+
async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
689+
let all_files = stream::iter(vec![
690+
Ok((
691+
PartitionedFile::new("first.parquet", 10),
692+
make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
693+
)),
694+
Ok((
695+
PartitionedFile::new("second.parquet", 20),
696+
make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
697+
)),
698+
Ok((
699+
PartitionedFile::new("third.parquet", 20),
700+
make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
701+
)),
702+
]);
703+
704+
let (files, statistics) =
705+
get_statistics_with_limit(all_files, test_schema(), Some(8), true)
706+
.await
707+
.unwrap();
708+
709+
// Only the first two files are kept.
assert_eq!(files.len(), 2);
710+
// Totals cover the first two files only: 5 + 5 rows, 50 + 50 bytes.
assert_eq!(statistics.num_rows, Precision::Inexact(10));
711+
assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
712+
assert_eq!(
713+
statistics.column_statistics[0].min_value,
714+
Precision::Inexact(ScalarValue::Int64(Some(1)))
715+
);
716+
// Max comes from the second file (10), not the third (15).
assert_eq!(
717+
statistics.column_statistics[0].max_value,
718+
Precision::Inexact(ScalarValue::Int64(Some(10)))
719+
);
720+
// 15 + 40.
assert_eq!(
721+
statistics.column_statistics[0].sum_value,
722+
Precision::Inexact(ScalarValue::Int64(Some(55)))
723+
);
724+
// 64 + 64 bytes.
assert_eq!(
725+
statistics.column_statistics[0].byte_size,
726+
Precision::Inexact(128)
727+
);
728+
}
574729
}

0 commit comments

Comments
 (0)