Skip to content

Commit 09b7d33

Browse files
committed
Disable prune_by_limit when output ordering is required
When a parquet scan has output ordering constraints (e.g. ORDER BY + LIMIT), prune_by_limit must not discard partially-matched row groups. A partially- matched group may contain rows that sort before any fully-matched group, so skipping it returns incorrect results. This matches the fix in upstream DataFusion (apache#21190) where prune_by_limit is guarded by preserve_order. Add preserve_order: bool to ParquetOpener. Set to true when output_ordering is non-empty. Guard prune_by_limit with !preserve_order.
1 parent 939ab61 commit 09b7d33

2 files changed

Lines changed: 14 additions & 2 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ pub(super) struct ParquetOpener {
118118
pub max_predicate_cache_size: Option<usize>,
119119
/// Whether to read row groups in reverse order
120120
pub reverse_row_groups: bool,
121+
/// When `true`, row ordering must be preserved — `prune_by_limit` must not
122+
/// discard partially-matched row groups because they may contain rows that
123+
/// sort before fully-matched groups.
124+
pub preserve_order: bool,
121125
}
122126

123127
/// Represents a prepared access plan with optional row selection
@@ -262,6 +266,7 @@ impl FileOpener for ParquetOpener {
262266
let enable_bloom_filter = self.enable_bloom_filter;
263267
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
264268
let limit = self.limit;
269+
let preserve_order = self.preserve_order;
265270

266271
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
267272
.global_counter("num_predicate_creation_errors");
@@ -523,8 +528,11 @@ impl FileOpener for ParquetOpener {
523528
.add_matched(n_remaining_row_groups);
524529
}
525530

526-
// Prune by limit
527-
if let Some(limit) = limit {
531+
// Prune by limit: only safe when order does not matter.
532+
// With preserve_order=true, partially-matched row groups may
533+
// contain rows that sort before fully-matched groups, so
534+
// discarding them would return incorrect results.
535+
if let (Some(limit), false) = (limit, preserve_order) {
528536
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
529537
}
530538

@@ -1076,6 +1084,7 @@ mod test {
10761084
coerce_int96: Option<arrow::datatypes::TimeUnit>,
10771085
max_predicate_cache_size: Option<usize>,
10781086
reverse_row_groups: bool,
1087+
preserve_order: bool,
10791088
}
10801089

10811090
impl ParquetOpenerBuilder {
@@ -1101,6 +1110,7 @@ mod test {
11011110
coerce_int96: None,
11021111
max_predicate_cache_size: None,
11031112
reverse_row_groups: false,
1113+
preserve_order: false,
11041114
}
11051115
}
11061116

@@ -1208,6 +1218,7 @@ mod test {
12081218
encryption_factory: None,
12091219
max_predicate_cache_size: self.max_predicate_cache_size,
12101220
reverse_row_groups: self.reverse_row_groups,
1221+
preserve_order: self.preserve_order,
12111222
}
12121223
}
12131224
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ impl FileSource for ParquetSource {
567567
encryption_factory: self.get_encryption_factory_with_config(),
568568
max_predicate_cache_size: self.max_predicate_cache_size(),
569569
reverse_row_groups: self.reverse_row_groups,
570+
preserve_order: !base_config.output_ordering.is_empty(),
570571
});
571572
Ok(opener)
572573
}

0 commit comments

Comments
 (0)