Skip to content

Commit fa4144d

Browse files
committed
Disable prune_by_limit when output ordering is required
When a parquet scan has output ordering constraints (e.g. ORDER BY + LIMIT), prune_by_limit must not discard partially-matched row groups. A partially-matched group may contain rows that sort before any fully-matched group, so skipping it returns incorrect results. This matches the fix in upstream DataFusion (apache#21190) where prune_by_limit is guarded by preserve_order. Add preserve_order: bool to ParquetOpener. Set to true when output_ordering is non-empty. Guard prune_by_limit with !preserve_order.
1 parent 939ab61 commit fa4144d

2 files changed

Lines changed: 144 additions & 2 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ pub(super) struct ParquetOpener {
118118
pub max_predicate_cache_size: Option<usize>,
119119
/// Whether to read row groups in reverse order
120120
pub reverse_row_groups: bool,
121+
/// When `true`, row ordering must be preserved — `prune_by_limit` must not
122+
/// discard partially-matched row groups because they may contain rows that
123+
/// sort before fully-matched groups.
124+
pub preserve_order: bool,
121125
}
122126

123127
/// Represents a prepared access plan with optional row selection
@@ -262,6 +266,7 @@ impl FileOpener for ParquetOpener {
262266
let enable_bloom_filter = self.enable_bloom_filter;
263267
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
264268
let limit = self.limit;
269+
let preserve_order = self.preserve_order;
265270

266271
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
267272
.global_counter("num_predicate_creation_errors");
@@ -523,8 +528,11 @@ impl FileOpener for ParquetOpener {
523528
.add_matched(n_remaining_row_groups);
524529
}
525530

526-
// Prune by limit
527-
if let Some(limit) = limit {
531+
// Prune by limit: only safe when order does not matter.
532+
// With preserve_order=true, partially-matched row groups may
533+
// contain rows that sort before fully-matched groups, so
534+
// discarding them would return incorrect results.
535+
if let (Some(limit), false) = (limit, preserve_order) {
528536
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
529537
}
530538

@@ -1076,6 +1084,7 @@ mod test {
10761084
coerce_int96: Option<arrow::datatypes::TimeUnit>,
10771085
max_predicate_cache_size: Option<usize>,
10781086
reverse_row_groups: bool,
1087+
preserve_order: bool,
10791088
}
10801089

10811090
impl ParquetOpenerBuilder {
@@ -1101,6 +1110,7 @@ mod test {
11011110
coerce_int96: None,
11021111
max_predicate_cache_size: None,
11031112
reverse_row_groups: false,
1113+
preserve_order: false,
11041114
}
11051115
}
11061116

@@ -1158,6 +1168,18 @@ mod test {
11581168
self
11591169
}
11601170

1171+
/// Set preserve_order flag. When true, prune_by_limit is disabled.
1172+
fn with_preserve_order(mut self, enable: bool) -> Self {
1173+
self.preserve_order = enable;
1174+
self
1175+
}
1176+
1177+
/// Set the limit.
1178+
fn with_limit(mut self, limit: Option<usize>) -> Self {
1179+
self.limit = limit;
1180+
self
1181+
}
1182+
11611183
/// Build the ParquetOpener instance.
11621184
///
11631185
/// # Panics
@@ -1208,6 +1230,7 @@ mod test {
12081230
encryption_factory: None,
12091231
max_predicate_cache_size: self.max_predicate_cache_size,
12101232
reverse_row_groups: self.reverse_row_groups,
1233+
preserve_order: self.preserve_order,
12111234
}
12121235
}
12131236
}
@@ -2196,5 +2219,123 @@ mod test {
21962219
"Output field should be nullable"
21972220
);
21982221
}
2222+
2223+
/// Regression test: `prune_by_limit` must be disabled when
2224+
/// `preserve_order = true` to avoid skipping partially-matched
2225+
/// row groups that contain rows sorting before fully-matched groups.
2226+
///
2227+
/// Layout (3 rows/RG, sorted `[a ASC]`):
2228+
/// RG0: a=[1,2,3] — partially matched for `a > 2`
2229+
/// RG1: a=[4,5,6] — fully matched for `a > 2`
2230+
///
2231+
/// `WHERE a > 2 LIMIT 1` should return `a=3` from RG0.
2232+
/// Without preserve_order, prune_by_limit discards RG0 and
2233+
/// returns `a=4` from RG1.
2234+
#[tokio::test]
2235+
async fn test_preserve_order_prevents_limit_pruning() {
2236+
use arrow::array::Int32Array;
2237+
use datafusion_common::ScalarValue;
2238+
use datafusion_expr::Operator;
2239+
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
2240+
use object_store::memory::InMemory;
2241+
use parquet::arrow::ArrowWriter;
2242+
use parquet::file::properties::WriterProperties;
2243+
2244+
let schema =
2245+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2246+
2247+
// Write parquet with 2 RGs of 3 rows each into memory buffer
2248+
let mut buf = Vec::new();
2249+
let props = WriterProperties::builder()
2250+
.set_max_row_group_size(3)
2251+
.build();
2252+
{
2253+
let mut writer =
2254+
ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
2255+
// RG0: partially matched for a > 2
2256+
writer
2257+
.write(
2258+
&RecordBatch::try_new(
2259+
schema.clone(),
2260+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
2261+
)
2262+
.unwrap(),
2263+
)
2264+
.unwrap();
2265+
// RG1: fully matched for a > 2
2266+
writer
2267+
.write(
2268+
&RecordBatch::try_new(
2269+
schema.clone(),
2270+
vec![Arc::new(Int32Array::from(vec![4, 5, 6]))],
2271+
)
2272+
.unwrap(),
2273+
)
2274+
.unwrap();
2275+
writer.close().unwrap();
2276+
}
2277+
2278+
let file_size = buf.len() as u64;
2279+
let store = Arc::new(InMemory::new());
2280+
let path = object_store::path::Path::from("test.parquet");
2281+
store.put(&path, buf.into()).await.unwrap();
2282+
2283+
// Predicate: a > 2
2284+
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
2285+
Arc::new(Column::new("a", 0)),
2286+
Operator::Gt,
2287+
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
2288+
));
2289+
2290+
// Run query with preserve_order=false: prune_by_limit may skip RG0
2291+
let opener_unordered = ParquetOpenerBuilder::new()
2292+
.with_store(store.clone())
2293+
.with_schema(schema.clone())
2294+
.with_limit(Some(1))
2295+
.with_pushdown_filters(true)
2296+
.with_predicate(predicate.clone())
2297+
.with_row_group_stats_pruning(true)
2298+
.with_preserve_order(false)
2299+
.build();
2300+
2301+
let file = PartitionedFile::new(path.clone(), file_size);
2302+
let mut stream = opener_unordered.open(file).unwrap().await.unwrap();
2303+
let batch = stream.next().await.unwrap().unwrap();
2304+
let col = batch
2305+
.column(0)
2306+
.as_any()
2307+
.downcast_ref::<Int32Array>()
2308+
.unwrap();
2309+
assert_eq!(
2310+
col.value(0),
2311+
4,
2312+
"Without preserve_order, prune_by_limit skips partially-matched RG0"
2313+
);
2314+
2315+
// Run query with preserve_order=true: RG0 must be kept
2316+
let opener_ordered = ParquetOpenerBuilder::new()
2317+
.with_store(store)
2318+
.with_schema(schema)
2319+
.with_limit(Some(1))
2320+
.with_pushdown_filters(true)
2321+
.with_predicate(predicate)
2322+
.with_row_group_stats_pruning(true)
2323+
.with_preserve_order(true)
2324+
.build();
2325+
2326+
let file = PartitionedFile::new(path, file_size);
2327+
let mut stream = opener_ordered.open(file).unwrap().await.unwrap();
2328+
let batch = stream.next().await.unwrap().unwrap();
2329+
let col = batch
2330+
.column(0)
2331+
.as_any()
2332+
.downcast_ref::<Int32Array>()
2333+
.unwrap();
2334+
assert_eq!(
2335+
col.value(0),
2336+
3,
2337+
"With preserve_order, partially-matched RG0 is scanned first"
2338+
);
2339+
}
21992340
}
22002341
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ impl FileSource for ParquetSource {
567567
encryption_factory: self.get_encryption_factory_with_config(),
568568
max_predicate_cache_size: self.max_predicate_cache_size(),
569569
reverse_row_groups: self.reverse_row_groups,
570+
preserve_order: !base_config.output_ordering.is_empty(),
570571
});
571572
Ok(opener)
572573
}

0 commit comments

Comments
 (0)