Skip to content

Commit 65f15e1

Browse files
adriangb and claude
committed
feat: adaptive filter selectivity tracking for Parquet pushdown
Introduces a runtime adaptive filter selectivity tracking system for Parquet pushdown. Each filter is monitored with Welford online stats and moves through a state machine: New -> RowFilter|PostScan -> (promoted / demoted / dropped). Key changes: - New selectivity.rs module (SelectivityTracker, TrackerConfig, SelectivityStats, FilterState, PartitionedFilters, FilterId). - New OptionalFilterPhysicalExpr wrapper in physical_expr_common. HashJoinExec wraps dynamic join filters in it. - Removes reorder_filters config + supporting code. - Adds filter_pushdown_min_bytes_per_sec, filter_collecting_byte_ratio_threshold, filter_confidence_z config. - Predicate storage: Option<Arc<PhysicalExpr>> -> Option<Vec<(FilterId, Arc<PhysicalExpr>)>> on ParquetSource/ParquetOpener. - build_row_filter takes Vec<(FilterId,...)> + SelectivityTracker, returns RowFilterWithMetrics. DatafusionArrowPredicate reports per-batch stats back to the tracker. - ParquetOpener calls tracker.partition_filters() and apply_post_scan_filters_with_stats; records filter_apply_time. - Proto reserves tag 6 (was reorder_filters); adds 3 new optional fields. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ec00112 commit 65f15e1

39 files changed

Lines changed: 3225 additions & 384 deletions

File tree

