Skip to content

Commit 3a5b41c

Browse files
alamb, adriangb, claude
authored
[branch-52] fix: validate inter-file ordering in eq_properties() (#20329) (#20509)
- Part of #20287
- Closes #20508 on branch-52

This PR simply
- Backports #20329 from @adriangb to the branch-52 line

I simply cherry-picked 53b0ffb to the branch-52 branch

```shell
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ git cherry-pick 53b0ffb
Auto-merging datafusion/core/tests/physical_optimizer/partition_statistics.rs
Auto-merging datafusion/datasource/src/file_scan_config.rs
[alamb/backport_20329 9286563] fix: validate inter-file ordering in eq_properties() (#20329)
 Author: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
 Date: Sat Feb 14 14:04:01 2026 -0500
 5 files changed, 660 insertions(+), 47 deletions(-)
```

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 016e2ae commit 3a5b41c

5 files changed

Lines changed: 660 additions & 47 deletions

File tree

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ mod test {
864864
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
865865
assert_snapshot!(
866866
plan_string,
867-
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
867+
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
868868
);
869869

870870
let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

datafusion/datasource/src/file_scan_config.rs

Lines changed: 118 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ impl DataSource for FileScanConfig {
665665
let schema = self.file_source.table_schema().table_schema();
666666
let mut eq_properties = EquivalenceProperties::new_with_orderings(
667667
Arc::clone(schema),
668-
self.output_ordering.clone(),
668+
self.validated_output_ordering(),
669669
)
670670
.with_constraints(self.constraints.clone());
671671

@@ -853,6 +853,40 @@ impl DataSource for FileScanConfig {
853853
}
854854

855855
impl FileScanConfig {
856+
/// Returns only the output orderings that are validated against actual
857+
/// file group statistics.
858+
///
859+
/// For example, individual files may be ordered by `col1 ASC`,
860+
/// but if we have files with these min/max statistics in a single partition / file group:
861+
///
862+
/// - file1: min(col1) = 10, max(col1) = 20
863+
/// - file2: min(col1) = 5, max(col1) = 15
864+
///
865+
/// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
866+
/// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
867+
///
868+
/// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
869+
///
870+
/// - file1: min(col1) = 20, max(col1) = 30
871+
/// - file2: min(col1) = 10, max(col1) = 15
872+
///
873+
/// On the other hand if we had:
874+
///
875+
/// - file1: min(col1) = 5, max(col1) = 15
876+
/// - file2: min(col1) = 16, max(col1) = 25
877+
///
878+
/// Then we know that reading file1 followed by file2 will produce ordered output,
879+
/// so `col1 ASC` would be retained.
880+
///
881+
/// Note that we are checking for ordering *within* *each* file group / partition,
882+
/// files in different partitions are read independently and do not affect each other's ordering.
883+
/// Merging of the multiple partition streams into a single ordered stream is handled
884+
/// upstream e.g. by `SortPreservingMergeExec`.
885+
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
886+
let schema = self.file_source.table_schema().table_schema();
887+
validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
888+
}
889+
856890
/// Get the file schema (schema of the files without partition columns)
857891
pub fn file_schema(&self) -> &SchemaRef {
858892
self.file_source.table_schema().file_schema()
@@ -1202,6 +1236,51 @@ fn ordered_column_indices_from_projection(
12021236
.collect::<Option<Vec<usize>>>()
12031237
}
12041238

1239+
/// Check whether a given ordering is valid for all file groups by verifying
1240+
/// that files within each group are sorted according to their min/max statistics.
1241+
///
1242+
/// For single-file (or empty) groups, the ordering is trivially valid.
1243+
/// For multi-file groups, we check that the min/max statistics for the sort
1244+
/// columns are in order and non-overlapping (or touching at boundaries).
1245+
///
1246+
/// `projection` maps projected column indices back to table-schema indices
1247+
/// when validating after projection; pass `None` when validating at
1248+
/// table-schema level.
1249+
fn is_ordering_valid_for_file_groups(
1250+
file_groups: &[FileGroup],
1251+
ordering: &LexOrdering,
1252+
schema: &SchemaRef,
1253+
projection: Option<&[usize]>,
1254+
) -> bool {
1255+
file_groups.iter().all(|group| {
1256+
if group.len() <= 1 {
1257+
return true; // single-file groups are trivially sorted
1258+
}
1259+
match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1260+
{
1261+
Ok(stats) => stats.is_sorted(),
1262+
Err(_) => false, // can't prove sorted → reject
1263+
}
1264+
})
1265+
}
1266+
1267+
/// Filters orderings to retain only those valid for all file groups,
1268+
/// verified via min/max statistics.
1269+
fn validate_orderings(
1270+
orderings: &[LexOrdering],
1271+
schema: &SchemaRef,
1272+
file_groups: &[FileGroup],
1273+
projection: Option<&[usize]>,
1274+
) -> Vec<LexOrdering> {
1275+
orderings
1276+
.iter()
1277+
.filter(|ordering| {
1278+
is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1279+
})
1280+
.cloned()
1281+
.collect()
1282+
}
1283+
12051284
/// The various listing tables do not attempt to read all files
12061285
/// concurrently, instead they will read files in sequence within a
12071286
/// partition. This is an important property as it allows plans to
@@ -1268,52 +1347,47 @@ fn get_projected_output_ordering(
12681347
let projected_orderings =
12691348
project_orderings(&base_config.output_ordering, projected_schema);
12701349

1271-
let mut all_orderings = vec![];
1272-
for new_ordering in projected_orderings {
1273-
// Check if any file groups are not sorted
1274-
if base_config.file_groups.iter().any(|group| {
1275-
if group.len() <= 1 {
1276-
// File groups with <= 1 files are always sorted
1277-
return false;
1278-
}
1279-
1280-
let Some(indices) = base_config
1281-
.file_source
1282-
.projection()
1283-
.as_ref()
1284-
.map(|p| ordered_column_indices_from_projection(p))
1285-
else {
1286-
// Can't determine if ordered without a simple projection
1287-
return true;
1288-
};
1289-
1290-
let statistics = match MinMaxStatistics::new_from_files(
1291-
&new_ordering,
1350+
let indices = base_config
1351+
.file_source
1352+
.projection()
1353+
.as_ref()
1354+
.map(|p| ordered_column_indices_from_projection(p));
1355+
1356+
match indices {
1357+
Some(Some(indices)) => {
1358+
// Simple column projection — validate with statistics
1359+
validate_orderings(
1360+
&projected_orderings,
12921361
projected_schema,
1293-
indices.as_deref(),
1294-
group.iter(),
1295-
) {
1296-
Ok(statistics) => statistics,
1297-
Err(e) => {
1298-
log::trace!("Error fetching statistics for file group: {e}");
1299-
// we can't prove that it's ordered, so we have to reject it
1300-
return true;
1301-
}
1302-
};
1303-
1304-
!statistics.is_sorted()
1305-
}) {
1306-
debug!(
1307-
"Skipping specified output ordering {:?}. \
1308-
Some file groups couldn't be determined to be sorted: {:?}",
1309-
base_config.output_ordering[0], base_config.file_groups
1310-
);
1311-
continue;
1362+
&base_config.file_groups,
1363+
Some(indices.as_slice()),
1364+
)
1365+
}
1366+
None => {
1367+
// No projection — validate with statistics (no remapping needed)
1368+
validate_orderings(
1369+
&projected_orderings,
1370+
projected_schema,
1371+
&base_config.file_groups,
1372+
None,
1373+
)
1374+
}
1375+
Some(None) => {
1376+
// Complex projection (expressions, not simple columns) — can't
1377+
// determine column indices for statistics. Still valid if all
1378+
// file groups have at most one file.
1379+
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
1380+
projected_orderings
1381+
} else {
1382+
debug!(
1383+
"Skipping specified output orderings. \
1384+
Some file groups couldn't be determined to be sorted: {:?}",
1385+
base_config.file_groups
1386+
);
1387+
vec![]
1388+
}
13121389
}
1313-
1314-
all_orderings.push(new_ordering);
13151390
}
1316-
all_orderings
13171391
}
13181392

13191393
/// Convert type to a type suitable for use as a `ListingTable`

datafusion/datasource/src/statistics.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,12 @@ impl MinMaxStatistics {
266266
}
267267

268268
/// Check if the min/max statistics are in order and non-overlapping
269+
/// (or touching at boundaries)
269270
pub fn is_sorted(&self) -> bool {
270271
self.max_by_sort_order
271272
.iter()
272273
.zip(self.min_by_sort_order.iter().skip(1))
273-
.all(|(max, next_min)| max < next_min)
274+
.all(|(max, next_min)| max <= next_min)
274275
}
275276
}
276277

datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,4 @@ logical_plan
274274
02)--TableScan: test_table projection=[constant_col]
275275
physical_plan
276276
01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST]
277-
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], file_type=parquet
277+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], output_ordering=[constant_col@0 ASC NULLS LAST], file_type=parquet

0 commit comments

Comments
 (0)