Skip to content

Commit 88bdaac

Browse files
committed
feat: reorder files in shared work queue by statistics for TopK
Add FileSource::reorder_files() trait method and ParquetSource implementation that sorts files by column statistics before placing them in the shared work queue. For DESC queries, files with the highest min value come first; for ASC, lowest max first. This ensures the first file read by any partition is the globally optimal one for TopK threshold convergence, complementing the intra-file RG reorder.
1 parent be7d7b1 commit 88bdaac

3 files changed

Lines changed: 103 additions & 1 deletion

File tree

datafusion/datasource-parquet/src/source.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,24 @@ impl ParquetSource {
485485
pub(crate) fn reverse_row_groups(&self) -> bool {
486486
self.reverse_row_groups
487487
}
488+
489+
/// Extract the sort key from a file's statistics for reordering.
490+
///
491+
/// For DESC sorts, returns the column's min_value (we want highest min first).
492+
/// For ASC sorts, returns the column's max_value (we want lowest max first).
493+
fn sort_key_for_file(
494+
file: &datafusion_datasource::PartitionedFile,
495+
col_idx: usize,
496+
descending: bool,
497+
) -> Option<datafusion_common::ScalarValue> {
498+
let stats = file.statistics.as_ref()?;
499+
let col_stats = stats.column_statistics.get(col_idx)?;
500+
if descending {
501+
col_stats.min_value.get_value().cloned()
502+
} else {
503+
col_stats.max_value.get_value().cloned()
504+
}
505+
}
488506
}
489507

490508
/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
@@ -587,6 +605,67 @@ impl FileSource for ParquetSource {
587605
}))
588606
}
589607

608+
fn reorder_files(
609+
&self,
610+
mut files: Vec<datafusion_datasource::PartitionedFile>,
611+
) -> Vec<datafusion_datasource::PartitionedFile> {
612+
let sort_order = match &self.sort_order_for_reorder {
613+
Some(order) if !order.is_empty() => order,
614+
_ => return files,
615+
};
616+
617+
// We only handle single-column sort for now
618+
let first_expr = sort_order.first();
619+
620+
// The sort expression must be a Column so we can look up statistics
621+
let col: &datafusion_physical_expr::expressions::Column =
622+
match first_expr.expr.downcast_ref() {
623+
Some(col) => col,
624+
None => return files,
625+
};
626+
627+
let col_name = col.name();
628+
let descending = self.reverse_row_groups;
629+
630+
// Find the column index in the table schema
631+
let table_schema = self.table_schema.table_schema();
632+
let col_idx = match table_schema.index_of(col_name) {
633+
Ok(idx) => idx,
634+
Err(_) => return files,
635+
};
636+
637+
// Stable sort: files with usable stats first, ordered by the
638+
// relevant bound; files without stats go to the end.
639+
files.sort_by(|a, b| {
640+
let key_a = Self::sort_key_for_file(a, col_idx, descending);
641+
let key_b = Self::sort_key_for_file(b, col_idx, descending);
642+
match (key_a, key_b) {
643+
(Some(va), Some(vb)) => {
644+
if descending {
645+
// DESC: highest min first (reverse order)
646+
vb.partial_cmp(&va).unwrap_or(std::cmp::Ordering::Equal)
647+
} else {
648+
// ASC: lowest max first
649+
va.partial_cmp(&vb).unwrap_or(std::cmp::Ordering::Equal)
650+
}
651+
}
652+
// Files without stats go to the end
653+
(Some(_), None) => std::cmp::Ordering::Less,
654+
(None, Some(_)) => std::cmp::Ordering::Greater,
655+
(None, None) => std::cmp::Ordering::Equal,
656+
}
657+
});
658+
659+
log::debug!(
660+
"Reordered {} files by {} {} for TopK optimization",
661+
files.len(),
662+
col_name,
663+
if descending { "DESC" } else { "ASC" }
664+
);
665+
666+
files
667+
}
668+
590669
fn table_schema(&self) -> &TableSchema {
591670
&self.table_schema
592671
}

datafusion/datasource/src/file.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,19 @@ pub trait FileSource: Any + Send + Sync {
280280
Ok(SortOrderPushdownResult::Unsupported)
281281
}
282282

283+
/// Reorder files in the shared work queue to optimize query performance.
284+
///
285+
/// For example, TopK queries benefit from reading files with the best
286+
/// statistics first, so the dynamic filter threshold tightens quickly.
287+
///
288+
/// The default implementation returns files unchanged (no reordering).
289+
fn reorder_files(
290+
&self,
291+
files: Vec<crate::PartitionedFile>,
292+
) -> Vec<crate::PartitionedFile> {
293+
files
294+
}
295+
283296
/// Try to push down a projection into this FileSource.
284297
///
285298
/// `FileSource` implementations that support projection pushdown should

datafusion/datasource/src/file_stream/work_source.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,18 @@ impl SharedWorkSource {
8585
}
8686

8787
/// Create a shared work source for the unopened files in `config`.
88+
///
89+
/// Files are reordered by the file source (e.g. by statistics for TopK)
90+
/// before being placed in the shared queue.
8891
pub(crate) fn from_config(config: &FileScanConfig) -> Self {
89-
Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned())
92+
let files: Vec<_> = config
93+
.file_groups
94+
.iter()
95+
.flat_map(FileGroup::iter)
96+
.cloned()
97+
.collect();
98+
let files = config.file_source.reorder_files(files);
99+
Self::new(files)
90100
}
91101

92102
/// Pop the next file from the shared work queue.

0 commit comments

Comments
 (0)