Skip to content

Commit b95348b

Browse files
committed
add correlation, bytes/sec
1 parent ce15e5d commit b95348b

19 files changed

Lines changed: 1290 additions & 408 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -751,15 +751,30 @@ config_namespace! {
751751
/// parquet reader setting. 0 means no caching.
752752
pub max_predicate_cache_size: Option<usize>, default = None
753753

754-
/// (reading) Minimum filter effectiveness threshold for adaptive filter
755-
/// pushdown.
756-
/// Only filters that filter out at least this fraction of rows will be
757-
/// promoted to row filters during adaptive filter pushdown.
758-
/// A value of 1.0 means only filters that filter out all rows will be
759-
/// promoted. A value of 0.0 means all filters will be promoted.
760-
/// Because there can be a high I/O cost to pushing down ineffective filters,
761-
/// recommended values are in the range [0.8, 0.95], depending on random I/0 costs.
762-
pub filter_effectiveness_threshold: f64, default = 0.8
754+
/// (reading) Minimum bytes/sec throughput for adaptive filter pushdown.
755+
/// Filters that achieve at least this throughput (bytes_saved / eval_time)
756+
/// are promoted to row filters.
757+
/// f64::INFINITY (default) = no filters promoted (feature disabled).
758+
/// 0.0 = all filters pushed as row filters (no adaptive logic).
759+
pub filter_pushdown_min_bytes_per_sec: f64, default = f64::INFINITY
760+
761+
/// (reading) Correlation ratio threshold for grouping filters.
762+
/// The ratio is P(A ∧ B) / (P(A) * P(B)):
763+
/// 1.0 = independent (keep separate for late materialization benefit)
764+
/// 1.5 = filters co-pass 50% more often than chance (default threshold)
765+
/// 2.0 = filters co-pass twice as often as chance (conservative)
766+
/// Higher values = less grouping = more late materialization, more overhead.
767+
/// Lower values = more grouping = less overhead, less late materialization.
768+
/// Set to f64::MAX to disable grouping entirely.
769+
pub filter_correlation_threshold: f64, default = 1.5
770+
771+
/// (reading) Minimum rows of post-scan evaluation before statistics-based
772+
/// optimization activates. During collection, all filters are evaluated
773+
/// as post-scan to gather accurate marginal and joint selectivity statistics.
774+
/// Used for BOTH individual filter effectiveness decisions AND correlation-
775+
/// based grouping. Larger values = more accurate estimates, longer collection.
776+
/// Set to 0 to disable the collection phase entirely.
777+
pub filter_statistics_collection_min_rows: u64, default = 10_000
763778

764779
// The following options affect writing to parquet files
765780
// and map to parquet::file::properties::WriterProperties

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ impl ParquetOptions {
209209
coerce_int96: _, // not used for writer props
210210
skip_arrow_metadata: _,
211211
max_predicate_cache_size: _,
212-
filter_effectiveness_threshold: _, // not used for writer props
212+
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
213+
filter_correlation_threshold: _, // not used for writer props
214+
filter_statistics_collection_min_rows: _, // not used for writer props
213215
} = self;
214216

215217
let mut builder = WriterProperties::builder()
@@ -461,7 +463,7 @@ mod tests {
461463
skip_arrow_metadata: defaults.skip_arrow_metadata,
462464
coerce_int96: None,
463465
max_predicate_cache_size: defaults.max_predicate_cache_size,
464-
filter_effectiveness_threshold: defaults.filter_effectiveness_threshold,
466+
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
465467
}
466468
}
467469

@@ -576,8 +578,8 @@ mod tests {
576578
binary_as_string: global_options_defaults.binary_as_string,
577579
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
578580
coerce_int96: None,
579-
filter_effectiveness_threshold: global_options_defaults
580-
.filter_effectiveness_threshold,
581+
filter_pushdown_min_bytes_per_sec: global_options_defaults
582+
.filter_pushdown_min_bytes_per_sec,
581583
},
582584
column_specific_options,
583585
key_value_metadata,

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ mod tests {
170170
source = source
171171
.with_pushdown_filters(true)
172172
.with_reorder_filters(true)
173-
.with_filter_effectiveness_threshold(0.0);
173+
.with_filter_pushdown_min_bytes_per_sec(0.0);
174174
} else {
175175
source = source.with_pushdown_filters(false);
176176
}

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1735,7 +1735,7 @@ async fn test_topk_dynamic_filter_pushdown_integration() {
17351735
cfg.options_mut()
17361736
.execution
17371737
.parquet
1738-
.filter_effectiveness_threshold = 0.0;
1738+
.filter_pushdown_min_bytes_per_sec = 0.0;
17391739
let ctx = SessionContext::new_with_config(cfg);
17401740
ctx.register_object_store(
17411741
ObjectStoreUrl::parse("memory://").unwrap().as_ref(),

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,11 +507,11 @@ impl FileFormat for ParquetFormat {
507507
) -> Result<Arc<dyn ExecutionPlan>> {
508508
let mut metadata_size_hint = None;
509509

510-
let filter_effectiveness_threshold = state
510+
let filter_pushdown_min_bytes_per_sec = state
511511
.config_options()
512512
.execution
513513
.parquet
514-
.filter_effectiveness_threshold;
514+
.filter_pushdown_min_bytes_per_sec;
515515

516516
if let Some(metadata) = self.metadata_size_hint() {
517517
metadata_size_hint = Some(metadata);
@@ -524,7 +524,7 @@ impl FileFormat for ParquetFormat {
524524
.cloned()
525525
.ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
526526
source = source.with_table_parquet_options(self.options.clone());
527-
source = source.with_filter_pushdown_selectivity(filter_effectiveness_threshold);
527+
source = source.with_filter_pushdown_min_bytes_per_sec(filter_pushdown_min_bytes_per_sec);
528528

529529
// Use the CachedParquetFileReaderFactory
530530
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

0 commit comments

Comments
 (0)