Skip to content

Commit 1c81b51

Browse files
committed
some improvements
1 parent 491d495 commit 1c81b51

3 files changed

Lines changed: 248 additions & 37 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -606,12 +606,12 @@ impl FileOpener for ParquetOpener {
606606
}
607607
}
608608

609-
// Partition filters under a brief read lock, then release it.
609+
// Partition filters under a brief write lock (needed for decision caching).
610610
let PartitionedFiltersGrouped {
611611
row_filter_groups,
612612
mut post_scan,
613613
} = {
614-
let tracker = selectivity_tracker.read();
614+
let mut tracker = selectivity_tracker.write();
615615
if let Some(predicate) =
616616
pushdown_filters.then_some(predicate.as_ref()).flatten()
617617
{
@@ -1045,13 +1045,6 @@ impl<S> SelectivityUpdatingStream<S> {
10451045
fn update_selectivity(&mut self) {
10461046
let mut tracker = self.selectivity_tracker.write();
10471047
for (i, metrics) in self.filter_metrics.iter().enumerate() {
1048-
// Only feed stats back for single-filter predicates.
1049-
// Compound predicates (multiple correlated filters grouped together)
1050-
// have shared metrics that can't be attributed to individual filters.
1051-
if metrics.exprs.len() != 1 {
1052-
continue;
1053-
}
1054-
10551048
let current_matched = metrics.get_rows_matched() as u64;
10561049
let current_total = metrics.get_rows_total() as u64;
10571050
let current_nanos = metrics.get_eval_nanos() as u64;
@@ -1063,7 +1056,7 @@ impl<S> SelectivityUpdatingStream<S> {
10631056

10641057
if delta_total > 0 {
10651058
tracker.update(
1066-
&metrics.exprs[0],
1059+
&metrics.expr,
10671060
delta_matched,
10681061
delta_total,
10691062
delta_nanos,

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,11 @@ use super::supported_predicates::supports_list_predicates;
9696
/// to update selectivity statistics after processing completes.
9797
#[derive(Debug, Clone)]
9898
pub struct FilterMetrics {
99-
/// The original filter expressions that make up this predicate.
100-
/// A single-element vec means a standalone filter; multiple elements
101-
/// means a compound predicate built from correlated filters.
102-
/// Only single-element metrics are fed back into the selectivity tracker,
103-
/// because compound metrics cannot be attributed to individual filters.
104-
pub exprs: Vec<Arc<dyn PhysicalExpr>>,
99+
/// The filter expression for this predicate.
100+
/// For standalone filters this is the original expression.
101+
/// For compound predicates (correlated filters), this is the conjunction.
102+
/// Stats are fed back into the selectivity tracker keyed on this expression.
103+
pub expr: Arc<dyn PhysicalExpr>,
105104
/// Counter for rows that matched (passed) this filter
106105
rows_matched: metrics::Count,
107106
/// Counter for rows that were pruned (filtered out) by this filter
@@ -808,7 +807,7 @@ pub fn build_row_filter_with_metrics(
808807

809808
// Store references to the metrics for the filter
810809
filter_metrics.push(FilterMetrics {
811-
exprs: vec![original_expr],
810+
expr: original_expr,
812811
rows_matched: predicate_rows_matched.clone(),
813812
rows_pruned: predicate_rows_pruned.clone(),
814813
eval_time: local_eval_time.clone(),
@@ -882,7 +881,7 @@ pub fn build_row_filter_from_groups(
882881

883882
// For each group, combine into a single expression and build a candidate.
884883
// Track the expression each candidate was built from (the group's conjunction, or a single filter on fallback) for metrics, and collect unbuildable filters.
885-
let mut candidates_with_exprs: Vec<(Vec<Arc<dyn PhysicalExpr>>, FilterCandidate)> =
884+
let mut candidates_with_exprs: Vec<(Arc<dyn PhysicalExpr>, FilterCandidate)> =
886885
Vec::new();
887886
let mut unbuildable_filters: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
888887

@@ -901,7 +900,7 @@ pub fn build_row_filter_from_groups(
901900
.build(metadata)
902901
{
903902
Ok(Some(candidate)) => {
904-
candidates_with_exprs.push((group, candidate));
903+
candidates_with_exprs.push((combined_expr, candidate));
905904
}
906905
Ok(None) | Err(_) => {
907906
// Compound can't push down — try each individually.
@@ -915,7 +914,7 @@ pub fn build_row_filter_from_groups(
915914
.build(metadata)
916915
{
917916
Ok(Some(candidate)) => {
918-
candidates_with_exprs.push((vec![expr], candidate));
917+
candidates_with_exprs.push((Arc::clone(&expr), candidate));
919918
}
920919
_ => {
921920
unbuildable_filters.push(expr);
@@ -958,7 +957,7 @@ pub fn build_row_filter_from_groups(
958957
let mut filter_metrics = Vec::new();
959958
let mut arrow_predicates = Vec::with_capacity(total_candidates);
960959

961-
for (idx, (original_exprs, candidate)) in
960+
for (idx, (original_expr, candidate)) in
962961
candidates_with_exprs.into_iter().enumerate()
963962
{
964963
let is_last = idx == total_candidates - 1;
@@ -968,11 +967,10 @@ pub fn build_row_filter_from_groups(
968967
let predicate_rows_pruned = metrics::Count::new();
969968
let local_eval_time = metrics::Time::new();
970969

971-
// One FilterMetrics entry per predicate, storing all original
972-
// expressions in the group. Only single-element groups will have
973-
// their stats fed back into the selectivity tracker.
970+
// One FilterMetrics entry per predicate, keyed on the conjunction
971+
// expression (or original expression for single filters).
974972
filter_metrics.push(FilterMetrics {
975-
exprs: original_exprs,
973+
expr: original_expr,
976974
rows_matched: predicate_rows_matched.clone(),
977975
rows_pruned: predicate_rows_pruned.clone(),
978976
eval_time: local_eval_time.clone(),

0 commit comments

Comments
 (0)