Skip to content

Commit 6ebaaa2

Browse files
adriangb and claude authored and committed
fix: mid-stream skip for ineffective optional row/post-scan filters
After moving optional filters to RowFilter via byte_ratio, queries with 1-row-group-per-file inputs (e.g. TPC-DS) had no chance to demote when the chosen filter turned out to be CPU-dominated and ineffective: partition_filters runs once per file open, all 12 split openers fire in parallel and see no stats, and the existing Demote/Drop branches never re-trigger for the lifetime of the scan. Add a per-FilterId Arc<AtomicBool> "skip flag" owned by SelectivityTracker. Once a filter has accumulated enough samples and its CI upper bound on bytes-per-second falls below min_bytes_per_sec, the hot per-batch update() path flips the flag — but only for filters recorded as optional at first encounter (mandatory filters must always execute or the result set changes). Both consumers honour it: * DatafusionArrowPredicate::evaluate returns an all-true mask without invoking physical_expr (filter columns are still decoded; CPU is reclaimed but I/O is not, pending arrow-rs API). * apply_post_scan_filters_with_stats `continue`s past the filter, skipping evaluation and the per-batch tracker.update. Local TPC-DS sf1 (M-series, pushdown_filters=true), worst regressors from main pushdown=off baseline: | Query | Main(off) | Branch(byte-ratio) | +skip-flag | |-------|-----------|--------------------|------------| | Q72 | 619 | 554 | 261 | | Q50 | 221 | 521 | 135 | | Q23 | 892 | 1217 | 680 | | Q67 | 310 | 510 | 306 | | Q18 | 128 | 312 | 178 | | Q13 | 399 | 558 | 363 | | Q53 | 103 | 167 | 93 | | Q63 | 106 | 173 | 93 | | Q76 | 132 | 268 | 105 | Q24-class wins are unaffected (Q24 holds at 70 ms vs 379 ms on main). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4b730df commit 6ebaaa2

3 files changed

