Skip to content

Commit 09ae8b0

Browse files
committed
fix: compose reorder and reverse as sequential steps instead of mutually exclusive
Previously, `reorder_by_statistics` and `reverse_row_groups` were mutually exclusive (selected by an `if`/`else if`), so a DESC query on unsorted data could get only one of the two optimizations. Now they compose as sequential steps: reorder always sorts row groups (RGs) by their min statistic in ASC order, and reverse then flips that order for DESC queries. This ensures correct results for both sorted and unsorted inputs without regression. This commit also removes `prepare_with_optimizer` in favor of calling `optimize()` directly on each optimizer, and simplifies `reorder_by_statistics` to always use min ASC (direction is handled by reverse).
1 parent d128035 commit 09ae8b0

2 files changed

Lines changed: 71 additions & 76 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 32 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -353,20 +353,6 @@ impl ParquetAccessPlan {
353353
let row_selection = self.into_overall_row_selection(row_group_meta_data)?;
354354
PreparedAccessPlan::new(row_group_indexes, row_selection)
355355
}
356-
357-
/// Like [`prepare`](Self::prepare), but also applies an
358-
/// `AccessPlanOptimizer` to reorder/reverse row groups after
359-
/// preparing the plan.
360-
pub(crate) fn prepare_with_optimizer(
361-
self,
362-
row_group_meta_data: &[RowGroupMetaData],
363-
file_metadata: &ParquetMetaData,
364-
arrow_schema: &Schema,
365-
optimizer: &dyn crate::access_plan_optimizer::AccessPlanOptimizer,
366-
) -> Result<PreparedAccessPlan> {
367-
let plan = self.prepare(row_group_meta_data)?;
368-
optimizer.optimize(plan, file_metadata, arrow_schema)
369-
}
370356
}
371357

372358
/// Represents a prepared, fully resolved [`ParquetAccessPlan`]
@@ -436,8 +422,6 @@ impl PreparedAccessPlan {
436422
}
437423
};
438424

439-
let descending = first_sort_expr.options.descending;
440-
441425
// Build statistics converter for this column
442426
let converter = match StatisticsConverter::try_new(
443427
column.name(),
@@ -451,40 +435,31 @@ impl PreparedAccessPlan {
451435
}
452436
};
453437

454-
// Get the relevant statistics for the selected row groups.
455-
// For ASC sort: use min values — we want the RG with the smallest min
456-
// to come first (best candidate for "smallest values").
457-
// For DESC sort: use max values — we want the RG with the largest max
458-
// to come first (best candidate for "largest values"). Using min for
459-
// DESC can pick a worse first RG when ranges overlap (e.g., RG0 50-60
460-
// vs RG1 40-100 — RG1 has larger values but smaller min).
438+
// Always sort by min values in ASC order to align row groups with
439+
// the file's declared output ordering. Direction (DESC) is handled
440+
// separately by ReverseRowGroups which is applied AFTER reorder.
441+
//
442+
// This composable design avoids the problem where reorder(DESC)
443+
// followed by reverse would double-flip the order, and ensures
444+
// that for already-sorted data, reorder is a no-op and reverse
445+
// gives the correct DESC order (including placing small tail RGs first).
461446
let rg_metadata: Vec<&RowGroupMetaData> = self
462447
.row_group_indexes
463448
.iter()
464449
.map(|&idx| file_metadata.row_group(idx))
465450
.collect();
466451

