177 changes: 166 additions & 11 deletions datafusion/datasource/src/statistics.rs
@@ -320,16 +320,19 @@ pub async fn get_statistics_with_limit(
file.statistics = Some(Arc::clone(&file_stats));
result_files.push(file);

// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in
file_stats.column_statistics.clone().into_iter().enumerate()
{
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
if collect_stats {
Contributor
Thanks for fixing the returned summary here. I think this introduces a separate issue in the limit path though.

get_statistics_with_limit still relies on num_rows to decide whether it can stop early. With collect_stats=false, num_rows now stays Absent, so conservative_num_rows falls back to usize::MIN and we keep scanning the stream instead of stopping early.

Before this change, a known first-file row count could still let us stop after the first file. With this patch, even a query with limit=0 can keep reading later files.

Could we preserve a separate row-count accumulator, or at least keep the first-file row count around for the limit check, while still returning a bare Statistics value when collect_stats is false? I think we should also add a regression test for collect_stats=false with a limit so this behavior is locked in.
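
Rough sketch of what I mean (the names row_count_for_limit and should_stop_early are just illustrative, not existing internals; the fallback mirrors the conservative check the current code already does):

```rust
use datafusion_common::stats::Precision;

/// Illustrative sketch: decide whether the stream can stop early based on a
/// row count that is tracked separately from the returned summary.
fn should_stop_early(row_count_for_limit: Precision<usize>, fetch: Option<usize>) -> bool {
    // Same conservative fallback as today: an unknown row count becomes
    // usize::MIN, so we never stop early on a guess.
    let conservative_rows = match row_count_for_limit {
        Precision::Exact(n) => n,
        _ => usize::MIN,
    };
    conservative_rows > fetch.unwrap_or(usize::MAX)
}
```

The accumulator behind row_count_for_limit would be fed from every file's num_rows regardless of collect_stats, while the Statistics handed back to the caller stays Absent when collect_stats is false. A regression test for collect_stats=false with a limit would then lock the behavior in.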

Contributor

Small refactor suggestion. It may be worth extracting the "seed stats from first file" logic into a helper that can optionally populate the summary accumulator.

This block and the later merge loop now both need to stay in sync on fields like byte_size and the sum_value casting. Pulling that into one place would make future stats-field additions a bit less likely to drift.
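
Roughly what I have in mind (untested sketch; the helper name and signature are just a suggestion, and the body reuses the same field assignments this block already does):

```rust
use datafusion_common::{stats::Precision, ColumnStatistics, Statistics};

/// Illustrative sketch: seed the summary accumulators from the first file's
/// statistics, but only when the caller asked for a summary.
fn seed_summary_from_first_file(
    file_stats: &Statistics,
    collect_stats: bool,
    num_rows: &mut Precision<usize>,
    total_byte_size: &mut Precision<usize>,
    col_stats_set: &mut [ColumnStatistics],
) {
    if !collect_stats {
        return;
    }
    *num_rows = file_stats.num_rows;
    *total_byte_size = file_stats.total_byte_size;
    for (index, file_column) in
        file_stats.column_statistics.clone().into_iter().enumerate()
    {
        col_stats_set[index].null_count = file_column.null_count;
        col_stats_set[index].max_value = file_column.max_value;
        col_stats_set[index].min_value = file_column.min_value;
        col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
        col_stats_set[index].byte_size = file_column.byte_size;
    }
}
```

The per-file merge loop could get a similar helper, so a future statistics field only needs to be added in one or two obvious places.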

// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in
file_stats.column_statistics.clone().into_iter().enumerate()
{
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
col_stats_set[index].byte_size = file_column.byte_size;
}
}

// If the number of rows exceeds the limit, we can stop processing
@@ -520,6 +523,39 @@ mod tests {
}
}

fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]))
}

fn make_file_stats(
num_rows: usize,
total_byte_size: usize,
col_stats: ColumnStatistics,
) -> Arc<Statistics> {
Arc::new(Statistics {
num_rows: Precision::Exact(num_rows),
total_byte_size: Precision::Exact(total_byte_size),
column_statistics: vec![col_stats],
})
}

fn rich_col_stats(
null_count: usize,
min: i64,
max: i64,
sum: i64,
byte_size: usize,
) -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Exact(null_count),
max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
distinct_count: Precision::Absent,
sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
byte_size: Precision::Exact(byte_size),
}
}

#[tokio::test]
#[expect(deprecated)]
async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
@@ -533,7 +569,7 @@ mod tests {
))]);

let (_group, stats) =
get_statistics_with_limit(files, schema, None, false).await?;
get_statistics_with_limit(files, schema, None, true).await?;

assert_eq!(
stats.column_statistics[0].sum_value,
@@ -571,4 +607,123 @@

Ok(())
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
)),
]);

let (_files, statistics) =
get_statistics_with_limit(all_files, test_schema(), None, false)
.await
.unwrap();

assert_eq!(statistics.num_rows, Precision::Absent);
assert_eq!(statistics.total_byte_size, Precision::Absent);
assert_eq!(statistics.column_statistics.len(), 1);
assert_eq!(
statistics.column_statistics[0].null_count,
Precision::Absent
);
assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
)),
]);

let (_files, statistics) =
get_statistics_with_limit(all_files, test_schema(), None, true)
.await
.unwrap();

assert_eq!(statistics.num_rows, Precision::Exact(15));
assert_eq!(statistics.total_byte_size, Precision::Exact(150));
assert_eq!(
statistics.column_statistics[0].null_count,
Precision::Exact(3)
);
assert_eq!(
statistics.column_statistics[0].min_value,
Precision::Exact(ScalarValue::Int64(Some(1)))
);
assert_eq!(
statistics.column_statistics[0].max_value,
Precision::Exact(ScalarValue::Int64(Some(99)))
);
assert_eq!(
statistics.column_statistics[0].sum_value,
Precision::Exact(ScalarValue::Int64(Some(315)))
);
assert_eq!(
statistics.column_statistics[0].byte_size,
Precision::Exact(192)
);
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
)),
Ok((
PartitionedFile::new("third.parquet", 20),
make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
)),
]);

let (files, statistics) =
get_statistics_with_limit(all_files, test_schema(), Some(8), true)
.await
.unwrap();

assert_eq!(files.len(), 2);
assert_eq!(statistics.num_rows, Precision::Inexact(10));
assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
assert_eq!(
statistics.column_statistics[0].min_value,
Precision::Inexact(ScalarValue::Int64(Some(1)))
);
assert_eq!(
statistics.column_statistics[0].max_value,
Precision::Inexact(ScalarValue::Int64(Some(10)))
);
assert_eq!(
statistics.column_statistics[0].sum_value,
Precision::Inexact(ScalarValue::Int64(Some(55)))
);
assert_eq!(
statistics.column_statistics[0].byte_size,
Precision::Inexact(128)
);
}
}