Skip to content

Commit b5c481f

Browse files
xudong963blaginin
andauthored
Fix fully matched row groups with null counts (#21907)
## Which issue does this PR close? - Related to #21637 ## Rationale for this change This is split out from review feedback on #21637. Row groups can only be marked fully matched when all rows are guaranteed to pass the filter. For nullable predicate columns, proving `NOT(predicate)` is not enough because rows where the predicate evaluates to NULL do not pass the filter. ## What changes are included in this PR? This PR makes the fully matched row-group proof conservative for nulls by adding `IS NULL` checks for nullable columns referenced by the predicate before evaluating the inverted pruning predicate. It also threads `with_missing_null_counts_as_zero` through `RowGroupPruningStatistics` so normal row-group pruning keeps the existing default behavior, while fully matched proofs treat missing null counts as unknown. This reuses the existing statistics conversion path instead of adding a separate null-count conversion pass. ## Are these changes tested? Added a regression test covering row groups with known nulls, known zero nulls, and missing null counts. ## Are there any user-facing changes? No API changes. This only prevents false positives in the row-group fully matched optimization. --------- Co-authored-by: Dmitrii Blaginin <dmitrii@blaginin.me>
1 parent fa9ada3 commit b5c481f

1 file changed

Lines changed: 102 additions & 6 deletions

File tree

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ use arrow::datatypes::Schema;
2424
use datafusion_common::pruning::PruningStatistics;
2525
use datafusion_common::{Column, Result, ScalarValue};
2626
use datafusion_datasource::FileRange;
27-
use datafusion_physical_expr::PhysicalExprSimplifier;
28-
use datafusion_physical_expr::expressions::NotExpr;
27+
use datafusion_expr::Operator;
28+
use datafusion_physical_expr::expressions::{BinaryExpr, IsNullExpr, NotExpr};
29+
use datafusion_physical_expr::utils::collect_columns;
30+
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprSimplifier};
2931
use datafusion_pruning::PruningPredicate;
3032
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
3133
use parquet::basic::Type;
@@ -272,6 +274,11 @@ impl RowGroupAccessPlanFilter {
272274
parquet_schema,
273275
row_group_metadatas,
274276
arrow_schema,
277+
// Preserve the existing row-group pruning behavior. This path only
278+
// proves whether matching rows may exist, so it uses the
279+
// StatisticsConverter default for older parquet-rs files where a
280+
// missing null count can mean there are zero nulls.
281+
missing_null_counts_as_zero: true,
275282
};
276283

277284
// try to prune the row groups in a single call
@@ -327,10 +334,33 @@ impl RowGroupAccessPlanFilter {
327334
return;
328335
}
329336

330-
// Use NotExpr to create the inverted predicate
331-
let inverted_expr = Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
337+
let mut inverted_expr: Arc<dyn PhysicalExpr> =
338+
Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
332339

333-
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
340+
// Rows where the predicate evaluates to NULL do not pass the filter.
341+
// Include NULL checks in the inverted expression so a row group is only
342+
// considered fully matched when every referenced column is known non-null.
343+
// This is conservative for null-accepting predicates, but fully matched
344+
// row groups must not have false positives.
345+
let mut columns = collect_columns(predicate.orig_expr())
346+
.into_iter()
347+
.filter(|column| arrow_schema.field(column.index()).is_nullable())
348+
.collect::<Vec<_>>();
349+
columns.sort_by(|a, b| {
350+
a.index()
351+
.cmp(&b.index())
352+
.then_with(|| a.name().cmp(b.name()))
353+
});
354+
355+
for column in columns {
356+
inverted_expr = Arc::new(BinaryExpr::new(
357+
inverted_expr,
358+
Operator::Or,
359+
Arc::new(IsNullExpr::new(Arc::new(column))),
360+
));
361+
}
362+
363+
// Simplify the inverted expression (e.g., NOT(c1 = 0) -> c1 != 0)
334364
// before building the pruning predicate
335365
let simplifier = PhysicalExprSimplifier::new(arrow_schema);
336366
let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else {
@@ -350,6 +380,11 @@ impl RowGroupAccessPlanFilter {
350380
.map(|&i| &groups[i])
351381
.collect::<Vec<_>>(),
352382
arrow_schema,
383+
// Fully matched row groups require a stronger proof: every row
384+
// must pass the predicate. Missing null counts are unknown here;
385+
// treating them as zero can incorrectly mark nullable row groups as
386+
// fully matched and make limit pruning unsound.
387+
missing_null_counts_as_zero: false,
353388
};
354389

355390
let Ok(inverted_values) = inverted_predicate.prune(&inverted_pruning_stats)
@@ -582,6 +617,7 @@ struct RowGroupPruningStatistics<'a> {
582617
parquet_schema: &'a SchemaDescriptor,
583618
row_group_metadatas: Vec<&'a RowGroupMetaData>,
584619
arrow_schema: &'a Schema,
620+
missing_null_counts_as_zero: bool,
585621
}
586622

587623
impl<'a> RowGroupPruningStatistics<'a> {
@@ -598,7 +634,8 @@ impl<'a> RowGroupPruningStatistics<'a> {
598634
&column.name,
599635
self.arrow_schema,
600636
self.parquet_schema,
601-
)?)
637+
)?
638+
.with_missing_null_counts_as_zero(self.missing_null_counts_as_zero))
602639
}
603640
}
604641

@@ -767,6 +804,65 @@ mod tests {
767804
assert_pruned(row_groups, ExpectedPruning::Some(vec![1]))
768805
}
769806

807+
#[test]
808+
fn row_group_fully_matched_requires_known_non_null_predicate_columns() {
809+
use datafusion_expr::{col, lit};
810+
811+
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
812+
let expr = logical2physical(&col("c1").gt(lit(15)), &schema);
813+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
814+
815+
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
816+
let schema_descr = get_test_schema_descr(vec![field]);
817+
818+
// All three row groups have non-null values in the predicate range,
819+
// so none are pruned. Only the second row group can be proven fully
820+
// matched because it is the only one with a known zero null count.
821+
let rg_with_null = get_row_group_meta_data(
822+
&schema_descr,
823+
vec![ParquetStatistics::int32(
824+
Some(16),
825+
Some(20),
826+
None,
827+
Some(1),
828+
false,
829+
)],
830+
);
831+
let rg_without_null = get_row_group_meta_data(
832+
&schema_descr,
833+
vec![ParquetStatistics::int32(
834+
Some(16),
835+
Some(20),
836+
None,
837+
Some(0),
838+
false,
839+
)],
840+
);
841+
let rg_unknown_null_count = get_row_group_meta_data(
842+
&schema_descr,
843+
vec![ParquetStatistics::int32(
844+
Some(16),
845+
Some(20),
846+
None,
847+
None,
848+
false,
849+
)],
850+
);
851+
852+
let metrics = parquet_file_metrics();
853+
let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
854+
row_groups.prune_by_statistics(
855+
&schema,
856+
&schema_descr,
857+
&[rg_with_null, rg_without_null, rg_unknown_null_count],
858+
&pruning_predicate,
859+
&metrics,
860+
);
861+
862+
assert_eq!(row_groups.access_plan.row_group_indexes(), vec![0, 1, 2]);
863+
assert_eq!(row_groups.is_fully_matched(), &vec![false, true, false]);
864+
}
865+
770866
#[test]
771867
fn row_group_pruning_predicate_missing_stats() {
772868
use datafusion_expr::{col, lit};

0 commit comments

Comments
 (0)