Skip to content

Commit dbab02b

Browse files
committed
cleanup
1 parent bdb79be commit dbab02b

3 files changed

Lines changed: 301 additions & 168 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 121 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -606,78 +606,83 @@ impl FileOpener for ParquetOpener {
606606
}
607607
}
608608

609-
// Acquire tracker lock once for both partitioning and row filter building.
610-
let (post_scan_filters, projection, mask) = {
609+
// Partition filters under a brief read lock, then release it.
610+
let PartitionedFiltersGrouped {
611+
row_filter_groups,
612+
mut post_scan,
613+
} = {
611614
let tracker = selectivity_tracker.read();
612-
613-
let PartitionedFiltersGrouped {
614-
row_filter_groups,
615-
post_scan,
616-
} = if let Some(predicate) =
615+
if let Some(predicate) =
617616
pushdown_filters.then_some(predicate.as_ref()).flatten()
618617
{
619-
// Split predicate into conjuncts and partition based on selectivity
620-
// and correlation
621618
let conjuncts: Vec<Arc<dyn PhysicalExpr>> =
622619
split_conjunction(predicate)
623620
.into_iter()
624621
.map(Arc::clone)
625622
.collect();
626623
tracker.partition_filters_grouped(conjuncts)
627624
} else {
628-
PartitionedFiltersGrouped {
629-
row_filter_groups: vec![],
630-
post_scan: vec![],
631-
}
632-
};
633-
634-
// Include columns needed by post-scan filters in the projection mask,
635-
// but don't add filter expressions as projection columns.
636-
let mut all_indices: Vec<usize> = projection.column_indices();
637-
for filter in &post_scan {
638-
for col in datafusion_physical_expr::utils::collect_columns(filter) {
639-
let idx = col.index();
640-
if !all_indices.contains(&idx) {
641-
all_indices.push(idx);
642-
}
643-
}
625+
PartitionedFiltersGrouped::default()
644626
}
645-
let mask = ProjectionMask::roots(builder.parquet_schema(), all_indices);
646-
647-
// Build row filter from groups of correlated filters
648-
if !row_filter_groups.is_empty() {
649-
let row_filter_result = row_filter::build_row_filter_from_groups(
627+
};
628+
// read lock released
629+
630+
// Build row filter from groups of correlated filters.
631+
// build_row_filter_from_groups needs a brief read lock for reordering.
632+
if !row_filter_groups.is_empty() {
633+
let row_filter_result = {
634+
let tracker = selectivity_tracker.read();
635+
row_filter::build_row_filter_from_groups(
650636
row_filter_groups,
651637
&physical_file_schema,
652638
builder.metadata(),
653639
reorder_predicates,
654640
&file_metrics,
655641
&tracker,
656-
);
642+
)
643+
};
657644

658-
match row_filter_result {
659-
Ok(Some(result)) => {
645+
match row_filter_result {
646+
Ok(Some(result)) => {
647+
if !result.filter_metrics.is_empty() {
660648
builder = builder.with_row_filter(result.row_filter);
661-
filter_metrics = result.filter_metrics;
662649
}
663-
Ok(None) => {}
664-
Err(e) => {
665-
debug!("Ignoring error building row filter: {e}");
650+
filter_metrics = result.filter_metrics;
651+
// Unbuildable filters must be applied as post-scan
652+
// to preserve correctness.
653+
post_scan.extend(result.unbuildable_filters);
654+
}
655+
Ok(None) => {}
656+
Err(e) => {
657+
debug!("Ignoring error building row filter: {e}");
658+
}
659+
};
660+
}
661+
662+
let post_scan_filters = post_scan;
663+
664+
// Include columns needed by post-scan filters in the projection mask,
665+
// but don't add filter expressions as projection columns.
666+
let mask = {
667+
let mut all_indices: Vec<usize> = projection.column_indices();
668+
for filter in &post_scan_filters {
669+
for col in datafusion_physical_expr::utils::collect_columns(filter) {
670+
let idx = col.index();
671+
if !all_indices.contains(&idx) {
672+
all_indices.push(idx);
666673
}
667-
};
674+
}
668675
}
669-
670-
(post_scan, projection, mask)
676+
ProjectionMask::roots(builder.parquet_schema(), all_indices)
671677
};
672-
// tracker lock released here
673678

674679
// Apply limit to the reader only when there are no post-scan filters.
675680
// If post-scan filters exist, the limit must be enforced after filtering
676681
// (otherwise the reader stops reading before the filter can find matches).
677-
if post_scan_filters.is_empty() {
678-
if let Some(limit) = limit {
679-
builder = builder.with_limit(limit);
680-
}
682+
if post_scan_filters.is_empty()
683+
&& let Some(limit) = limit
684+
{
685+
builder = builder.with_limit(limit);
681686
}
682687

683688
let stream = builder
@@ -823,70 +828,89 @@ fn apply_post_scan_filters(
823828
let combined_mask = if collecting {
824829
// Collection phase: evaluate each filter individually with timing
825830
// to gather per-filter stats for adaptive decisions.
826-
let mut tracker = selectivity_tracker.write();
827-
831+
// Evaluation and correlation computation happen without the lock;
832+
// only the brief tracker updates acquire the write lock.
833+
let mut per_filter_stats: Vec<(u64, u64)> =
834+
Vec::with_capacity(filter_exprs.len());
828835
let mut bool_arrays = Vec::with_capacity(filter_exprs.len());
829836
for expr in filter_exprs {
830837
let start = datafusion_common::instant::Instant::now();
831838
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
832839
let nanos = start.elapsed().as_nanos() as u64;
833840
let bool_arr = as_boolean_array(result.as_ref()).clone();
834841
let matched = bool_arr.true_count() as u64;
835-
tracker.update(expr, matched, input_rows, nanos);
842+
per_filter_stats.push((matched, nanos));
836843
bool_arrays.push(bool_arr);
837844
}
838845

839846
// Compute pairwise joint pass counts for correlation tracking.
840847
// Only when 2+ filters and <= 10 filters (to bound O(n^2) cost).
841848
let n = filter_exprs.len();
842-
if n >= 2 && n <= 10 {
849+
let mut pairwise_stats: Vec<(usize, usize, u64)> = Vec::new();
850+
if (2..=10).contains(&n) {
843851
for i in 0..n {
844852
for j in (i + 1)..n {
845853
let both = and(&bool_arrays[i], &bool_arrays[j])?;
846-
tracker.update_correlation(
847-
&filter_exprs[i],
848-
&filter_exprs[j],
849-
both.true_count() as u64,
850-
input_rows,
851-
);
854+
pairwise_stats.push((i, j, both.true_count() as u64));
852855
}
853856
}
854857
}
855858

856-
// Combine all boolean arrays with AND
857-
match bool_arrays.len() {
858-
1 => bool_arrays.into_iter().next().unwrap(),
859-
_ => {
860-
let mut iter = bool_arrays.into_iter();
861-
let mut acc = iter.next().unwrap();
862-
for arr in iter {
863-
acc = and(&acc, &arr)?;
864-
}
865-
acc
859+
// Now acquire the write lock briefly to update the tracker.
860+
{
861+
let mut tracker = selectivity_tracker.write();
862+
for (idx, (matched, nanos)) in per_filter_stats.into_iter().enumerate() {
863+
tracker.update(&filter_exprs[idx], matched, input_rows, nanos);
864+
}
865+
for (i, j, both_passed) in pairwise_stats {
866+
tracker.update_correlation(
867+
&filter_exprs[i],
868+
&filter_exprs[j],
869+
both_passed,
870+
input_rows,
871+
);
866872
}
867873
}
874+
875+
// Combine all boolean arrays with AND
876+
and_boolean_arrays(bool_arrays)
868877
} else {
869878
// Post-collection: stats are frozen. Evaluate all filters together
870879
// without per-filter tracking overhead.
871-
let mut combined: Option<arrow::array::BooleanArray> = None;
872-
for expr in filter_exprs {
873-
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
874-
let bool_arr = as_boolean_array(result.as_ref()).clone();
875-
combined = Some(match combined {
876-
Some(mask) => and(&mask, &bool_arr)?,
877-
None => bool_arr,
878-
});
879-
}
880-
combined.unwrap()
880+
let bool_arrays: Vec<arrow::array::BooleanArray> = filter_exprs
881+
.iter()
882+
.map(|expr| {
883+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
884+
Ok(as_boolean_array(result.as_ref()).clone())
885+
})
886+
.collect::<Result<_>>()?;
887+
and_boolean_arrays(bool_arrays)
881888
};
882889

883890
Ok(filter_record_batch(&batch, &combined_mask)?)
884891
}
885892

893+
/// Combine multiple boolean arrays with AND, returning a single boolean array.
894+
///
895+
/// Panics if `arrays` is empty.
896+
fn and_boolean_arrays(
897+
arrays: Vec<arrow::array::BooleanArray>,
898+
) -> arrow::array::BooleanArray {
899+
use arrow::compute::and;
900+
let mut iter = arrays.into_iter();
901+
let mut acc = iter.next().expect("at least one boolean array");
902+
for arr in iter {
903+
acc = and(&acc, &arr).expect("boolean AND should not fail on same-length arrays");
904+
}
905+
acc
906+
}
907+
886908
/// Compute the average bytes per row from parquet metadata for the projected columns.
887909
///
888-
/// Uses compressed column sizes as an approximation. Returns `None` if the file
889-
/// has zero rows.
910+
/// Uses **compressed** column sizes as an approximation of the I/O savings from
911+
/// late materialization. This can diverge from actual savings when heavy dictionary
912+
/// encoding, RLE, or bloom filters cause the compressed-to-uncompressed ratio to
913+
/// vary significantly between columns. Returns `None` if the file has zero rows.
890914
fn compute_bytes_per_row(
891915
metadata: &parquet::file::metadata::ParquetMetaData,
892916
col_indices: &[usize],
@@ -1021,6 +1045,13 @@ impl<S> SelectivityUpdatingStream<S> {
10211045
fn update_selectivity(&mut self) {
10221046
let mut tracker = self.selectivity_tracker.write();
10231047
for (i, metrics) in self.filter_metrics.iter().enumerate() {
1048+
// Only feed stats back for single-filter predicates.
1049+
// Compound predicates (multiple correlated filters grouped together)
1050+
// have shared metrics that can't be attributed to individual filters.
1051+
if metrics.exprs.len() != 1 {
1052+
continue;
1053+
}
1054+
10241055
let current_matched = metrics.get_rows_matched() as u64;
10251056
let current_total = metrics.get_rows_total() as u64;
10261057
let current_nanos = metrics.get_eval_nanos() as u64;
@@ -1031,7 +1062,12 @@ impl<S> SelectivityUpdatingStream<S> {
10311062
let delta_nanos = current_nanos - last_nanos;
10321063

10331064
if delta_total > 0 {
1034-
tracker.update(&metrics.expr, delta_matched, delta_total, delta_nanos);
1065+
tracker.update(
1066+
&metrics.exprs[0],
1067+
delta_matched,
1068+
delta_total,
1069+
delta_nanos,
1070+
);
10351071
self.last_reported[i] = (current_matched, current_total, current_nanos);
10361072
}
10371073
}
@@ -1058,10 +1094,14 @@ where
10581094
self.done = true;
10591095
Poll::Ready(None)
10601096
}
1061-
Some(result) => {
1062-
// Update selectivity after each batch
1097+
Some(Ok(batch)) => {
1098+
// Update selectivity after each successful batch
10631099
self.update_selectivity();
1064-
Poll::Ready(Some(result))
1100+
Poll::Ready(Some(Ok(batch)))
1101+
}
1102+
Some(Err(e)) => {
1103+
// Don't update selectivity on error — metrics may be partial/corrupt
1104+
Poll::Ready(Some(Err(e)))
10651105
}
10661106
}
10671107
}

0 commit comments

Comments
 (0)