benchmarks/bench.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ run_clickbench_partitioned() {
805805
run_clickbench_pushdown() {
806806
RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
807807
echo "RESULTS_FILE: ${RESULTS_FILE}"
808-
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..."
808+
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true..."
809809
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
810810
}
811811

benchmarks/src/clickbench.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ pub struct RunOpt {
5858
///
5959
/// Specifically, it enables:
6060
/// * `pushdown_filters = true`
61-
/// * `reorder_filters = true`
6261
#[arg(long = "pushdown")]
6362
pushdown: bool,
6463

@@ -196,14 +195,12 @@ impl RunOpt {
196195
// Turn on Parquet filter pushdown if requested
197196
if self.pushdown {
198197
parquet_options.pushdown_filters = true;
199-
parquet_options.reorder_filters = true;
200198
}
201199

202200
if self.sorted_by.is_some() {
203201
// We should compare the dynamic topk optimization when data is sorted, so we make the
204202
// assumption that filter pushdown is also enabled in this case.
205203
parquet_options.pushdown_filters = true;
206-
parquet_options.reorder_filters = true;
207204
}
208205
}
209206

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ pub async fn json_shredding() -> Result<()> {
9090
store.put(&path, payload).await?;
9191

9292
// Set up query execution
93-
let mut cfg = SessionConfig::new();
93+
let mut cfg = SessionConfig::default().set(
94+
"datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec",
95+
&ScalarValue::Float64(Some(0.0)),
96+
);
9497
cfg.options_mut().execution.parquet.pushdown_filters = true;
9598
let ctx = SessionContext::new_with_config(cfg);
9699
ctx.runtime_env().register_object_store(

datafusion/common/src/config.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -868,11 +868,6 @@ config_namespace! {
868868
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
869869
pub pushdown_filters: bool, default = false
870870

871-
/// (reading) If true, filter expressions evaluated during the parquet decoding operation
872-
/// will be reordered heuristically to minimize the cost of evaluation. If false,
873-
/// the filters are applied in the same order as written in the query
874-
pub reorder_filters: bool, default = false
875-
876871
/// (reading) Force the use of RowSelections for filter results, when
877872
/// pushdown_filters is enabled. If false, the reader will automatically
878873
/// choose between a RowSelection and a Bitmap based on the number and
@@ -910,6 +905,49 @@ config_namespace! {
910905
/// parquet reader setting. 0 means no caching.
911906
pub max_predicate_cache_size: Option<usize>, default = None
912907

908+
/// (reading) Minimum bytes/sec throughput for adaptive filter pushdown.
909+
/// Filters that achieve at least this throughput (bytes_saved / eval_time)
910+
/// are promoted to row filters.
911+
/// f64::INFINITY = no filters promoted (feature disabled).
912+
/// 0.0 = all filters pushed as row filters (no adaptive logic).
913+
/// Default: 104,857,600 bytes/sec (100 MiB/sec), empirically chosen based on
914+
/// TPC-H, TPC-DS, and ClickBench benchmarks on an M4 MacBook Pro.
915+
/// The optimal value for this setting likely depends on the relative
916+
/// cost of CPU vs. IO in your environment, and to some extent the shape
917+
/// of your query.
918+
///
919+
/// **Interaction with `pushdown_filters`:**
920+
/// This option only takes effect when `pushdown_filters = true`.
921+
/// When pushdown is disabled, all filters run post-scan and this
922+
/// threshold is ignored.
923+
pub filter_pushdown_min_bytes_per_sec: f64, default = 104_857_600.0
924+
925+
/// (reading) Byte-ratio threshold for applying filters one at a time
926+
/// (iterative pruning; aka row-level) vs. all at once (post-scan).
927+
/// The ratio is computed as
927+
/// `filter_compressed_bytes / projection_compressed_bytes`, i.e. the
928+
/// compressed bytes of filter-only columns (those not in the projection)
929+
/// divided by the compressed bytes of the projected columns; it controls
930+
/// the initial placement of each filter.
931+
/// Filters below this ratio start as row-level filters (enabling late
932+
/// materialization); those above start as post-scan filters.
934+
/// Default: 0.20 — filters whose columns are less than 20% of the
935+
/// projection bytes start at row-level.
936+
///
937+
/// **Interaction with `pushdown_filters`:**
938+
/// Only takes effect when `pushdown_filters = true`.
939+
pub filter_collecting_byte_ratio_threshold: f64, default = 0.20
940+
941+
/// (reading) Z-score for confidence intervals on filter effectiveness.
942+
/// Controls how much statistical evidence is required before promoting
943+
/// or demoting a filter. Lower values = faster decisions with less
944+
/// confidence. Higher values = more conservative, requiring more data.
945+
/// Default: 2.0 (~95% confidence).
946+
///
947+
/// **Interaction with `pushdown_filters`:**
948+
/// Only takes effect when `pushdown_filters = true`.
949+
pub filter_confidence_z: f64, default = 2.0
950+
913951
// The following options affect writing to parquet files
914952
// and map to parquet::file::properties::WriterProperties
915953

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl ParquetOptions {
199199
skip_metadata: _,
200200
metadata_size_hint: _,
201201
pushdown_filters: _,
202-
reorder_filters: _,
202+
203203
force_filter_selections: _, // not used for writer props
204204
allow_single_file_parallelism: _,
205205
maximum_parallel_row_group_writers: _,
@@ -210,6 +210,9 @@ impl ParquetOptions {
210210
coerce_int96: _, // not used for writer props
211211
skip_arrow_metadata: _,
212212
max_predicate_cache_size: _,
213+
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
214+
filter_collecting_byte_ratio_threshold: _, // not used for writer props
215+
filter_confidence_z: _, // not used for writer props
213216
} = self;
214217

215218
let mut builder = WriterProperties::builder()
@@ -470,7 +473,7 @@ mod tests {
470473
skip_metadata: defaults.skip_metadata,
471474
metadata_size_hint: defaults.metadata_size_hint,
472475
pushdown_filters: defaults.pushdown_filters,
473-
reorder_filters: defaults.reorder_filters,
476+
474477
force_filter_selections: defaults.force_filter_selections,
475478
allow_single_file_parallelism: defaults.allow_single_file_parallelism,
476479
maximum_parallel_row_group_writers: defaults
@@ -484,6 +487,10 @@ mod tests {
484487
coerce_int96: None,
485488
max_predicate_cache_size: defaults.max_predicate_cache_size,
486489
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
490+
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
491+
filter_collecting_byte_ratio_threshold: defaults
492+
.filter_collecting_byte_ratio_threshold,
493+
filter_confidence_z: defaults.filter_confidence_z,
487494
}
488495
}
489496

@@ -585,7 +592,7 @@ mod tests {
585592
skip_metadata: global_options_defaults.skip_metadata,
586593
metadata_size_hint: global_options_defaults.metadata_size_hint,
587594
pushdown_filters: global_options_defaults.pushdown_filters,
588-
reorder_filters: global_options_defaults.reorder_filters,
595+
589596
force_filter_selections: global_options_defaults.force_filter_selections,
590597
allow_single_file_parallelism: global_options_defaults
591598
.allow_single_file_parallelism,
@@ -607,6 +614,11 @@ mod tests {
607614
norm_level: c.norm_level,
608615
}
609616
}),
617+
filter_pushdown_min_bytes_per_sec: global_options_defaults
618+
.filter_pushdown_min_bytes_per_sec,
619+
filter_collecting_byte_ratio_threshold: global_options_defaults
620+
.filter_collecting_byte_ratio_threshold,
621+
filter_confidence_z: global_options_defaults.filter_confidence_z,
610622
},
611623
column_specific_options,
612624
key_value_metadata,

datafusion/core/src/dataframe/parquet.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,14 @@ mod tests {
164164
let plan = df.explain(false, false)?.collect().await?;
165165
// Filters all the way to Parquet
166166
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
167-
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
167+
let data_source_exec_row = formatted
168+
.lines()
169+
.find(|line| line.contains("DataSourceExec:"))
170+
.unwrap();
171+
assert!(
172+
data_source_exec_row.contains("predicate=id@0 = 1"),
173+
"{formatted}"
174+
);
168175

169176
Ok(())
170177
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,11 @@ mod tests {
167167
}
168168

169169
if self.pushdown_predicate {
170+
let mut opts = source.table_parquet_options().clone();
171+
opts.global.filter_pushdown_min_bytes_per_sec = 0.0;
170172
source = source
171-
.with_pushdown_filters(true)
172-
.with_reorder_filters(true);
173+
.with_table_parquet_options(opts)
174+
.with_pushdown_filters(true);
173175
} else {
174176
source = source.with_pushdown_filters(false);
175177
}

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ pub struct TestParquetFile {
5757
pub struct ParquetScanOptions {
5858
/// Enable pushdown filters
5959
pub pushdown_filters: bool,
60-
/// enable reordering filters
61-
pub reorder_filters: bool,
6260
/// enable page index
6361
pub enable_page_index: bool,
6462
}
@@ -68,8 +66,9 @@ impl ParquetScanOptions {
6866
pub fn config(&self) -> SessionConfig {
6967
let mut config = ConfigOptions::new();
7068
config.execution.parquet.pushdown_filters = self.pushdown_filters;
71-
config.execution.parquet.reorder_filters = self.reorder_filters;
7269
config.execution.parquet.enable_page_index = self.enable_page_index;
70+
// Disable adaptive filter selection for tests that expect deterministic pushdown
71+
config.execution.parquet.filter_pushdown_min_bytes_per_sec = 0.0;
7372
config.into()
7473
}
7574
}

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,6 @@ impl<'a> TestCase<'a> {
444444
.read_with_options(
445445
ParquetScanOptions {
446446
pushdown_filters: false,
447-
reorder_filters: false,
448447
enable_page_index: false,
449448
},
450449
filter,
@@ -455,7 +454,6 @@ impl<'a> TestCase<'a> {
455454
.read_with_options(
456455
ParquetScanOptions {
457456
pushdown_filters: true,
458-
reorder_filters: false,
459457
enable_page_index: false,
460458
},
461459
filter,
@@ -464,43 +462,28 @@ impl<'a> TestCase<'a> {
464462

465463
assert_eq!(no_pushdown, only_pushdown);
466464

467-
let pushdown_and_reordering = self
468-
.read_with_options(
469-
ParquetScanOptions {
470-
pushdown_filters: true,
471-
reorder_filters: true,
472-
enable_page_index: false,
473-
},
474-
filter,
475-
)
476-
.await;
477-
478-
assert_eq!(no_pushdown, pushdown_and_reordering);
479-
480465
let page_index_only = self
481466
.read_with_options(
482467
ParquetScanOptions {
483468
pushdown_filters: false,
484-
reorder_filters: false,
485469
enable_page_index: true,
486470
},
487471
filter,
488472
)
489473
.await;
490474
assert_eq!(no_pushdown, page_index_only);
491475

492-
let pushdown_reordering_and_page_index = self
476+
let pushdown_and_page_index = self
493477
.read_with_options(
494478
ParquetScanOptions {
495479
pushdown_filters: true,
496-
reorder_filters: true,
497480
enable_page_index: true,
498481
},
499482
filter,
500483
)
501484
.await;
502485

503-
assert_eq!(no_pushdown, pushdown_reordering_and_page_index);
486+
assert_eq!(no_pushdown, pushdown_and_page_index);
504487
}
505488

506489
/// Reads data from a test parquet file using the specified scan options
@@ -633,6 +616,11 @@ async fn predicate_cache_default() -> datafusion_common::Result<()> {
633616
async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
634617
let mut config = SessionConfig::new();
635618
config.options_mut().execution.parquet.pushdown_filters = true;
619+
config
620+
.options_mut()
621+
.execution
622+
.parquet
623+
.filter_pushdown_min_bytes_per_sec = 0.0;
636624
let ctx = SessionContext::new_with_config(config);
637625
// The cache is on by default, and used when filter pushdown is enabled
638626
PredicateCacheTest {
@@ -647,6 +635,11 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
647635
async fn predicate_cache_stats_issue_19561() -> datafusion_common::Result<()> {
648636
let mut config = SessionConfig::new();
649637
config.options_mut().execution.parquet.pushdown_filters = true;
638+
config
639+
.options_mut()
640+
.execution
641+
.parquet
642+
.filter_pushdown_min_bytes_per_sec = 0.0;
650643
// force to get multiple batches to trigger repeated metric compound bug
651644
config.options_mut().execution.batch_size = 1;
652645
let ctx = SessionContext::new_with_config(config);
@@ -664,6 +657,11 @@ async fn predicate_cache_pushdown_default_selections_only()
664657
-> datafusion_common::Result<()> {
665658
let mut config = SessionConfig::new();
666659
config.options_mut().execution.parquet.pushdown_filters = true;
660+
config
661+
.options_mut()
662+
.execution
663+
.parquet
664+
.filter_pushdown_min_bytes_per_sec = 0.0;
667665
// forcing filter selections minimizes the number of rows read from the cache
668666
config
669667
.options_mut()

0 commit comments

Comments
 (0)