Tolerate schema evolution in statistics merge (v2) #50
zhuqi-lucas wants to merge 7 commits into branch-52
When parquet files have different column counts (e.g. newer files added columns via schema evolution), statistics merge now pads the shorter side with unknown column statistics instead of returning an error. Previously, `try_merge` rejected files with mismatched column counts with "Cannot merge statistics with different number of columns: N vs M". This broke any query touching listing tables with evolved schemas (not just materialization, but also ORDER BY, GROUP BY, etc.). Columns present in only some files will have Absent statistics after merge, which is correct — the optimizer cannot make assumptions about data ranges for columns that don't exist in all files.
Address Copilot review: when one side has fewer columns, explicitly merge with unknown statistics instead of skipping. This ensures Exact.min(Absent) = Absent regardless of file iteration order, making the merge deterministic.
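The pad-then-merge behaviour described in the two commit messages above can be sketched with simplified stand-in types. `Precision`, `ColumnStats`, and `merge_columns` below are hypothetical reductions written for illustration, not DataFusion's actual definitions (the real types carry more variants and fields). The point is only that padding both sides to the longer length with unknown statistics makes the result independent of argument order.

```rust
// Simplified stand-in for a statistics value: either known exactly or absent.
// Assumption: the real type has more variants; two are enough for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Precision {
    Exact(i64),
    Absent,
}

impl Precision {
    // Minimum of two values; if either side is Absent the result is Absent.
    fn min(&self, other: &Precision) -> Precision {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => Precision::Exact((*a).min(*b)),
            _ => Precision::Absent,
        }
    }

    // Maximum of two values; if either side is Absent the result is Absent.
    fn max(&self, other: &Precision) -> Precision {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => Precision::Exact((*a).max(*b)),
            _ => Precision::Absent,
        }
    }
}

// Hypothetical per-column statistics with only min and max.
#[derive(Debug, Clone, PartialEq)]
struct ColumnStats {
    min_value: Precision,
    max_value: Precision,
}

impl ColumnStats {
    // Statistics for a column about which nothing is known.
    fn new_unknown() -> Self {
        ColumnStats {
            min_value: Precision::Absent,
            max_value: Precision::Absent,
        }
    }
}

// Pad both sides to the longer length with unknown statistics, then merge
// column by column. A column missing on one side is merged with unknown
// statistics rather than skipped, so it ends up Absent in either order.
fn merge_columns(left: &[ColumnStats], right: &[ColumnStats]) -> Vec<ColumnStats> {
    let max_cols = left.len().max(right.len());
    let mut l = left.to_vec();
    let mut r = right.to_vec();
    l.resize(max_cols, ColumnStats::new_unknown());
    r.resize(max_cols, ColumnStats::new_unknown());
    l.iter()
        .zip(r.iter())
        .map(|(a, b)| ColumnStats {
            min_value: a.min_value.min(&b.min_value),
            max_value: a.max_value.max(&b.max_value),
        })
        .collect()
}
```

Merging a two-column file with a three-column file in either order yields three columns, with the third column's statistics Absent.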
try_merge_iter now pads the final result to match schema.fields().len(). This handles cases where all files have fewer columns than the table schema (e.g. schema evolution added columns, or partition columns are included in the schema but not in file statistics). Without this, downstream code expecting table-schema-aligned statistics would see fewer columns than expected.
try_merge_iter should not pad statistics to table schema column count because the schema may include partition columns that are not part of file statistics. Padding would break downstream code that computes partition column indices via `i - column_statistics.len()`. The try_merge level pad-to-max already handles schema evolution (different file column counts). Partition column statistics are the caller's responsibility.
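A rough sketch of the resulting behaviour: the merged statistics are as wide as the widest file, and are never widened to the table schema. Here each file's statistics are reduced to one optional null count per column; `FileStats`, `merge_pair`, and `merge_all` are hypothetical names for this illustration and do not mirror the real `try_merge_iter` signature.

```rust
// Hypothetical simplification: one optional null count per column,
// where None means the statistic is unknown.
type FileStats = Vec<Option<u64>>;

// Pairwise merge: pad to the longer side, then sum null counts.
// A column unknown (or missing) on either side becomes unknown.
fn merge_pair(mut left: FileStats, right: &FileStats) -> FileStats {
    let max_cols = left.len().max(right.len());
    left.resize(max_cols, None);
    for (i, slot) in left.iter_mut().enumerate() {
        *slot = match (*slot, right.get(i).copied().flatten()) {
            (Some(a), Some(b)) => Some(a + b),
            _ => None,
        };
    }
    left
}

// Fold the pairwise merge over all files, starting from the first file so
// that an empty accumulator does not turn every column into unknown.
// The result is NOT padded to the table schema length: partition columns
// are left to the caller.
fn merge_all(files: &[FileStats]) -> FileStats {
    let mut iter = files.iter();
    let first = match iter.next() {
        Some(f) => f.clone(),
        None => return Vec::new(),
    };
    iter.fold(first, |acc, f| merge_pair(acc, f))
}
```

With two files of two columns each and a table schema of three fields (two file columns plus one partition column), `merge_all` still returns two entries.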
Pull request overview
This PR updates DataFusion’s statistics merging to tolerate Parquet schema evolution where different files can have different numbers of columns, preventing planning failures when listing tables span multiple schema versions.
Changes:
- Update `Statistics::try_merge` to pad the shorter side with `ColumnStatistics::new_unknown()` instead of erroring on mismatched column counts.
- Ensure merging is symmetric with respect to file iteration order when columns are missing on one side.
- Add/adjust unit tests covering mismatched sizes and schema-evolution scenarios.
    fn test_try_merge_iter_does_not_pad_for_partition_columns() {
        // Schema includes partition column; files don't have stats for it.
        // try_merge_iter should NOT pad to schema — partition columns are
        // handled by the caller (e.g. Atlas's plan.rs or DF's statistics.rs).
The comment references “Atlas's plan.rs”, but there’s no corresponding code in this repo and it’s unclear to DataFusion contributors. Please replace this with an in-repo reference (e.g. the DataFusion module that pads/handles partition column statistics) or remove the external mention.
    - // handled by the caller (e.g. Atlas's plan.rs or DF's statistics.rs).
    + // expected to be handled by higher-level planning/statistics code.
    // Tolerate schema evolution: when files have different column counts
    // (e.g. newer files added columns), pad the shorter side with unknown
    // statistics so the merge can proceed. Columns present in only some
    // files will have Absent statistics after the merge.
    let max_cols = column_statistics.len().max(other.column_statistics.len());
    column_statistics.resize(max_cols, ColumnStatistics::new_unknown());
Statistics::try_merge no longer errors when column_statistics lengths differ, but the function docs still say it returns an error for mismatched schemas. Please update the doc comment to reflect the new schema-evolution behavior (and what kinds of mismatches, if any, still produce an error).
    fn test_try_merge_iter_pads_to_schema() {
        // Table schema has 3 columns but all files only have 2.
        // Merged stats should be padded to 3 columns.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, true), // new column, no files have it
        ]));
This test’s name/comment says merge_iter “pads to schema”, but the assertions verify the opposite (column_statistics.len() remains 2, not 3). Please rename/update the test (and its comments) so it matches the intended behavior described in the PR (no padding to table schema / partition columns handled elsewhere).
When files have different column counts (schema evolution), `s.column_statistics.len()` varies per file. Using it to determine the boundary between file columns and partition columns causes partition values to be misindexed — a file with more columns would treat the partition column index as a file column index. Fix: compute the file column count from `projected_schema.fields().len() - partition_values.len()`, which is constant across all files.
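To make the indexing problem concrete, here is a minimal sketch comparing the two ways of locating the file/partition boundary. All function names are hypothetical and the arithmetic is reduced to plain `usize` values; the real code works on schemas and statistics structs. `None` means "this schema index is a file column", and `Some(k)` means "this is the k-th partition value".

```rust
// Old approach (buggy under schema evolution): the boundary is this file's
// own statistics length, which varies from file to file.
fn partition_slot_per_file(schema_index: usize, file_stats_len: usize) -> Option<usize> {
    schema_index.checked_sub(file_stats_len)
}

// Fixed approach: the number of file columns is derived from the projected
// schema and the number of partition values, so it is the same for all files.
fn file_column_count(projected_fields: usize, partition_values: usize) -> usize {
    projected_fields.saturating_sub(partition_values)
}

fn partition_slot(
    schema_index: usize,
    projected_fields: usize,
    partition_values: usize,
) -> Option<usize> {
    schema_index.checked_sub(file_column_count(projected_fields, partition_values))
}
```

Take a projected schema of three fields with one partition value, so the partition column sits at schema index 2. For a file whose statistics cover three columns, the per-file boundary classifies index 2 as a file column, while the schema-derived boundary maps it to partition slot 0 regardless of how many columns the file has.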
When a hive-partitioned listing table has stale files in the root directory (not inside any partition path), these files have no partition values. Previously they were included with empty partition_values, causing "Unable to get field named" errors when queries reference partition columns. Now try_into_partitioned_file returns None for files that don't match the partition structure, and the caller skips them.
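The skip-on-mismatch behaviour can be sketched as follows. This is an illustration only: `parse_partition_values` and `keep_partitioned` are hypothetical helpers, not the signature of `try_into_partitioned_file`, and paths are assumed to be given relative to the table root with hive-style `col=value` directory segments.

```rust
// Hypothetical helper: read one `col=value` segment per expected partition
// column from the start of a path relative to the table root. Returns None
// when the path does not follow the partition structure (for example a
// stale file sitting directly in the root directory).
fn parse_partition_values(relative_path: &str, partition_cols: &[&str]) -> Option<Vec<String>> {
    let mut segments = relative_path.split('/');
    let mut values = Vec::with_capacity(partition_cols.len());
    for col in partition_cols {
        let segment = segments.next()?;
        let (name, value) = segment.split_once('=')?;
        if name != *col {
            return None;
        }
        values.push(value.to_string());
    }
    Some(values)
}

// Caller side: files that yield None are skipped instead of being listed
// with empty partition values.
fn keep_partitioned<'a>(
    paths: &[&'a str],
    partition_cols: &[&str],
) -> Vec<(&'a str, Vec<String>)> {
    paths
        .iter()
        .filter_map(|p| parse_partition_values(p, partition_cols).map(|v| (*p, v)))
        .collect()
}
```

With a single partition column `year`, a root-level `stale.parquet` is dropped while `year=2023/a.parquet` is kept with the value `2023`.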
62282b8 to c7ba34f
Summary
Fix `Error during planning: Cannot merge statistics with different number of columns: N vs M` when parquet files have different column counts due to schema evolution.
Changes
- `try_merge`: pads shorter side with `ColumnStatistics::new_unknown()` instead of returning error
Tests (10 passing)
- `test_try_merge_mismatched_size_pads_shorter`
- `test_try_merge_schema_evolution_new_columns` (2 cols + 3 cols)
- `test_try_merge_schema_evolution_reversed_order` (symmetric)
- `test_try_merge_same_column_count_unchanged`
- `test_try_merge_iter_pads_to_schema` (files < schema cols)
- `test_try_merge_iter_does_not_pad_for_partition_columns`