467-
let stat_values = if descending {
468-
match converter.row_group_maxes(rg_metadata.iter().copied()) {
469-
Ok(vals) => vals,
470-
Err(e) => {
471-
debug!("Skipping RG reorder: cannot get max values: {e}");
472-
return Ok(self);
473-
}
474-
}
475-
} else {
476-
match converter.row_group_mins(rg_metadata.iter().copied()) {
477-
Ok(vals) => vals,
478-
Err(e) => {
479-
debug!("Skipping RG reorder: cannot get min values: {e}");
480-
return Ok(self);
481-
}
452+
let stat_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
453+
Ok(vals) => vals,
454+
Err(e) => {
455+
debug!("Skipping RG reorder: cannot get min values: {e}");
456+
return Ok(self);
482457
}
483458
};
484459

485-
// Sort indices by statistic values (min for ASC, max for DESC)
460+
// Always sort ASC by min values — direction is handled by reverse
486461
let sort_options = arrow::compute::SortOptions {
487-
descending,
462+
descending: false,
488463
nulls_first: first_sort_expr.options.nulls_first,
489464
};
490465
let sorted_indices =
@@ -836,7 +811,11 @@ mod test {
836811
}
837812

838813
#[test]
839-
fn test_reorder_by_statistics_desc() {
814+
fn test_reorder_by_statistics_desc_sorts_asc() {
815+
// reorder_by_statistics always sorts by min ASC regardless of sort
816+
// direction. DESC is handled separately by ReverseRowGroups which
817+
// is applied after reorder in the optimizer pipeline.
818+
//
840819
// RGs: [50-99, 200-299, 1-30]
841820
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
842821
let schema = make_arrow_schema();
@@ -847,8 +826,9 @@ mod test {
847826
.reorder_by_statistics(&sort_order, &metadata, &schema)
848827
.unwrap();
849828

850-
// DESC: largest max first: RG1(max=299), RG0(max=99), RG2(max=30)
851-
assert_eq!(plan.row_group_indexes, vec![1, 0, 2]);
829+
// Always ASC by min: RG2(min=1), RG0(min=50), RG1(min=200)
830+
// Reverse is applied separately for DESC queries.
831+
assert_eq!(plan.row_group_indexes, vec![2, 0, 1]);
852832
}
853833

854834
#[test]
@@ -949,17 +929,14 @@ mod test {
949929
}
950930

951931
#[test]
952-
fn test_reorder_by_statistics_desc_uses_max_for_overlapping_rgs() {
953-
// Overlapping ranges where min DESC would pick worse RG than max DESC:
954-
// RG0: 50-60 (small range, moderate max)
955-
// RG1: 40-100 (wide range, high max but lower min)
956-
// RG2: 20-30 (low max)
957-
//
958-
// For ORDER BY DESC LIMIT N:
959-
// Using min DESC: [RG0(50), RG1(40), RG2(20)] → reads RG0 first (max=60 only)
960-
// Using max DESC: [RG1(100), RG0(60), RG2(30)] → reads RG1 first (max=100)
932+
fn test_reorder_by_statistics_overlapping_rgs_sorts_asc() {
933+
// Overlapping ranges — reorder always uses min ASC:
934+
// RG0: 50-60
935+
// RG1: 40-100 (lower min, wider range)
936+
// RG2: 20-30 (lowest min)
961937
//
962-
// RG1 is the better first choice for DESC because it contains the largest values.
938+
// Sorted by min ASC: [RG2(20), RG1(40), RG0(50)]
939+
// For DESC queries, ReverseRowGroups is applied after to flip order.
963940
let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]);
964941
let schema = make_arrow_schema();
965942
let sort_order = make_sort_order_desc();
@@ -969,8 +946,8 @@ mod test {
969946
.reorder_by_statistics(&sort_order, &metadata, &schema)
970947
.unwrap();
971948

972-
// Expected: RG1 (max=100) first, then RG0 (max=60), then RG2 (max=30)
973-
assert_eq!(plan.row_group_indexes, vec![1, 0, 2]);
949+
// Always ASC by min: RG2(min=20), RG1(min=40), RG0(min=50)
950+
assert_eq!(plan.row_group_indexes, vec![2, 1, 0]);
974951
}
975952

976953
#[test]

datafusion/datasource-parquet/src/opener.rs

Lines changed: 39 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -1130,35 +1130,53 @@ impl RowGroupsPrunedParquetOpen {
11301130
);
11311131
}
11321132

1133-
// Build the access plan optimizer from sort pushdown hints.
1134-
// ReorderByStatistics is preferred (handles both ASC and DESC via
1135-
// min/max stats). ReverseRowGroups is a fallback when no statistics
1136-
// are available on the sort column.
1137-
let optimizer: Option<
1133+
// Row group ordering optimization (two composable steps):
1134+
//
1135+
// 1. reorder_by_statistics: sort RGs by min values (ASC) to align
1136+
// with the file's declared output ordering. This fixes out-of-order
1137+
// RGs (e.g., from append-heavy workloads) without changing direction.
1138+
// Skipped gracefully when statistics are unavailable.
1139+
//
1140+
// 2. reverse: flip the order for DESC queries. Applied AFTER reorder
1141+
// so the reversed order is correct whether or not reorder changed
1142+
// anything. Also handles row_selection remapping.
1143+
//
1144+
// For sorted data: reorder is a no-op, reverse gives perfect DESC.
1145+
// For unsorted data: reorder fixes the order, reverse flips for DESC.
1146+
let reorder_optimizer: Option<
11381147
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1139-
> = if let Some(sort_order) = &prepared.sort_order_for_reorder {
1140-
Some(Box::new(
1141-
crate::access_plan_optimizer::ReorderByStatistics::new(
1142-
sort_order.clone(),
1143-
),
1144-
))
1145-
} else if prepared.reverse_row_groups {
1148+
> = prepared.sort_order_for_reorder.as_ref().map(|sort_order| {
1149+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
1150+
sort_order.clone(),
1151+
)) as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
1152+
});
1153+
1154+
let reverse_optimizer: Option<
1155+
Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
1156+
> = if prepared.reverse_row_groups {
11461157
Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
11471158
} else {
11481159
None
11491160
};
11501161

1151-
// Prepare the access plan and apply row group optimizer if configured.
1152-
let prepared_plan = if let Some(opt) = &optimizer {
1153-
access_plan.prepare_with_optimizer(
1154-
rg_metadata,
1162+
// Prepare the access plan and apply optimizers in order:
1163+
// 1. reorder (fix out-of-order RGs to match declared ordering)
1164+
// 2. reverse (flip for DESC queries)
1165+
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
1166+
if let Some(opt) = &reorder_optimizer {
1167+
prepared_plan = opt.optimize(
1168+
prepared_plan,
11551169
file_metadata.as_ref(),
11561170
&prepared.physical_file_schema,
1157-
opt.as_ref(),
1158-
)?
1159-
} else {
1160-
access_plan.prepare(rg_metadata)?
1161-
};
1171+
)?;
1172+
}
1173+
if let Some(opt) = &reverse_optimizer {
1174+
prepared_plan = opt.optimize(
1175+
prepared_plan,
1176+
file_metadata.as_ref(),
1177+
&prepared.physical_file_schema,
1178+
)?;
1179+
}
11621180

11631181
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
11641182
let read_plan = build_projection_read_plan(

0 commit comments

Comments
 (0)