(Test) Advanced adaptive filter selectivity evaluation #20363

adriangb wants to merge 4 commits into apache:main
Conversation
run benchmark tpcds

run benchmark clickbench_partitioned

run benchmark tpch

🤖: Benchmark completed.

show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20363 (comment)).

show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20363 (comment)).

Hm, it seems stuck again.
FYI @alamb

@Dandandan this is mostly vibe coded, I'm only 50% confident it even makes sense without reviewing the code, fwiw.
Force-pushed from e0240af to 09cdb0b.
show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).

Wonder if I'm infinite looping it or something :(

Yes, I think previously it got stuck during infinite loops / extremely long-running tasks.

My bad, I'll open a PR to add timeouts and a cancel command.

show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).
run benchmark tpch

run benchmark clickbench_partitioned

run benchmark clickbench_partitioned env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes (f8ee955) to ec00112 (merge-base) using clickbench_partitioned.

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes (f8ee955) to ec00112 (merge-base) using clickbench_partitioned.

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.
Force-pushed from f8ee955 to 290784a.
run benchmark clickbench_partitioned

run benchmark clickbench_partitioned env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes (290784a) to ec00112 (merge-base) using clickbench_partitioned.

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes (290784a) to ec00112 (merge-base) using clickbench_partitioned.
Force-pushed from 290784a to 18ab75f.
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.
Force-pushed from 65f15e1 to 72f296f.
run benchmark clickbench_partitioned env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true

run benchmark clickbench_partitioned

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes (72f296f) to ec00112 (merge-base) using clickbench_partitioned.

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes (72f296f) to ec00112 (merge-base) using clickbench_partitioned.

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch.
Applies apache#20363 on top of the io-dynamic branch, resolving conflicts in opener.rs and source.rs by adapting the new predicate_conjuncts/selectivity_tracker fields to the direct ParquetMorselizer construction pattern used on this branch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Introduces a runtime adaptive filter selectivity tracking system for Parquet pushdown. Each filter is monitored with Welford online stats and moves through a state machine: New -> RowFilter|PostScan -> (promoted / demoted / dropped).

Key changes:
- New selectivity.rs module (SelectivityTracker, TrackerConfig, SelectivityStats, FilterState, PartitionedFilters, FilterId).
- New OptionalFilterPhysicalExpr wrapper in physical_expr_common. HashJoinExec wraps dynamic join filters in it.
- Removes reorder_filters config + supporting code.
- Adds filter_pushdown_min_bytes_per_sec, filter_collecting_byte_ratio_threshold, filter_confidence_z config.
- Predicate storage: Option<Arc<PhysicalExpr>> -> Option<Vec<(FilterId, Arc<PhysicalExpr>)>> on ParquetSource/ParquetOpener.
- build_row_filter takes Vec<(FilterId,...)> + SelectivityTracker, returns RowFilterWithMetrics. DatafusionArrowPredicate reports per-batch stats back to the tracker.
- ParquetOpener calls tracker.partition_filters() and apply_post_scan_filters_with_stats; records filter_apply_time.
- Proto reserves tag 6 (was reorder_filters); adds 3 new optional fields.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
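The "Welford online stats" the commit relies on fit in a few lines. The sketch below is an illustrative stand-in, not the actual DataFusion code: the names `SelectivityStats`, `update`, and `ci_bounds` mirror the description, and the z-score confidence interval is the standard `mean ± z * s / sqrt(n)` form.

```rust
/// Hypothetical sketch of per-filter Welford statistics with a
/// z-score confidence interval on the mean.
#[derive(Default)]
struct SelectivityStats {
    n: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations (Welford)
}

impl SelectivityStats {
    /// Welford's online update: numerically stable, single pass.
    fn update(&mut self, sample: f64) {
        self.n += 1;
        let delta = sample - self.mean;
        self.mean += delta / self.n as f64;
        let delta2 = sample - self.mean;
        self.m2 += delta * delta2;
    }

    /// (lower, upper) bounds on the mean: mean ± z * s / sqrt(n).
    /// Returns None until at least two samples exist.
    fn ci_bounds(&self, z: f64) -> Option<(f64, f64)> {
        if self.n < 2 {
            return None; // not enough samples for a variance estimate
        }
        let var = self.m2 / (self.n - 1) as f64; // sample variance
        let half_width = z * (var / self.n as f64).sqrt();
        Some((self.mean - half_width, self.mean + half_width))
    }
}

fn main() {
    let mut stats = SelectivityStats::default();
    for s in [10.0, 12.0, 11.0, 13.0, 9.0] {
        stats.update(s);
    }
    let (lo, hi) = stats.ci_bounds(2.0).unwrap();
    println!("mean={}, ci=({}, {})", stats.mean, lo, hi);
}
```

As samples accumulate, the interval tightens, which is what lets the tracker promote or demote a filter with confidence rather than reacting to a single noisy batch.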
Force-pushed from 72f296f to 17e8370.
Optional/dynamic filters from hash-join build sides were unconditionally placed as PostScan on first encounter, losing late-materialization benefits even when filter columns were small relative to the projection. With few file splits opened in parallel, the tracker rarely accumulated enough samples to promote them mid-query.

Apply the same byte_ratio_threshold heuristic used for static filters. The CI lower-bound promotion and CI upper-bound demotion paths still apply, including Drop for ineffective optional filters.

Local TPC-DS sf1 (M-series, pushdown_filters=true):

| Query | Main  | Branch | Branch+Fix |
|-------|-------|--------|------------|
| Q24   | 72    | 452    | 70         |
| Q17   | 124   | 212    | 121        |
| Q25   | 182   | 379    | 203        |
| Q29   | 152   | 312    | 145        |
| Q7    | 224   | 297    | 220        |
| Q58   | 129   | 191    | 133        |
| Q64   | 28213 | 672    | 578        |
| Q9    | 228   | 96     | 87         |
| Q76   | 172   | 105    | 156        |

Q76 regresses slightly vs the no-fix branch (CASE/hash_lookup is CPU-heavy at row level) but still beats main.

Also updates dynamic_filter_pushdown_config.slt to match the Optional(DynamicFilter ...) display introduced earlier in the branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
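The byte_ratio_threshold heuristic the commit applies amounts to a single comparison. A minimal sketch, assuming the byte sizes are known up front; the function name `initial_placement` and its signature are hypothetical, not the real API:

```rust
/// Illustrative initial-placement decision: a filter starts as a row
/// filter only when the bytes its columns occupy are small relative
/// to the full projection (late materialization likely pays off).
#[derive(Debug, PartialEq)]
enum InitialPlacement {
    RowFilter,
    PostScan,
}

/// `filter_bytes`: estimated bytes of the columns the filter reads.
/// `projection_bytes`: estimated bytes of all projected columns.
fn initial_placement(
    filter_bytes: u64,
    projection_bytes: u64,
    threshold: f64,
) -> InitialPlacement {
    if projection_bytes == 0 {
        return InitialPlacement::PostScan; // nothing to save
    }
    let ratio = filter_bytes as f64 / projection_bytes as f64;
    if ratio <= threshold {
        // Filter columns are cheap relative to the projection:
        // pushing down can skip decoding the expensive columns.
        InitialPlacement::RowFilter
    } else {
        InitialPlacement::PostScan
    }
}

fn main() {
    // A narrow join-key column against a wide projection.
    assert_eq!(initial_placement(8, 400, 0.15), InitialPlacement::RowFilter);
    // Filter columns dominate the projection: no payoff, stay post-scan.
    assert_eq!(initial_placement(380, 400, 0.15), InitialPlacement::PostScan);
    println!("ok");
}
```

Note how this matches the projection ⊆ filter-columns case described later: there the ratio is 1.0, above any sensible threshold, so the filter lands at PostScan.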
After moving optional filters to RowFilter via byte_ratio, queries with 1-row-group-per-file inputs (e.g. TPC-DS) had no chance to demote when the chosen filter turned out to be CPU-dominated and ineffective: partition_filters runs once per file open, all 12 split openers fire in parallel and see no stats, and the existing Demote/Drop branches never re-trigger for the lifetime of the scan.

Add a per-FilterId Arc<AtomicBool> "skip flag" owned by SelectivityTracker. Once a filter has accumulated enough samples and its CI upper bound on bytes-per-second falls below min_bytes_per_sec, the hot per-batch update() path flips the flag, but only for filters recorded as optional at first encounter (mandatory filters must always execute or the result set changes).

Both consumers honour it:
* DatafusionArrowPredicate::evaluate returns an all-true mask without invoking physical_expr (filter columns are still decoded; CPU is reclaimed but I/O is not, pending arrow-rs API).
* apply_post_scan_filters_with_stats `continue`s past the filter, skipping evaluation and the per-batch tracker.update.

Local TPC-DS sf1 (M-series, pushdown_filters=true), worst regressors from main pushdown=off baseline:

| Query | Main(off) | Branch(byte-ratio) | +skip-flag |
|-------|-----------|--------------------|------------|
| Q72   | 619       | 554                | 261        |
| Q50   | 221       | 521                | 135        |
| Q23   | 892       | 1217               | 680        |
| Q67   | 310       | 510                | 306        |
| Q18   | 128       | 312                | 178        |
| Q13   | 399       | 558                | 363        |
| Q53   | 103       | 167                | 93         |
| Q63   | 106       | 173                | 93         |
| Q76   | 132       | 268                | 105        |

Q24-class wins are unaffected (Q24 holds at 70 ms vs 379 ms on main).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
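The per-FilterId skip flag described above can be sketched as follows. This is a simplified, hypothetical stand-in for the real SelectivityTracker internals: the struct `FilterSkip` and method names are illustrative, and the flag uses relaxed atomics since a late read merely delays skipping by one batch.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Illustrative per-filter skip state: the Arc<AtomicBool> is shared
/// with every consumer of the filter (row-filter path and post-scan path).
struct FilterSkip {
    optional: bool,        // recorded at first encounter
    skip: Arc<AtomicBool>, // flipped once; consumers poll it per batch
}

impl FilterSkip {
    /// Hot per-batch path: once the CI upper bound on bytes-per-second
    /// saved falls below the configured floor, stop executing the
    /// filter. Mandatory filters are never skipped, because skipping
    /// them would change the result set.
    fn maybe_flip(&self, ci_upper_bytes_per_sec: Option<f64>, min_bytes_per_sec: f64) {
        if let Some(upper) = ci_upper_bytes_per_sec {
            if self.optional && upper < min_bytes_per_sec {
                self.skip.store(true, Ordering::Relaxed);
            }
        }
    }

    fn should_skip(&self) -> bool {
        self.skip.load(Ordering::Relaxed)
    }
}

fn main() {
    let optional = FilterSkip { optional: true, skip: Arc::new(AtomicBool::new(false)) };
    optional.maybe_flip(Some(1_000.0), 52_428_800.0); // far below the floor
    assert!(optional.should_skip());

    let mandatory = FilterSkip { optional: false, skip: Arc::new(AtomicBool::new(false)) };
    mandatory.maybe_flip(Some(1_000.0), 52_428_800.0);
    assert!(!mandatory.should_skip()); // mandatory filters always execute
    println!("ok");
}
```

Using a shared atomic rather than re-running partition_filters is what lets the decision take effect mid-scan even though placement is computed only once per file open.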
Q18 (and several other TPC-DS regressions) had a post-scan customer_demographics filter, `Optional(DynamicFilter)` on the single projected column `cd_demo_sk`, that burned 90 ms of CPU per scan but could never be skipped. The filter was correctly placed at PostScan (projection ⊆ filter columns ⇒ byte_ratio = 1.0 > threshold) but the mid-stream skip path never fired.

Root cause: `SelectivityStats::update` only incremented `sample_count` when `batch_bytes > 0`. When the projection is a subset of the filter columns, `other_bytes_per_row = 0` and therefore `batch_bytes = 0` on every call, so the Welford counter stayed at zero, the CI upper bound stayed `None`, and the skip check short-circuited. Meanwhile the filter kept running per batch.

Admit samples with `batch_bytes = 0`. The recorded effectiveness for those samples is legitimately zero (no late-materialization payoff), so the CI upper bound converges on zero after a few batches and the skip flag flips for optional filters. This is exactly what we want: CPU spent, no byte savings, optional ⇒ drop.

Local TPC-DS sf1 (M-series, pushdown=on) vs main pushdown=off:

| Query | Main(off) | Before | After |
|-------|-----------|--------|-------|
| Q18   | 99        | 182    | 118   |
| Q67   | 312       | 503    | 346   |
| Q26   | 80        | 151    | 94    |
| Q85   | 149       | 246    | 157   |
| Q91   | 64        | 108    | 58    |
| Q53   | 103       | 144    | 99    |
| Q63   | 103       | 148    | 99    |
| Q13   | 399       | 558    | 376   |
| Q72   | 619       | 489    | 277   |
| Q24   | 379       | 70     | 70    |
| Q64   | 28213     | --     | 519   |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
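The bug and the fix can be reproduced with a minimal model. This is a hypothetical, simplified `Stats` (not the real SelectivityStats): it shows that gating `update` on `batch_bytes > 0` starves the sample counter so no confidence interval ever forms, while admitting zero-byte samples lets the CI upper bound converge to zero and arm the skip check.

```rust
/// Minimal Welford stats with a CI upper bound, mirroring the
/// commit-message description (names are illustrative).
#[derive(Default)]
struct Stats {
    n: u64,
    mean: f64,
    m2: f64,
}

impl Stats {
    fn update(&mut self, sample: f64) {
        self.n += 1;
        let d = sample - self.mean;
        self.mean += d / self.n as f64;
        self.m2 += d * (sample - self.mean);
    }

    fn ci_upper(&self, z: f64) -> Option<f64> {
        if self.n < 2 {
            return None;
        }
        let var = self.m2 / (self.n - 1) as f64;
        Some(self.mean + z * (var / self.n as f64).sqrt())
    }
}

fn main() {
    // Buggy behavior: only admit samples when batch_bytes > 0.
    let mut buggy = Stats::default();
    for _batch in 0..10 {
        let batch_bytes = 0.0; // projection ⊆ filter columns
        if batch_bytes > 0.0 {
            buggy.update(batch_bytes);
        }
    }
    assert_eq!(buggy.ci_upper(2.0), None); // skip check can never fire

    // Fixed behavior: zero savings is a legitimate sample.
    let mut fixed = Stats::default();
    for _batch in 0..10 {
        fixed.update(0.0);
    }
    // CI upper bound converges on zero, so an optional filter is skipped.
    assert!(fixed.ci_upper(2.0).unwrap() < 1.0);
    println!("ok");
}
```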
Which issue does this PR close?

Related to filter pushdown performance optimization work.

Rationale for this change

Currently, when `pushdown_filters = true`, DataFusion pushes all filter predicates into the Parquet reader as row-level filters (ArrowPredicates) unconditionally. This is suboptimal because:

- The `reorder_filters` heuristic was static. It used compressed column size as a proxy for cost and sorted filters by that metric, but never measured actual runtime selectivity or evaluation cost. It could not adapt to data skew or runtime conditions.
- Dynamic filters (e.g., those pushed down by `HashJoinExec`) cannot be dropped even when they provide no benefit. Without a way to mark filters as optional, the system was forced to always evaluate them.

This PR introduces an adaptive filter selectivity tracking system that observes filter behavior at runtime and makes data-driven decisions about whether each filter should be pushed down as a row-level predicate or applied post-scan.
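The adaptive decision loop this PR describes can be sketched as one transition function. The code below is a rough, hypothetical condensation: the `FilterState` variants come from the PR description, while `step` and its parameters are illustrative stand-ins for the tracker's internal logic.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum FilterState {
    New,
    RowFilter,
    PostScan,
    Dropped,
}

/// One adaptive decision step, driven by confidence bounds (`ci_lo`,
/// `ci_hi`) on the measured bytes-per-second a filter saves, compared
/// against the configured floor `min_bps`.
fn step(
    state: FilterState,
    byte_ratio: f64,   // filter_bytes / projection_bytes
    threshold: f64,    // byte-ratio cutoff for initial placement
    ci_lo: Option<f64>,
    ci_hi: Option<f64>,
    min_bps: f64,
    optional: bool,
) -> FilterState {
    match state {
        // Initial placement: cheap byte-ratio heuristic, no stats yet.
        FilterState::New if byte_ratio <= threshold => FilterState::RowFilter,
        FilterState::New => FilterState::PostScan,
        // Demote (or drop, if optional) once we are confident the
        // filter saves too few bytes per second of CPU spent.
        FilterState::RowFilter if ci_hi.map_or(false, |hi| hi < min_bps) => {
            if optional { FilterState::Dropped } else { FilterState::PostScan }
        }
        // Promote once we are confident it saves enough.
        FilterState::PostScan if ci_lo.map_or(false, |lo| lo > min_bps) => {
            FilterState::RowFilter
        }
        s => s, // not enough evidence: stay put
    }
}

fn main() {
    let s = step(FilterState::New, 0.05, 0.15, None, None, 50.0, true);
    assert_eq!(s, FilterState::RowFilter); // cheap columns: start pushed down
    let s = step(s, 0.05, 0.15, None, Some(10.0), 50.0, true);
    assert_eq!(s, FilterState::Dropped); // confidently ineffective + optional
    println!("ok");
}
```

The confidence bounds are what make the loop conservative: a filter only moves when the interval is entirely on one side of the floor, so a few unlucky batches do not bounce it between states.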
What changes are included in this PR?

1. New module: selectivity.rs (1,554 lines)

The core of this PR. Introduces `SelectivityTracker`, a shared, lock-guarded structure that:

- Moves each filter through `New -> RowFilter | PostScan -> (promoted/demoted/dropped)` states based on runtime measurements.
- Uses a byte-ratio heuristic (`filter_bytes / projection_bytes`) to cheaply decide whether a new filter starts as a row filter or post-scan filter.
- Promotes or demotes filters by comparing confidence bounds on measured throughput against `filter_pushdown_min_bytes_per_sec`.
- Filters wrapped in `OptionalFilterPhysicalExpr` can be dropped entirely when ineffective.
- Tracks `snapshot_generation()`, resetting statistics when a filter's predicate changes (e.g., when a `DynamicFilterPhysicalExpr` from a hash join updates its value set).

Key types:

- `SelectivityTracker` -- cross-file tracker shared by all `ParquetOpener` instances
- `TrackerConfig` -- immutable configuration (built from `ParquetOptions`)
- `SelectivityStats` -- per-filter Welford statistics with confidence interval methods
- `FilterState` -- `RowFilter | PostScan | Dropped` enum
- `PartitionedFilters` -- output of `partition_filters()`, consumed by the opener
- `FilterId` -- stable `usize` identifier assigned by `ParquetSource::with_predicate`

2. New wrapper: OptionalFilterPhysicalExpr (in physical_expr_common)

A transparent `PhysicalExpr` wrapper that marks a filter as optional -- droppable without affecting query correctness. All `PhysicalExpr` trait methods delegate to the inner expression. The selectivity tracker detects this via `downcast_ref::<OptionalFilterPhysicalExpr>()` and can drop the filter entirely when it is ineffective, rather than demoting it to post-scan.

`HashJoinExec` now wraps its dynamic join filters in `OptionalFilterPhysicalExpr` before pushing them down. This is why plan output now shows `Optional(DynamicFilter [...])` instead of `DynamicFilter [...]`.

3. Removal of the reorder_filters config option

The old static `reorder_filters` boolean and its associated heuristic (sort by `required_bytes`, then `can_use_index`) are removed entirely. The adaptive system subsumes this:

- `FilterCandidate` no longer stores the `required_bytes` or `can_use_index` fields.
- The `size_of_columns()` and `columns_sorted()` helper functions in `row_filter.rs` are removed.
- Filter ordering is now decided by `SelectivityTracker::partition_filters()` based on measured effectiveness or the byte-ratio fallback.

4. Three new configuration options (in ParquetOptions)

- `filter_pushdown_min_bytes_per_sec` -- throughput floor for keeping a filter as a row filter; `0.0` = all promoted, `INFINITY` = none promoted (feature disabled).
- `filter_collecting_byte_ratio_threshold` -- byte-ratio cutoff used for a new filter's initial placement.
- `filter_confidence_z` -- z-score used when computing confidence intervals.

5. Changes to ParquetOpener / opener.rs

- Stores `Vec<(FilterId, Arc<dyn PhysicalExpr>)>` instead of a single combined `Arc<dyn PhysicalExpr>`.
- Calls `selectivity_tracker.partition_filters()` to split filters into row-level vs. post-scan.
- Row-level filters are handed to `build_row_filter()` (updated signature).
- Post-scan filters are handed to `apply_post_scan_filters_with_stats()`, a new function that evaluates each filter individually, reports per-filter timing and selectivity back to the tracker, and combines results into a single boolean mask.
- The `limit` is only applied to the Parquet reader when there are no post-scan filters (otherwise limiting would cut off rows before the filter could find matches).
- A new `filter_apply_time` metric tracks post-scan filter evaluation time.

6. Changes to ParquetSource / source.rs

- Predicate storage changes from `Option<Arc<dyn PhysicalExpr>>` to `Option<Vec<(FilterId, Arc<dyn PhysicalExpr>)>>`.
- `with_predicate()` now splits the predicate into conjuncts and assigns stable `FilterId` values (indices).
- The `SelectivityTracker` is stored as a shared `Arc` on `ParquetSource` and passed to all openers.
- `with_table_parquet_options()` now builds a fresh `SelectivityTracker` from the three new config values.
- The `with_reorder_filters()` and `reorder_filters()` methods are removed.

7. Changes to build_row_filter() / row_filter.rs

- Takes `Vec<(FilterId, Arc<dyn PhysicalExpr>)>` + `&Arc<SelectivityTracker>` instead of `&Arc<dyn PhysicalExpr>` + `reorder_predicates: bool`.
- Returns `RowFilterWithMetrics` (new struct) containing both the `RowFilter` and any unbuildable filters that must be applied post-scan.
- `DatafusionArrowPredicate` now carries a `FilterId` and `Arc<SelectivityTracker>`, reporting per-batch evaluation metrics back to the tracker after each `evaluate()` call.
- No reordering happens inside `build_row_filter` -- filters arrive pre-ordered by the tracker.

8. Changes to HashJoinExec

- Dynamic join filters are wrapped in `OptionalFilterPhysicalExpr` before being pushed down.
- Code that updates the dynamic filter unwraps the `OptionalFilterPhysicalExpr` to reach the inner `DynamicFilterPhysicalExpr`.

9. Protobuf schema updates

- The `reorder_filters` field (tag 6) is marked as `reserved` in `datafusion_common.proto`.
- New fields: `filter_pushdown_min_bytes_per_sec` (tag 35), `filter_collecting_byte_ratio_threshold` (tag 40), `filter_confidence_z` (tag 41).
- Regenerated `pbjson.rs` and `prost.rs`; updated `from_proto`, `to_proto`, and `file_formats.rs`.

10. Test and benchmark updates

- References to `reorder_filters` removed from tests and benchmarks.
- Many tests set `filter_pushdown_min_bytes_per_sec = 0.0` to preserve deterministic behavior (all filters always pushed down).
- Expected plan output changes from `DynamicFilter [...]` to `Optional(DynamicFilter [...])`.
- New unit tests in `selectivity.rs` covering: effectiveness calculation, Welford's algorithm, confidence intervals, state machine transitions (initial placement, promotion, demotion, dropping), dynamic filter generation tracking, filter ordering, and integration lifecycle tests.
- An expected-output update in `explain_analyze.rs` (`output_rows=8` -> `output_rows=5`) due to the adaptive system now placing some filters as post-scan that were previously row-level, causing slight row count differences in EXPLAIN ANALYZE output.

Are these changes tested?
Yes:

- All existing `pushdown_filters` and filter pushdown SLT tests pass (with `filter_pushdown_min_bytes_per_sec = 0.0` to force all filters to row-level for deterministic behavior).
- New unit tests in `selectivity.rs` (~450 lines of tests) cover the `SelectivityStats` calculator, the `TrackerConfig` builder, state machine transitions (initial placement, promotion, demotion, dropping, reset on generation change), filter ordering, and full promotion/demotion lifecycle integration tests.
- Plan output tests are updated for the `Optional(...)` wrapper on dynamic filters.
- Updated SLT files: `dynamic_filter_pushdown_config.slt`, `information_schema.slt`, `preserve_file_partitioning.slt`, `projection_pushdown.slt`, `push_down_filter.slt`, and `repartition_subset_satisfaction.slt`.
- `benchmarks/results.txt` shows TPC-H (13 faster, 6 slower, 3 unchanged), TPC-DS (33 faster, 31 slower, 35 unchanged, with a notable 24x improvement on Q64), and ClickBench (18 faster, 12 slower, 13 unchanged) results.

Are there any user-facing changes?
Yes:

- The `reorder_filters` config option is removed. This is a breaking change: users who set `SET datafusion.execution.parquet.reorder_filters = true` will get an error. The adaptive system replaces this functionality automatically.
- Three new config options are added under `datafusion.execution.parquet`:
  - `filter_pushdown_min_bytes_per_sec` (default: 52428800)
  - `filter_collecting_byte_ratio_threshold` (default: 0.15)
  - `filter_confidence_z` (default: 2.0)
- Default behavior changes when `pushdown_filters = true`. Previously, all filters were unconditionally pushed into the Parquet reader. Now, the adaptive system decides per filter based on byte-ratio thresholds and runtime effectiveness measurements. To restore the old behavior of pushing all filters unconditionally, set `filter_pushdown_min_bytes_per_sec = 0.0`.
- EXPLAIN plan output changes: dynamic join filters now display as `Optional(DynamicFilter [...])` instead of `DynamicFilter [...]`, reflecting their new optional wrapper.
- The deprecated `predicate()` method's signature changed: `ParquetSource::predicate()` now returns `Option<Arc<dyn PhysicalExpr>>` (owned) instead of `Option<&Arc<dyn PhysicalExpr>>` (reference). This method was already deprecated in favor of `filter()`.
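The `Optional(...)` wrapper behind that EXPLAIN change is a transparent-wrapper pattern: delegate every method, and let consumers detect the marker by downcasting. The sketch below uses a toy `Expr` trait in place of DataFusion's `PhysicalExpr`, so the type and function names (`OptionalFilterExpr`, `is_optional`) are illustrative only; the real wrapper delegates the full `PhysicalExpr` trait the same way.

```rust
use std::any::Any;
use std::sync::Arc;

/// Toy stand-in for PhysicalExpr.
trait Expr: Any {
    fn evaluate(&self, row: i64) -> bool;
    fn as_any(&self) -> &dyn Any;
}

/// An ordinary predicate: row > 0.
struct GtZero;
impl Expr for GtZero {
    fn evaluate(&self, row: i64) -> bool { row > 0 }
    fn as_any(&self) -> &dyn Any { self }
}

/// Marks the inner expression as droppable without changing
/// correctness; all behavior delegates to the inner expression.
struct OptionalFilterExpr {
    inner: Arc<dyn Expr>,
}

impl Expr for OptionalFilterExpr {
    fn evaluate(&self, row: i64) -> bool { self.inner.evaluate(row) }
    fn as_any(&self) -> &dyn Any { self }
}

/// How a tracker detects optionality: a downcast, analogous to
/// `downcast_ref::<OptionalFilterPhysicalExpr>()` in the PR.
fn is_optional(expr: &dyn Expr) -> bool {
    expr.as_any().downcast_ref::<OptionalFilterExpr>().is_some()
}

fn main() {
    let wrapped = OptionalFilterExpr { inner: Arc::new(GtZero) };
    assert!(wrapped.evaluate(1));   // behavior is unchanged
    assert!(is_optional(&wrapped)); // but the marker is detectable
    assert!(!is_optional(&GtZero));
    println!("ok");
}
```

The wrapper carries no data of its own, which is why it is safe to drop: removing it (or the whole filter, when the tracker deems it ineffective) can only let extra rows through to operators that would filter them anyway.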