Skip to content

Commit e99c796

Browse files
committed
feat: adaptive filter selectivity tracking for Parquet row filters
1 parent 53b0ffb commit e99c796

24 files changed

Lines changed: 1228 additions & 43 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,16 @@ config_namespace! {
751751
/// parquet reader setting. 0 means no caching.
752752
pub max_predicate_cache_size: Option<usize>, default = None
753753

754+
/// (reading) Minimum filter effectiveness threshold for adaptive filter
755+
/// pushdown.
756+
/// Only filters that filter out at least this fraction of rows will be
757+
/// promoted to row filters during adaptive filter pushdown.
758+
/// A value of 1.0 means only filters that filter out all rows will be
759+
/// promoted. A value of 0.0 means all filters will be promoted.
760+
/// Because there can be a high I/O cost to pushing down ineffective filters,
761+
/// recommended values are in the range [0.8, 0.95], depending on random I/O costs.
762+
pub filter_effectiveness_threshold: f64, default = 0.8
763+
754764
// The following options affect writing to parquet files
755765
// and map to parquet::file::properties::WriterProperties
756766

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ impl ParquetOptions {
209209
coerce_int96: _, // not used for writer props
210210
skip_arrow_metadata: _,
211211
max_predicate_cache_size: _,
212+
filter_effectiveness_threshold: _, // not used for writer props
212213
} = self;
213214

214215
let mut builder = WriterProperties::builder()
@@ -460,6 +461,7 @@ mod tests {
460461
skip_arrow_metadata: defaults.skip_arrow_metadata,
461462
coerce_int96: None,
462463
max_predicate_cache_size: defaults.max_predicate_cache_size,
464+
filter_effectiveness_threshold: defaults.filter_effectiveness_threshold,
463465
}
464466
}
465467

@@ -574,6 +576,8 @@ mod tests {
574576
binary_as_string: global_options_defaults.binary_as_string,
575577
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
576578
coerce_int96: None,
579+
filter_effectiveness_threshold: global_options_defaults
580+
.filter_effectiveness_threshold,
577581
},
578582
column_specific_options,
579583
key_value_metadata,

datafusion/core/src/dataframe/parquet.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,14 @@ mod tests {
152152
let plan = df.explain(false, false)?.collect().await?;
153153
// Filters all the way to Parquet
154154
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
155-
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
155+
let data_source_exec_row = formatted
156+
.lines()
157+
.find(|line| line.contains("DataSourceExec:"))
158+
.unwrap();
159+
assert!(
160+
data_source_exec_row.contains("predicate=id@0 = 1"),
161+
"{formatted}"
162+
);
156163

157164
Ok(())
158165
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ mod tests {
169169
if self.pushdown_predicate {
170170
source = source
171171
.with_pushdown_filters(true)
172-
.with_reorder_filters(true);
172+
.with_reorder_filters(true)
173+
.with_filter_effectiveness_threshold(0.0);
173174
} else {
174175
source = source.with_pushdown_filters(false);
175176
}

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1731,6 +1731,11 @@ async fn test_topk_dynamic_filter_pushdown_integration() {
17311731
let mut cfg = SessionConfig::new();
17321732
cfg.options_mut().execution.parquet.pushdown_filters = true;
17331733
cfg.options_mut().execution.parquet.max_row_group_size = 128;
1734+
// Always pushdown filters into row filters for this test
1735+
cfg.options_mut()
1736+
.execution
1737+
.parquet
1738+
.filter_effectiveness_threshold = 0.0;
17341739
let ctx = SessionContext::new_with_config(cfg);
17351740
ctx.register_object_store(
17361741
ObjectStoreUrl::parse("memory://").unwrap().as_ref(),

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,7 @@ async fn parquet_explain_analyze() {
862862
.to_string();
863863

864864
// should contain aggregated stats
865-
assert_contains!(&formatted, "output_rows=8");
865+
assert_contains!(&formatted, "output_rows=5");
866866
assert_contains!(
867867
&formatted,
868868
"row_groups_pruned_bloom_filter=1 total \u{2192} 1 matched"

datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::array::{
2424
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2525
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
2626
use datafusion_common::ScalarValue;
27-
use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
27+
use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter, SelectivityTracker};
2828
use datafusion_expr::{Expr, col};
2929
use datafusion_functions_nested::expr_fn::array_has;
3030
use datafusion_physical_expr::planner::logical2physical;
@@ -116,7 +116,7 @@ fn scan_with_predicate(
116116

117117
let builder = if pushdown {
118118
if let Some(row_filter) =
119-
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
119+
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics, &SelectivityTracker::default())?
120120
{
121121
builder.with_row_filter(row_filter)
122122
} else {

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,12 @@ impl FileFormat for ParquetFormat {
507507
) -> Result<Arc<dyn ExecutionPlan>> {
508508
let mut metadata_size_hint = None;
509509

510+
let filter_effectiveness_threshold = state
511+
.config_options()
512+
.execution
513+
.parquet
514+
.filter_effectiveness_threshold;
515+
510516
if let Some(metadata) = self.metadata_size_hint() {
511517
metadata_size_hint = Some(metadata);
512518
}
@@ -518,6 +524,7 @@ impl FileFormat for ParquetFormat {
518524
.cloned()
519525
.ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
520526
source = source.with_table_parquet_options(self.options.clone());
527+
source = source.with_filter_pushdown_selectivity(filter_effectiveness_threshold);
521528

522529
// Use the CachedParquetFileReaderFactory
523530
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ pub struct ParquetFileMetrics {
9191
/// number of rows that were stored in the cache after evaluating predicates
9292
/// reused for the output.
9393
pub predicate_cache_records: Gauge,
94+
/// Time spent applying filters
95+
pub filter_apply_time: Time,
9496
}
9597

9698
impl ParquetFileMetrics {
@@ -186,6 +188,10 @@ impl ParquetFileMetrics {
186188
.with_new_label("filename", filename.to_string())
187189
.gauge("predicate_cache_records", partition);
188190

191+
let filter_apply_time = MetricBuilder::new(metrics)
192+
.with_new_label("filename", filename.to_string())
193+
.subset_time("filter_apply_time", partition);
194+
189195
Self {
190196
files_ranges_pruned_statistics,
191197
predicate_evaluation_errors,
@@ -205,6 +211,7 @@ impl ParquetFileMetrics {
205211
scan_efficiency_ratio,
206212
predicate_cache_inner_records,
207213
predicate_cache_records,
214+
filter_apply_time,
208215
}
209216
}
210217
}

datafusion/datasource-parquet/src/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod page_filter;
2929
mod reader;
3030
mod row_filter;
3131
mod row_group_filter;
32+
mod selectivity;
3233
mod sort;
3334
pub mod source;
3435
mod supported_predicates;
@@ -39,7 +40,11 @@ pub use file_format::*;
3940
pub use metrics::ParquetFileMetrics;
4041
pub use page_filter::PagePruningAccessPlanFilter;
4142
pub use reader::*; // Expose so downstream crates can use it
43+
pub use row_filter::FilterMetrics;
44+
pub use row_filter::RowFilterWithMetrics;
4245
pub use row_filter::build_row_filter;
46+
pub use row_filter::build_row_filter_with_metrics;
4347
pub use row_filter::can_expr_be_pushed_down_with_schemas;
4448
pub use row_group_filter::RowGroupAccessPlanFilter;
49+
pub use selectivity::SelectivityTracker;
4550
pub use writer::plan_to_parquet;

0 commit comments

Comments
 (0)