Skip to content

Commit c919054

Browse files
zhangxffffadriangb
andauthored
perf: short-circuit and collect_bool for IN list with column references (#20694)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20428 . ## Rationale for this change Third PR in the IN list optimization series (split from #20428): - PR1: benchmarks (#20444, merged) - PR2: Arrow vectorized eq kernel (#20528, merged) - **PR3 (this): short-circuit, collect_bool, and first-expr initialization** <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - **Short-circuit break**: convert `try_fold` to `for` loop; when all non-null rows are already `true`, skip remaining list items (up to 27x faster for match=100%/nulls=0%) - **`BooleanBuffer::collect_bool`**: use in `make_comparator` fallback path for nested types instead `(0..n).map().collect()` (suggested by @Dandandan in #20428 ) - **First-expr initialization**: evaluate the first list expression directly as the accumulator, avoiding a redundant `or_kleene(all_false, rhs)` (suggested by @Dandandan in #20428 ) - **Tests**: added 3 new tests covering short-circuit, short-circuit with nulls, and struct column references (make_comparator fallback path) <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes, and add test to cover short-circuit, short-circuit with nulls, and struct column references (make_comparator fallback path) Benchmark result: ``` (zhangxffff) zhangxffff/datafusion@95d3d60664da ~/W/datafusion ((bcc52cd))> critcmp after before group after before ----- ----- ------ in_list_cols/Int32/list=28/match=0%/nulls=0% 1.02 93.8±1.80µs ? ?/sec 1.00 91.8±1.52µs ? ?/sec in_list_cols/Int32/list=28/match=0%/nulls=20% 1.03 105.3±1.95µs ? ?/sec 1.00 102.2±1.59µs ? ?/sec in_list_cols/Int32/list=28/match=100%/nulls=0% 1.00 3.4±0.07µs ? ?/sec 27.14 91.7±1.52µs ? ?/sec in_list_cols/Int32/list=28/match=100%/nulls=20% 1.07 107.7±1.91µs ? ?/sec 1.00 100.4±1.33µs ? ?/sec in_list_cols/Int32/list=28/match=50%/nulls=0% 1.00 50.1±1.15µs ? ?/sec 1.84 92.4±1.36µs ? ?/sec in_list_cols/Int32/list=28/match=50%/nulls=20% 1.05 105.1±1.49µs ? ?/sec 1.00 100.0±0.84µs ? ?/sec in_list_cols/Int32/list=3/match=0%/nulls=0% 1.00 9.9±0.17µs ? ?/sec 1.01 10.1±0.19µs ? ?/sec in_list_cols/Int32/list=3/match=0%/nulls=20% 1.02 11.0±0.18µs ? ?/sec 1.00 10.8±0.16µs ? ?/sec in_list_cols/Int32/list=3/match=100%/nulls=0% 1.00 3.3±0.06µs ? ?/sec 2.95 9.9±0.16µs ? ?/sec in_list_cols/Int32/list=3/match=100%/nulls=20% 1.01 10.9±0.19µs ? ?/sec 1.00 10.8±0.09µs ? ?/sec in_list_cols/Int32/list=3/match=50%/nulls=0% 1.00 10.0±0.17µs ? ?/sec 1.00 9.9±0.18µs ? ?/sec in_list_cols/Int32/list=3/match=50%/nulls=20% 1.05 11.3±0.24µs ? ?/sec 1.00 10.8±0.11µs ? ?/sec in_list_cols/Int32/list=8/match=0%/nulls=0% 1.02 26.7±0.58µs ? ?/sec 1.00 26.2±0.50µs ? ?/sec in_list_cols/Int32/list=8/match=0%/nulls=20% 1.04 29.6±0.57µs ? ?/sec 1.00 28.5±0.45µs ? ?/sec in_list_cols/Int32/list=8/match=100%/nulls=0% 1.00 3.4±0.05µs ? ?/sec 7.78 26.2±0.36µs ? ?/sec in_list_cols/Int32/list=8/match=100%/nulls=20% 1.05 30.0±0.65µs ? ?/sec 1.00 28.7±0.55µs ? ?/sec in_list_cols/Int32/list=8/match=50%/nulls=0% 1.03 26.7±0.59µs ? ?/sec 1.00 26.0±0.37µs ? ?/sec in_list_cols/Int32/list=8/match=50%/nulls=20% 1.04 29.9±0.57µs ? ?/sec 1.00 28.7±0.46µs ? ?/sec in_list_cols/Utf8/list=28/match=0% 1.17 155.0±2.44µs ? ?/sec 1.00 132.8±2.97µs ? ?/sec in_list_cols/Utf8/list=28/match=100% 1.02 726.6±14.54µs ? ?/sec 1.00 712.4±9.09µs ? ?/sec in_list_cols/Utf8/list=28/match=50% 1.02 1070.1±13.06µs ? ?/sec 1.00 1051.8±8.17µs ? ?/sec in_list_cols/Utf8/list=3/match=0% 1.14 16.4±0.37µs ? ?/sec 1.00 14.4±0.22µs ? ?/sec in_list_cols/Utf8/list=3/match=100% 1.02 68.0±1.29µs ? ?/sec 1.00 66.5±0.99µs ? ?/sec in_list_cols/Utf8/list=3/match=50% 1.15 107.6±2.05µs ? ?/sec 1.00 93.6±1.88µs ? ?/sec in_list_cols/Utf8/list=8/match=0% 1.16 44.0±0.61µs ? ?/sec 1.00 37.9±0.95µs ? ?/sec in_list_cols/Utf8/list=8/match=100% 1.00 190.4±2.71µs ? ?/sec 1.03 195.7±2.01µs ? ?/sec in_list_cols/Utf8/list=8/match=50% 1.03 295.9±4.45µs ? ?/sec 1.00 287.3±3.26µs ? ?/sec ``` <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
1 parent 29e8495 commit c919054

1 file changed

Lines changed: 209 additions & 55 deletions

File tree

  • datafusion/physical-expr/src/expressions

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 209 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -793,63 +793,71 @@ impl PhysicalExpr for InListExpr {
793793
// comparator for unsupported types (nested, RunEndEncoded, etc.).
794794
let value = value.into_array(num_rows)?;
795795
let lhs_supports_arrow_eq = supports_arrow_eq(value.data_type());
796-
let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
797-
BooleanArray::new(BooleanBuffer::new_unset(num_rows), None),
798-
|result, expr| -> Result<BooleanArray> {
799-
let rhs = match expr? {
800-
ColumnarValue::Array(array) => {
801-
if lhs_supports_arrow_eq
802-
&& supports_arrow_eq(array.data_type())
803-
{
804-
arrow_eq(&value, &array)?
805-
} else {
806-
let cmp = make_comparator(
807-
value.as_ref(),
808-
array.as_ref(),
809-
SortOptions::default(),
810-
)?;
811-
(0..num_rows)
812-
.map(|i| {
813-
if value.is_null(i) || array.is_null(i) {
814-
return None;
815-
}
816-
Some(cmp(i, i).is_eq())
817-
})
818-
.collect::<BooleanArray>()
819-
}
796+
797+
// Helper: compare value against a single list expression
798+
let compare_one = |expr: &Arc<dyn PhysicalExpr>| -> Result<BooleanArray> {
799+
match expr.evaluate(batch)? {
800+
ColumnarValue::Array(array) => {
801+
if lhs_supports_arrow_eq
802+
&& supports_arrow_eq(array.data_type())
803+
{
804+
Ok(arrow_eq(&value, &array)?)
805+
} else {
806+
let cmp = make_comparator(
807+
value.as_ref(),
808+
array.as_ref(),
809+
SortOptions::default(),
810+
)?;
811+
let buffer = BooleanBuffer::collect_bool(num_rows, |i| {
812+
cmp(i, i).is_eq()
813+
});
814+
let nulls =
815+
NullBuffer::union(value.nulls(), array.nulls());
816+
Ok(BooleanArray::new(buffer, nulls))
820817
}
821-
ColumnarValue::Scalar(scalar) => {
822-
// Check if scalar is null once, before the loop
823-
if scalar.is_null() {
824-
// If scalar is null, all comparisons return null
825-
BooleanArray::from(vec![None; num_rows])
826-
} else if lhs_supports_arrow_eq {
827-
let scalar_datum = scalar.to_scalar()?;
828-
arrow_eq(&value, &scalar_datum)?
829-
} else {
830-
// Convert scalar to 1-element array
831-
let array = scalar.to_array()?;
832-
let cmp = make_comparator(
833-
value.as_ref(),
834-
array.as_ref(),
835-
SortOptions::default(),
836-
)?;
837-
// Compare each row of value with the single scalar element
838-
(0..num_rows)
839-
.map(|i| {
840-
if value.is_null(i) {
841-
None
842-
} else {
843-
Some(cmp(i, 0).is_eq())
844-
}
845-
})
846-
.collect::<BooleanArray>()
847-
}
818+
}
819+
ColumnarValue::Scalar(scalar) => {
820+
// Check if scalar is null once, before the loop
821+
if scalar.is_null() {
822+
// If scalar is null, all comparisons return null
823+
Ok(BooleanArray::from(vec![None; num_rows]))
824+
} else if lhs_supports_arrow_eq {
825+
let scalar_datum = scalar.to_scalar()?;
826+
Ok(arrow_eq(&value, &scalar_datum)?)
827+
} else {
828+
// Convert scalar to 1-element array
829+
let array = scalar.to_array()?;
830+
let cmp = make_comparator(
831+
value.as_ref(),
832+
array.as_ref(),
833+
SortOptions::default(),
834+
)?;
835+
// Compare each row of value with the single scalar element
836+
let buffer = BooleanBuffer::collect_bool(num_rows, |i| {
837+
cmp(i, 0).is_eq()
838+
});
839+
Ok(BooleanArray::new(buffer, value.nulls().cloned()))
848840
}
849-
};
850-
Ok(or_kleene(&result, &rhs)?)
851-
},
852-
)?;
841+
}
842+
}
843+
};
844+
845+
// Evaluate first expression directly to avoid a redundant
846+
// or_kleene with an all-false accumulator.
847+
let mut found = if let Some(first) = self.list.first() {
848+
compare_one(first)?
849+
} else {
850+
BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
851+
};
852+
853+
for expr in self.list.iter().skip(1) {
854+
// Short-circuit: if every non-null row is already true,
855+
// no further list items can change the result.
856+
if found.null_count() == 0 && found.true_count() == num_rows {
857+
break;
858+
}
859+
found = or_kleene(&found, &compare_one(expr)?)?;
860+
}
853861

854862
if self.negated { not(&found)? } else { found }
855863
}
@@ -3724,4 +3732,150 @@ mod tests {
37243732
assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
37253733
Ok(())
37263734
}
3735+
3736+
/// Tests that short-circuit evaluation produces correct results.
3737+
/// When all rows match after the first list item, remaining items
3738+
/// should be skipped without affecting correctness.
3739+
#[test]
3740+
fn test_in_list_with_columns_short_circuit() -> Result<()> {
3741+
// a IN (b, c) where b already matches every row of a
3742+
// The short-circuit should skip evaluating c
3743+
let schema = Schema::new(vec![
3744+
Field::new("a", DataType::Int32, false),
3745+
Field::new("b", DataType::Int32, false),
3746+
Field::new("c", DataType::Int32, false),
3747+
]);
3748+
let batch = RecordBatch::try_new(
3749+
Arc::new(schema.clone()),
3750+
vec![
3751+
Arc::new(Int32Array::from(vec![1, 2, 3])),
3752+
Arc::new(Int32Array::from(vec![1, 2, 3])), // b == a for all rows
3753+
Arc::new(Int32Array::from(vec![99, 99, 99])),
3754+
],
3755+
)?;
3756+
3757+
let col_a = col("a", &schema)?;
3758+
let list = vec![col("b", &schema)?, col("c", &schema)?];
3759+
let expr = make_in_list_with_columns(col_a, list, false);
3760+
3761+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3762+
let result = as_boolean_array(&result);
3763+
assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
3764+
Ok(())
3765+
}
3766+
3767+
/// Short-circuit must NOT skip when nulls are present (three-valued logic).
3768+
/// Even if all non-null values are true, null rows keep the result as null.
3769+
#[test]
3770+
fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
3771+
// a IN (b, c) where a has nulls
3772+
// Even if b matches all non-null rows, result should preserve nulls
3773+
let schema = Schema::new(vec![
3774+
Field::new("a", DataType::Int32, true),
3775+
Field::new("b", DataType::Int32, false),
3776+
Field::new("c", DataType::Int32, false),
3777+
]);
3778+
let batch = RecordBatch::try_new(
3779+
Arc::new(schema.clone()),
3780+
vec![
3781+
Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
3782+
Arc::new(Int32Array::from(vec![1, 2, 3])), // matches non-null rows
3783+
Arc::new(Int32Array::from(vec![99, 99, 99])),
3784+
],
3785+
)?;
3786+
3787+
let col_a = col("a", &schema)?;
3788+
let list = vec![col("b", &schema)?, col("c", &schema)?];
3789+
let expr = make_in_list_with_columns(col_a, list, false);
3790+
3791+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3792+
let result = as_boolean_array(&result);
3793+
// row 0: 1 IN (1, 99) → true
3794+
// row 1: NULL IN (2, 99) → NULL
3795+
// row 2: 3 IN (3, 99) → true
3796+
assert_eq!(
3797+
result,
3798+
&BooleanArray::from(vec![Some(true), None, Some(true)])
3799+
);
3800+
Ok(())
3801+
}
3802+
3803+
/// Tests the make_comparator + collect_bool fallback path using
3804+
/// struct column references (nested types don't support arrow_eq).
3805+
#[test]
3806+
fn test_in_list_with_columns_struct() -> Result<()> {
3807+
let struct_fields = Fields::from(vec![
3808+
Field::new("x", DataType::Int32, false),
3809+
Field::new("y", DataType::Utf8, false),
3810+
]);
3811+
let struct_dt = DataType::Struct(struct_fields.clone());
3812+
3813+
let schema = Schema::new(vec![
3814+
Field::new("a", struct_dt.clone(), true),
3815+
Field::new("b", struct_dt.clone(), false),
3816+
Field::new("c", struct_dt.clone(), false),
3817+
]);
3818+
3819+
// a: [{1,"a"}, {2,"b"}, NULL, {4,"d"}]
3820+
// b: [{1,"a"}, {9,"z"}, {3,"c"}, {4,"d"}]
3821+
// c: [{9,"z"}, {2,"b"}, {9,"z"}, {9,"z"}]
3822+
let a = Arc::new(StructArray::new(
3823+
struct_fields.clone(),
3824+
vec![
3825+
Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
3826+
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
3827+
],
3828+
Some(vec![true, true, false, true].into()),
3829+
));
3830+
let b = Arc::new(StructArray::new(
3831+
struct_fields.clone(),
3832+
vec![
3833+
Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
3834+
Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
3835+
],
3836+
None,
3837+
));
3838+
let c = Arc::new(StructArray::new(
3839+
struct_fields.clone(),
3840+
vec![
3841+
Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
3842+
Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
3843+
],
3844+
None,
3845+
));
3846+
3847+
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;
3848+
3849+
let col_a = col("a", &schema)?;
3850+
let list = vec![col("b", &schema)?, col("c", &schema)?];
3851+
let expr = make_in_list_with_columns(col_a, list, false);
3852+
3853+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3854+
let result = as_boolean_array(&result);
3855+
// row 0: {1,"a"} IN ({1,"a"}, {9,"z"}) → true (matches b)
3856+
// row 1: {2,"b"} IN ({9,"z"}, {2,"b"}) → true (matches c)
3857+
// row 2: NULL IN ({3,"c"}, {9,"z"}) → NULL
3858+
// row 3: {4,"d"} IN ({4,"d"}, {9,"z"}) → true (matches b)
3859+
assert_eq!(
3860+
result,
3861+
&BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
3862+
);
3863+
3864+
// Also test NOT IN
3865+
let col_a = col("a", &schema)?;
3866+
let list = vec![col("b", &schema)?, col("c", &schema)?];
3867+
let expr = make_in_list_with_columns(col_a, list, true);
3868+
3869+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3870+
let result = as_boolean_array(&result);
3871+
// row 0: {1,"a"} NOT IN ({1,"a"}, {9,"z"}) → false
3872+
// row 1: {2,"b"} NOT IN ({9,"z"}, {2,"b"}) → false
3873+
// row 2: NULL NOT IN ({3,"c"}, {9,"z"}) → NULL
3874+
// row 3: {4,"d"} NOT IN ({4,"d"}, {9,"z"}) → false
3875+
assert_eq!(
3876+
result,
3877+
&BooleanArray::from(vec![Some(false), Some(false), None, Some(false)])
3878+
);
3879+
Ok(())
3880+
}
37273881
}

0 commit comments

Comments
 (0)