Skip to content

Commit e4e11b2

Browse files
committed
perf: move stats init before RG pruning so first file also benefits
Move try_init_topk_threshold() from build_stream() to prune_row_groups(), before prune_by_statistics(). This way: - File 1: stats init sets threshold from ALL its RG statistics, then prune_by_statistics uses it to prune file 1's own RGs. Only the best RG(s) are read, rest skipped with zero I/O. - File 2+: dynamic filter already has tight threshold from file 1, most RGs pruned immediately. This effectively achieves dynamic RG pruning without needing morsel- level scheduling — the threshold is computed from statistics (no data read), then used to prune RGs in the same file.
1 parent bccc42b commit e4e11b2

1 file changed

Lines changed: 16 additions & 17 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,22 @@ impl FiltersPreparedParquetOpen {
905905
row_groups.prune_by_range(rg_metadata, range);
906906
}
907907

908+
// Initialize TopK dynamic filter threshold from row group statistics
909+
// BEFORE row group pruning, so that this file's own RGs can be pruned
910+
// by the threshold. For the first file, this sets the initial threshold;
911+
// for subsequent files, the threshold is already set by earlier files.
912+
if let (Some(predicate), Some(limit)) = (&prepared.predicate, prepared.limit)
913+
&& let Err(e) = try_init_topk_threshold(
914+
predicate,
915+
limit,
916+
rg_metadata,
917+
&prepared.physical_file_schema,
918+
loaded.reader_metadata.parquet_schema(),
919+
)
920+
{
921+
debug!("Skipping TopK threshold initialization from statistics: {e}");
922+
}
923+
908924
// If there is a predicate that can be evaluated against the metadata
909925
if let Some(predicate) = self.pruning_predicate.as_ref().map(|p| p.as_ref()) {
910926
if prepared.enable_row_group_stats_pruning {
@@ -1087,23 +1103,6 @@ impl RowGroupsPrunedParquetOpen {
10871103
let file_metadata = Arc::clone(reader_metadata.metadata());
10881104
let rg_metadata = file_metadata.row_groups();
10891105

1090-
// Initialize TopK dynamic filter from row group statistics.
1091-
// This sets an initial threshold before any data is read, so that
1092-
// subsequent row filtering can benefit immediately.
1093-
// Sort direction is read from the DynamicFilterPhysicalExpr's
1094-
// sort_options (set by SortExec for TopK queries).
1095-
if let (Some(predicate), Some(limit)) = (&prepared.predicate, prepared.limit)
1096-
&& let Err(e) = try_init_topk_threshold(
1097-
predicate,
1098-
limit,
1099-
rg_metadata,
1100-
&prepared.physical_file_schema,
1101-
reader_metadata.parquet_schema(),
1102-
)
1103-
{
1104-
debug!("Skipping TopK threshold initialization from statistics: {e}");
1105-
}
1106-
11071106
// Filter pushdown: evaluate predicates during scan
11081107
let row_filter = if let Some(predicate) = prepared
11091108
.pushdown_filters

0 commit comments

Comments
 (0)