Lines changed: 175 additions & 11 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,6 +1433,15 @@ fn apply_post_scan_filters_with_stats(
14331433
let mut combined_mask: Option<BooleanArray> = None;
14341434

14351435
for (i, (id, expr)) in filters.iter().enumerate() {
1436+
// Mid-stream drop, mirror of `DatafusionArrowPredicate::evaluate`.
1437+
// Set by the tracker on `OptionalFilterPhysicalExpr` whose CI
1438+
// upper bound has fallen below `min_bytes_per_sec`; correctness is
1439+
// preserved because the originating join independently enforces
1440+
// the predicate. We do not update the tracker for a skipped batch.
1441+
if tracker.is_filter_skipped(*id) {
1442+
continue;
1443+
}
1444+
14361445
let start = datafusion_common::instant::Instant::now();
14371446
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
14381447
let bool_arr = as_boolean_array(result.as_ref())?;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,15 @@ pub(crate) struct DatafusionArrowPredicate {
138138
/// paths disagreed, the tracker would rank row-filter and post-scan
139139
/// candidates on incomparable axes and mis-promote or mis-demote.
140140
other_projected_bytes_per_row: f64,
141+
/// Mid-stream "drop" flag, shared with the
142+
/// [`crate::selectivity::SelectivityTracker`]. The tracker flips this
143+
/// when an `OptionalFilterPhysicalExpr` proves CPU-dominated and
144+
/// ineffective; once set, [`Self::evaluate`] returns an all-true mask
145+
/// without invoking `physical_expr`. Filter columns are still decoded
146+
/// (the parquet decoder cannot be reconfigured mid-scan), so this only
147+
/// reclaims CPU, not I/O. Flagged only for filters known to be
148+
/// optional, so correctness is preserved by the join itself.
149+
skip_flag: Arc<std::sync::atomic::AtomicBool>,
141150
}
142151

143152
impl DatafusionArrowPredicate {
@@ -153,6 +162,7 @@ impl DatafusionArrowPredicate {
153162
) -> Result<Self> {
154163
let physical_expr =
155164
reassign_expr_columns(candidate.expr, &candidate.read_plan.projected_schema)?;
165+
let skip_flag = tracker.skip_flag(filter_id);
156166

157167
Ok(Self {
158168
physical_expr,
@@ -163,6 +173,7 @@ impl DatafusionArrowPredicate {
163173
filter_id,
164174
tracker,
165175
other_projected_bytes_per_row,
176+
skip_flag,
166177
})
167178
}
168179
}
@@ -173,6 +184,20 @@ impl ArrowPredicate for DatafusionArrowPredicate {
173184
}
174185

175186
fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
187+
// Mid-stream drop: the tracker has decided this optional filter is
188+
// pulling its weight no longer. Return an all-true mask to bypass
189+
// expression evaluation entirely. We still bump `rows_matched` so
190+
// the per-predicate count stays consistent with input rows; the
191+
// tracker is intentionally NOT updated for skipped batches because
192+
// (a) we have nothing meaningful to report and (b) flooding it
193+
// with zero-cost samples would mask the underlying effectiveness
194+
// signal if the flag is ever cleared.
195+
if self.skip_flag.load(std::sync::atomic::Ordering::Acquire) {
196+
let rows_in_batch = batch.num_rows();
197+
self.rows_matched.add(rows_in_batch);
198+
return Ok(BooleanArray::from(vec![true; rows_in_batch]));
199+
}
200+
176201
// scoped timer updates on drop
177202
let mut timer = self.time.timer();
178203
let start_nanos = datafusion_common::instant::Instant::now();

datafusion/datasource-parquet/src/selectivity.rs

Lines changed: 141 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use parking_lot::{Mutex, RwLock};
2626
use parquet::file::metadata::ParquetMetaData;
2727
use std::collections::HashMap;
2828
use std::sync::Arc;
29+
use std::sync::atomic::{AtomicBool, Ordering};
2930

3031
use datafusion_physical_expr::utils::collect_columns;
3132
use datafusion_physical_expr_common::physical_expr::{
@@ -35,6 +36,11 @@ use datafusion_physical_expr_common::physical_expr::{
3536
/// Stable identifier for a filter conjunct, assigned by `ParquetSource::with_predicate`.
3637
pub type FilterId = usize;
3738

39+
/// Re-evaluate the per-filter skip flag every Nth batch update.
///
/// The CI upper-bound computation itself is only a couple of arithmetic
/// ops; the real saving from this gate is that on most batches `update()`
/// returns before ever touching the `is_optional` / `skip_flags` maps, so
/// their lock words and cache lines stay out of the hot per-batch path.
const SKIP_FLAG_CHECK_INTERVAL: u64 = 4;
43+
3844
/// Per-filter lifecycle state in the adaptive filter system.
3945
///
4046
/// State transitions:
@@ -211,6 +217,8 @@ impl TrackerConfig {
211217
SelectivityTracker {
212218
config: self,
213219
filter_stats: RwLock::new(HashMap::new()),
220+
skip_flags: RwLock::new(HashMap::new()),
221+
is_optional: RwLock::new(HashMap::new()),
214222
inner: Mutex::new(SelectivityTrackerInner::new()),
215223
}
216224
}
@@ -334,6 +342,25 @@ pub struct SelectivityTracker {
334342
/// counters, so concurrent `update()` calls on *different* filters
335343
/// proceed in parallel with zero contention.
336344
filter_stats: RwLock<HashMap<FilterId, Mutex<SelectivityStats>>>,
345+
/// Per-filter "skip" flags — when set, the corresponding filter is
346+
/// treated as a no-op by both the row-filter
347+
/// (`DatafusionArrowPredicate::evaluate`) and the post-scan path
348+
/// (`apply_post_scan_filters_with_stats`). This is the mid-stream
349+
/// equivalent of dropping an optional filter: once the per-batch
350+
/// `update()` path proves an `OptionalFilterPhysicalExpr` is
351+
/// CPU-dominated and ineffective, it flips the flag and subsequent
352+
/// batches stop paying the evaluation cost. The decoder still decodes
353+
/// the filter columns (we cannot rebuild it mid-scan), so I/O is not
354+
/// reclaimed; only the predicate evaluation is skipped.
355+
///
356+
/// Only ever set for filters whose `is_optional` entry is `true` —
357+
/// mandatory filters must always execute or queries return wrong rows.
358+
skip_flags: RwLock<HashMap<FilterId, Arc<AtomicBool>>>,
359+
/// Whether each filter is wrapped in an `OptionalFilterPhysicalExpr`,
360+
/// captured at first-encounter in `partition_filters` so the per-batch
361+
/// `update()` path can decide whether the filter is safe to no-op
362+
/// without re-inspecting the expression tree on every batch.
363+
is_optional: RwLock<HashMap<FilterId, bool>>,
337364
/// Filter lifecycle state machine and dynamic-filter generation tracking.
338365
///
339366
/// Only `partition_filters()` acquires this lock (once per file open).
@@ -372,6 +399,16 @@ impl SelectivityTracker {
372399
/// before `partition_filters()` has registered the filter — in practice
373400
/// this cannot happen because `partition_filters()` runs during file open
374401
/// before any batches are processed).
402+
///
403+
/// **Mid-stream drop:** after every `SKIP_FLAG_CHECK_INTERVAL`'th batch
404+
/// we evaluate the CI upper bound; if it falls below
405+
/// `min_bytes_per_sec` and the filter is wrapped in
406+
/// `OptionalFilterPhysicalExpr`, we set the per-filter skip flag.
407+
/// Subsequent calls to `DatafusionArrowPredicate::evaluate` (row-level)
408+
/// and `apply_post_scan_filters_with_stats` (post-scan) observe the
409+
/// flag and short-circuit their work for that filter. Mandatory
410+
/// filters are never flagged because doing so would change the result
411+
/// set.
375412
pub(crate) fn update(
376413
&self,
377414
id: FilterId,
@@ -380,12 +417,77 @@ impl SelectivityTracker {
380417
eval_nanos: u64,
381418
batch_bytes: u64,
382419
) {
383-
let map = self.filter_stats.read();
384-
if let Some(entry) = map.get(&id) {
385-
entry.lock().update(matched, total, eval_nanos, batch_bytes);
420+
let stats_map = self.filter_stats.read();
421+
let Some(entry) = stats_map.get(&id) else {
422+
return;
423+
};
424+
let mut stats = entry.lock();
425+
stats.update(matched, total, eval_nanos, batch_bytes);
426+
427+
// Mid-stream drop check. Only consult the skip mechanism for
428+
// filters we already know to be optional, and only after enough
429+
// samples for `confidence_upper_bound` to be defined. The modulo
430+
// gate keeps the per-batch overhead tiny on the hot path.
431+
if !self.config.min_bytes_per_sec.is_finite()
432+
|| !stats.sample_count.is_multiple_of(SKIP_FLAG_CHECK_INTERVAL)
433+
{
434+
return;
435+
}
436+
let Some(ub) = stats.confidence_upper_bound(self.config.confidence_z) else {
437+
return;
438+
};
439+
if ub >= self.config.min_bytes_per_sec {
440+
return;
441+
}
442+
drop(stats);
443+
drop(stats_map);
444+
445+
// Optionality is captured at first sight in `partition_filters` so
446+
// we can answer this without re-walking the expression tree.
447+
let is_optional = self.is_optional.read().get(&id).copied().unwrap_or(false);
448+
if !is_optional {
449+
return;
450+
}
451+
if let Some(flag) = self.skip_flags.read().get(&id)
452+
&& !flag.swap(true, Ordering::Release)
453+
{
454+
debug!(
455+
"FilterId {id}: mid-stream skip — CI upper bound {ub} < {} bytes/sec",
456+
self.config.min_bytes_per_sec
457+
);
386458
}
387459
}
388460

461+
/// Returns the shared skip flag for `id`, creating one if absent.
462+
///
463+
/// Cloned into [`crate::row_filter::DatafusionArrowPredicate`] so the
464+
/// row-filter path can short-circuit when the per-batch update path
465+
/// decides the filter has stopped pulling its weight. The post-scan
466+
/// path uses [`Self::is_filter_skipped`] instead — it does not need a
467+
/// long-lived handle.
468+
pub(crate) fn skip_flag(&self, id: FilterId) -> Arc<AtomicBool> {
469+
if let Some(existing) = self.skip_flags.read().get(&id) {
470+
return Arc::clone(existing);
471+
}
472+
let mut write = self.skip_flags.write();
473+
Arc::clone(
474+
write
475+
.entry(id)
476+
.or_insert_with(|| Arc::new(AtomicBool::new(false))),
477+
)
478+
}
479+
480+
/// Returns `true` when `id` has been mid-stream-dropped by the tracker.
481+
///
482+
/// Cheap: a single `RwLock::read` plus an atomic load. Called from the
483+
/// post-scan filter loop in `apply_post_scan_filters_with_stats`.
484+
pub(crate) fn is_filter_skipped(&self, id: FilterId) -> bool {
485+
self.skip_flags
486+
.read()
487+
.get(&id)
488+
.is_some_and(|f| f.load(Ordering::Acquire))
489+
}
490+
389491
/// Partition filters into row-level predicates vs post-scan filters.
390492
///
391493
/// Called once per file open (cold path).
@@ -420,12 +522,22 @@ impl SelectivityTracker {
420522
// Phase 2: if new filters were seen, briefly acquire write lock to insert entries
421523
if !result.new_filter_ids.is_empty() {
422524
let mut stats_write = self.filter_stats.write();
423-
for id in result.new_filter_ids {
525+
for id in &result.new_filter_ids {
424526
stats_write
425-
.entry(id)
527+
.entry(*id)
426528
.or_insert_with(|| Mutex::new(SelectivityStats::default()));
427529
}
428530
}
531+
if !result.new_optional_flags.is_empty() {
532+
let mut optional_write = self.is_optional.write();
533+
let mut skip_write = self.skip_flags.write();
534+
for (id, is_optional) in result.new_optional_flags {
535+
optional_write.entry(id).or_insert(is_optional);
536+
skip_write
537+
.entry(id)
538+
.or_insert_with(|| Arc::new(AtomicBool::new(false)));
539+
}
540+
}
429541

430542
result.partitioned
431543
}
@@ -455,6 +567,12 @@ impl SelectivityTracker {
455567
struct PartitionResult {
456568
partitioned: PartitionedFilters,
457569
new_filter_ids: Vec<FilterId>,
570+
/// `(FilterId, is_optional)` entries observed for the first time in this
571+
/// `partition_filters` call. The outer `SelectivityTracker` records
572+
/// optionality alongside `filter_stats` so that the hot `update()` path
573+
/// can decide whether the per-filter skip flag is safe to flip without
574+
/// inspecting the expression tree.
575+
new_optional_flags: Vec<(FilterId, bool)>,
458576
}
459577

460578
/// Filter state-machine and generation tracking, guarded by the `Mutex`
@@ -578,6 +696,7 @@ impl SelectivityTrackerInner {
578696
stats_map: &HashMap<FilterId, Mutex<SelectivityStats>>,
579697
) -> PartitionResult {
580698
let mut new_filter_ids = Vec::new();
699+
let mut new_optional_flags: Vec<(FilterId, bool)> = Vec::new();
581700

582701
// If min_bytes_per_sec is INFINITY -> all filters are post-scan.
583702
if config.min_bytes_per_sec.is_infinite() {
@@ -586,9 +705,10 @@ impl SelectivityTrackerInner {
586705
filters.len()
587706
);
588707
// Register all filter IDs so update() can find them
589-
for &(id, _) in &filters {
590-
if !stats_map.contains_key(&id) {
591-
new_filter_ids.push(id);
708+
for (id, expr) in &filters {
709+
if !stats_map.contains_key(id) {
710+
new_filter_ids.push(*id);
711+
new_optional_flags.push((*id, is_optional_filter(expr)));
592712
}
593713
}
594714
return PartitionResult {
@@ -597,6 +717,7 @@ impl SelectivityTrackerInner {
597717
post_scan: filters,
598718
},
599719
new_filter_ids,
720+
new_optional_flags,
600721
};
601722
}
602723
// If min_bytes_per_sec is 0 -> all filters are promoted.
@@ -606,9 +727,10 @@ impl SelectivityTrackerInner {
606727
filters.len()
607728
);
608729
// Register all filter IDs so update() can find them
609-
for &(id, _) in &filters {
610-
if !stats_map.contains_key(&id) {
611-
new_filter_ids.push(id);
730+
for (id, expr) in &filters {
731+
if !stats_map.contains_key(id) {
732+
new_filter_ids.push(*id);
733+
new_optional_flags.push((*id, is_optional_filter(expr)));
612734
}
613735
}
614736
return PartitionResult {
@@ -617,6 +739,7 @@ impl SelectivityTrackerInner {
617739
post_scan: Vec::new(),
618740
},
619741
new_filter_ids,
742+
new_optional_flags,
620743
};
621744
}
622745

@@ -667,6 +790,7 @@ impl SelectivityTrackerInner {
667790

668791
if !stats_map.contains_key(&id) {
669792
new_filter_ids.push(id);
793+
new_optional_flags.push((id, is_optional_filter(&expr)));
670794
}
671795

672796
if byte_ratio <= config.byte_ratio_threshold {
@@ -787,10 +911,16 @@ impl SelectivityTrackerInner {
787911
post_scan: post_scan_filters,
788912
},
789913
new_filter_ids,
914+
new_optional_flags,
790915
}
791916
}
792917
}
793918

919+
/// Returns `true` if `expr` is wrapped in [`OptionalFilterPhysicalExpr`].
///
/// Called when registering newly-seen filters so the tracker can record
/// optionality up front; only optional filters may later be no-op'd by the
/// mid-stream skip flag.
// NOTE(review): `downcast_ref` is invoked directly on the trait object
// here; the usual DataFusion pattern is `expr.as_any().downcast_ref::<T>()`.
// Confirm that an inherent `downcast_ref` helper on `dyn PhysicalExpr` is
// in scope (possibly via the truncated physical_expr_common import).
fn is_optional_filter(expr: &Arc<dyn PhysicalExpr>) -> bool {
    expr.downcast_ref::<OptionalFilterPhysicalExpr>().is_some()
}
923+
794924
/// Calculate the estimated number of bytes needed to evaluate a filter based on the columns
795925
/// it references as if it were applied to the entire file.
796926
/// This is used for initial placement of new filters before any stats are available, and as a fallback for filters without stats.

0 commit comments

Comments
 (0)