Skip to content

Commit d612950

Browse files
committed
fix(datasource): keep stats absent when collect_stats is false
1 parent 6ab16cc commit d612950

1 file changed

Lines changed: 173 additions & 11 deletions

File tree

datafusion/datasource/src/statistics.rs

Lines changed: 173 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ fn sort_columns_from_physical_sort_exprs(
293293
since = "47.0.0",
294294
note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
295295
)]
296-
#[expect(unused)]
296+
#[cfg_attr(not(test), expect(unused))]
297297
pub async fn get_statistics_with_limit(
298298
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
299299
file_schema: SchemaRef,
@@ -320,16 +320,19 @@ pub async fn get_statistics_with_limit(
320320
file.statistics = Some(Arc::clone(&file_stats));
321321
result_files.push(file);
322322

323-
// First file, we set them directly from the file statistics.
324-
num_rows = file_stats.num_rows;
325-
total_byte_size = file_stats.total_byte_size;
326-
for (index, file_column) in
327-
file_stats.column_statistics.clone().into_iter().enumerate()
328-
{
329-
col_stats_set[index].null_count = file_column.null_count;
330-
col_stats_set[index].max_value = file_column.max_value;
331-
col_stats_set[index].min_value = file_column.min_value;
332-
col_stats_set[index].sum_value = file_column.sum_value;
323+
if collect_stats {
324+
// First file, we set them directly from the file statistics.
325+
num_rows = file_stats.num_rows;
326+
total_byte_size = file_stats.total_byte_size;
327+
for (index, file_column) in
328+
file_stats.column_statistics.clone().into_iter().enumerate()
329+
{
330+
col_stats_set[index].null_count = file_column.null_count;
331+
col_stats_set[index].max_value = file_column.max_value;
332+
col_stats_set[index].min_value = file_column.min_value;
333+
col_stats_set[index].sum_value = file_column.sum_value;
334+
col_stats_set[index].byte_size = file_column.byte_size;
335+
}
333336
}
334337

335338
// If the number of rows exceeds the limit, we can stop processing
@@ -497,3 +500,162 @@ pub fn add_row_stats(
497500
) -> Precision<usize> {
498501
file_num_rows.add(&num_rows)
499502
}
503+
504+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::stream;

    /// Schema with a single nullable `Int64` column named "a".
    fn test_schema() -> SchemaRef {
        let field = Field::new("a", DataType::Int64, true);
        Arc::new(Schema::new(vec![field]))
    }

    /// Wraps exact row/byte counts plus one column's statistics into a
    /// shared `Statistics` value, as `get_statistics_with_limit` expects.
    fn make_file_stats(
        num_rows: usize,
        total_byte_size: usize,
        col_stats: ColumnStatistics,
    ) -> Arc<Statistics> {
        let stats = Statistics {
            num_rows: Precision::Exact(num_rows),
            total_byte_size: Precision::Exact(total_byte_size),
            column_statistics: vec![col_stats],
        };
        Arc::new(stats)
    }

    /// Column statistics with every field except `distinct_count` set to an
    /// exact value, so the tests can observe which fields the aggregation
    /// keeps, merges, or drops.
    fn rich_col_stats(
        null_count: usize,
        min: i64,
        max: i64,
        sum: i64,
        byte_size: usize,
    ) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
            max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
            sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
            byte_size: Precision::Exact(byte_size),
            distinct_count: Precision::Absent,
        }
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
        let input = vec![
            Ok((
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
            )),
            Ok((
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
            )),
        ];

        let (_files, statistics) =
            get_statistics_with_limit(stream::iter(input), test_schema(), None, false)
                .await
                .unwrap();

        // When `collect_stats` is false, per-file statistics must not leak
        // into the aggregate: everything stays `Absent`.
        assert_eq!(statistics.num_rows, Precision::Absent);
        assert_eq!(statistics.total_byte_size, Precision::Absent);
        assert_eq!(statistics.column_statistics.len(), 1);
        let col = &statistics.column_statistics[0];
        assert_eq!(col.null_count, Precision::Absent);
        assert_eq!(col.max_value, Precision::Absent);
        assert_eq!(col.min_value, Precision::Absent);
        assert_eq!(col.sum_value, Precision::Absent);
        assert_eq!(col.byte_size, Precision::Absent);
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
        let input = vec![
            Ok((
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
            )),
            Ok((
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
            )),
        ];

        let (_files, statistics) =
            get_statistics_with_limit(stream::iter(input), test_schema(), None, true)
                .await
                .unwrap();

        // Both files were fully accounted for, so the merged statistics are
        // exact sums / extrema over the two inputs.
        assert_eq!(statistics.num_rows, Precision::Exact(15));
        assert_eq!(statistics.total_byte_size, Precision::Exact(150));
        let col = &statistics.column_statistics[0];
        assert_eq!(col.null_count, Precision::Exact(3));
        assert_eq!(
            col.min_value,
            Precision::Exact(ScalarValue::Int64(Some(1)))
        );
        assert_eq!(
            col.max_value,
            Precision::Exact(ScalarValue::Int64(Some(99)))
        );
        assert_eq!(
            col.sum_value,
            Precision::Exact(ScalarValue::Int64(Some(315)))
        );
        assert_eq!(col.byte_size, Precision::Exact(192));
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
        let input = vec![
            Ok((
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
            )),
            Ok((
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
            )),
            Ok((
                PartitionedFile::new("third.parquet", 20),
                make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
            )),
        ];

        let (files, statistics) =
            get_statistics_with_limit(stream::iter(input), test_schema(), Some(8), true)
                .await
                .unwrap();

        // The row limit of 8 is reached after the second file, so the third
        // file is never consumed and the aggregates are only `Inexact`.
        assert_eq!(files.len(), 2);
        assert_eq!(statistics.num_rows, Precision::Inexact(10));
        assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
        let col = &statistics.column_statistics[0];
        assert_eq!(
            col.min_value,
            Precision::Inexact(ScalarValue::Int64(Some(1)))
        );
        assert_eq!(
            col.max_value,
            Precision::Inexact(ScalarValue::Int64(Some(10)))
        );
        assert_eq!(
            col.sum_value,
            Precision::Inexact(ScalarValue::Int64(Some(55)))
        );
        assert_eq!(col.byte_size, Precision::Inexact(128));
    }
}

0 commit comments

Comments (0)