Skip to content

Commit e344306

Browse files
adriangbclaude
andcommitted
fix: start optional filters as RowFilter when byte ratio is small
Optional/dynamic filters from hash-join build sides were unconditionally placed as PostScan on first encounter, losing late-materialization benefits even when filter columns were small relative to the projection. With few file splits opened in parallel, the tracker rarely accumulated enough samples to promote them mid-query. Apply the same byte_ratio_threshold heuristic used for static filters. The CI lower-bound promotion and CI upper-bound demotion paths still apply, including Drop for ineffective optional filters. Local TPC-DS sf1 (M-series, pushdown_filters=true): | Query | Main | Branch | Branch+Fix | |-------|-------|--------|------------| | Q24 | 72 | 452 | 70 | | Q17 | 124 | 212 | 121 | | Q25 | 182 | 379 | 203 | | Q29 | 152 | 312 | 145 | | Q7 | 224 | 297 | 220 | | Q58 | 129 | 191 | 133 | | Q64 | 28213 | 672 | 578 | | Q9 | 228 | 96 | 87 | | Q76 | 172 | 105 | 156 | Q76 regresses slightly vs the no-fix branch (CASE/hash_lookup is CPU- heavy at row level) but still beats main. Also updates dynamic_filter_pushdown_config.slt to match the Optional(DynamicFilter ...) display introduced earlier in the branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ba83e88 commit e344306

3 files changed

Lines changed: 9 additions & 38 deletions

File tree

datafusion/datasource-parquet/src/selectivity.rs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -540,10 +540,7 @@ impl SelectivityTrackerInner {
540540
post_scan: &mut Vec<(FilterId, Arc<dyn PhysicalExpr>)>,
541541
stats_map: &HashMap<FilterId, Mutex<SelectivityStats>>,
542542
) {
543-
if expr
544-
.downcast_ref::<OptionalFilterPhysicalExpr>()
545-
.is_none()
546-
{
543+
if expr.downcast_ref::<OptionalFilterPhysicalExpr>().is_none() {
547544
self.filter_states.insert(id, FilterState::PostScan);
548545
post_scan.push((id, Arc::clone(expr)));
549546
// Reset stats for this filter so it can be re-evaluated as a post-scan filter.
@@ -640,29 +637,9 @@ impl SelectivityTrackerInner {
640637
let state = self.filter_states.get(&id).copied();
641638

642639
let Some(state) = state else {
643-
// New filter: decide initial placement.
644-
645-
// Optional/dynamic filters (e.g. hash join pushdown) always start
646-
// PostScan because their selectivity is unknown and they can be
647-
// dropped if ineffective. This is especially important for
648-
// single-file tables where there's no second file to adapt.
649-
let is_optional = expr
650-
.downcast_ref::<OptionalFilterPhysicalExpr>()
651-
.is_some();
652-
if is_optional {
653-
debug!(
654-
"FilterId {id}: New optional filter → Post-scan (conservative) — {expr}"
655-
);
656-
self.filter_states.insert(id, FilterState::PostScan);
657-
if !stats_map.contains_key(&id) {
658-
new_filter_ids.push(id);
659-
}
660-
post_scan_filters.push((id, expr));
661-
continue;
662-
}
663-
664-
// Static filters: use filter_bytes / projection_bytes to decide
665-
// initial placement. This ratio captures the I/O tradeoff:
640+
// New filter: decide initial placement using the
641+
// filter_bytes / projection_bytes ratio. This ratio captures
642+
// the I/O tradeoff:
666643
//
667644
// - Low ratio (filter columns are small vs projection): row-filter
668645
// enables late materialization — the large non-filter portion of
@@ -757,9 +734,7 @@ impl SelectivityTrackerInner {
757734
let stats = entry.lock();
758735
if let Some(ub) = stats.confidence_upper_bound(confidence_z)
759736
&& ub < config.min_bytes_per_sec
760-
&& expr
761-
.downcast_ref::<OptionalFilterPhysicalExpr>()
762-
.is_some()
737+
&& expr.downcast_ref::<OptionalFilterPhysicalExpr>().is_some()
763738
{
764739
drop(stats);
765740
debug!(

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,11 +1036,7 @@ mod test {
10361036
);
10371037

10381038
// Inner expr is NOT detectable as optional
1039-
assert!(
1040-
inner
1041-
.downcast_ref::<OptionalFilterPhysicalExpr>()
1042-
.is_none()
1043-
);
1039+
assert!(inner.downcast_ref::<OptionalFilterPhysicalExpr>().is_none());
10441040
}
10451041

10461042
#[test]

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,8 @@ physical_plan
337337
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
338338
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
339339
03)--SortExec: expr=[data@1 DESC], preserve_partitioning=[false]
340-
04)----FilterExec: DynamicFilter [ empty ]
341-
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
340+
04)----FilterExec: Optional(DynamicFilter [ empty ])
341+
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
342342

343343
statement count 0
344344
SET datafusion.execution.parquet.pushdown_filters = true;
@@ -361,7 +361,7 @@ physical_plan
361361
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
362362
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
363363
03)--SortExec: expr=[data@1 DESC], preserve_partitioning=[false]
364-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
364+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
365365

366366
statement count 0
367367
RESET datafusion.execution.parquet.pushdown_filters;

0 commit comments

Comments
 (0)