Skip to content

Commit fd7df66

Browse files
feat(spark): implement Spark bitmap function bitmap_count (#17179)
* feat(spark): implement Spark `misc` function `bitmap_count` * chore: add ASF license text * chore: move bitmap_count to spark/bitmap module, improve error handling, add sqllogictests for different types, remove hint * fix: BitmapCount derive PartialEq, Eq, Hash * chore: reminder to implement TypeSignature for BitmapCount when possible
1 parent 2c9f42b commit fd7df66

5 files changed

Lines changed: 280 additions & 0 deletions

File tree

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{
22+
Array, ArrayRef, BinaryArray, BinaryViewArray, FixedSizeBinaryArray, Int64Array,
23+
LargeBinaryArray,
24+
};
25+
use arrow::datatypes::DataType;
26+
use arrow::datatypes::DataType::{
27+
Binary, BinaryView, FixedSizeBinary, Int64, LargeBinary,
28+
};
29+
use datafusion_common::utils::take_function_args;
30+
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
31+
use datafusion_expr::{
32+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
33+
};
34+
use datafusion_functions::utils::make_scalar_function;
35+
use datafusion_functions::{downcast_arg, downcast_named_arg};
36+
37+
#[derive(Debug, PartialEq, Eq, Hash)]
38+
pub struct BitmapCount {
39+
signature: Signature,
40+
}
41+
42+
impl Default for BitmapCount {
43+
fn default() -> Self {
44+
Self::new()
45+
}
46+
}
47+
48+
impl BitmapCount {
49+
pub fn new() -> Self {
50+
Self {
51+
// TODO: add definitive TypeSignature after https://github.com/apache/datafusion/issues/17291 is done
52+
signature: Signature::any(1, Volatility::Immutable),
53+
}
54+
}
55+
}
56+
57+
impl ScalarUDFImpl for BitmapCount {
58+
fn as_any(&self) -> &dyn Any {
59+
self
60+
}
61+
62+
fn name(&self) -> &str {
63+
"bitmap_count"
64+
}
65+
66+
fn signature(&self) -> &Signature {
67+
&self.signature
68+
}
69+
70+
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
71+
match arg_types.first() {
72+
Some(Binary | BinaryView | FixedSizeBinary(_) | LargeBinary) => Ok(Int64),
73+
Some(data_type) => plan_err!(
74+
"bitmap_count expects Binary/BinaryView/FixedSizeBinary/LargeBinary as argument, got {:?}",
75+
data_type
76+
),
77+
None => internal_err!("bitmap_count does not support zero arguments"),
78+
}
79+
}
80+
81+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
82+
make_scalar_function(bitmap_count_inner, vec![])(&args.args)
83+
}
84+
}
85+
86+
/// Counts the set bits across all bytes of an optional binary value.
///
/// Returns `None` when `opt` is `None`, so a NULL input stays NULL. An empty
/// slice yields `Some(0)`.
fn binary_count_ones(opt: Option<&[u8]>) -> Option<i64> {
    // `u8::count_ones` returns a `u32`; widen it with the lossless `From`
    // conversion rather than an `as` cast so the widening can never truncate.
    opt.map(|bytes| bytes.iter().map(|byte| i64::from(byte.count_ones())).sum())
}
89+
90+
// Downcasts `$input_array` to the concrete `$array_type` and maps every
// element through `binary_count_ones`, yielding `Ok(Int64Array)`.
// NOTE(review): `downcast_arg!` presumably early-returns an error from the
// enclosing function when the downcast fails — confirm against its definition.
macro_rules! downcast_and_count_ones {
    ($input_array:expr, $array_type:ident) => {{
        let typed_array = downcast_arg!($input_array, $array_type);
        let counts = typed_array
            .iter()
            .map(binary_count_ones)
            .collect::<Int64Array>();
        Ok(counts)
    }};
}
96+
97+
pub fn bitmap_count_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
98+
let [input_array] = take_function_args("bitmap_count", arg)?;
99+
100+
let res: Result<Int64Array> = match &input_array.data_type() {
101+
Binary => downcast_and_count_ones!(input_array, BinaryArray),
102+
BinaryView => downcast_and_count_ones!(input_array, BinaryViewArray),
103+
LargeBinary => downcast_and_count_ones!(input_array, LargeBinaryArray),
104+
FixedSizeBinary(_size) => {
105+
downcast_and_count_ones!(input_array, FixedSizeBinaryArray)
106+
}
107+
data_type => {
108+
internal_err!("bitmap_count does not support {:?}", data_type)
109+
}
110+
};
111+
112+
Ok(Arc::new(res?))
113+
}
114+
115+
#[cfg(test)]
mod tests {
    use crate::function::bitmap::bitmap_count::BitmapCount;
    use crate::function::utils::test::test_scalar_function;
    use arrow::array::{Array, Int64Array};
    use arrow::datatypes::DataType::Int64;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};

    // Invokes `bitmap_count` on a single scalar argument and checks the
    // `Int64` result against `$EXPECTED`.
    macro_rules! check_bitmap_count_scalar {
        ($SCALAR:expr, $EXPECTED:expr) => {
            test_scalar_function!(
                BitmapCount::new(),
                vec![ColumnarValue::Scalar($SCALAR)],
                $EXPECTED,
                i64,
                Int64,
                Int64Array
            );
        };
    }

    // Runs the same input through every supported binary scalar variant:
    // Binary, LargeBinary, BinaryView and FixedSizeBinary (sized to the input
    // length, or 0 for a null input).
    macro_rules! test_bitmap_count_binary_invoke {
        ($INPUT:expr, $EXPECTED:expr) => {
            check_bitmap_count_scalar!(ScalarValue::Binary($INPUT), $EXPECTED);

            check_bitmap_count_scalar!(ScalarValue::LargeBinary($INPUT), $EXPECTED);

            check_bitmap_count_scalar!(ScalarValue::BinaryView($INPUT), $EXPECTED);

            check_bitmap_count_scalar!(
                ScalarValue::FixedSizeBinary(
                    $INPUT.map(|a| a.len()).unwrap_or(0) as i32,
                    $INPUT
                ),
                $EXPECTED
            );
        };
    }

    #[test]
    fn test_bitmap_count_invoke() -> Result<()> {
        // Null input stays null.
        test_bitmap_count_binary_invoke!(None::<Vec<u8>>, Ok(None));
        // 0x0A = 0b0000_1010 -> 2 bits.
        test_bitmap_count_binary_invoke!(Some(vec![0x0Au8]), Ok(Some(2)));
        // Two fully-set bytes -> 16 bits.
        test_bitmap_count_binary_invoke!(Some(vec![0xFFu8, 0xFFu8]), Ok(Some(16)));
        // 2 + 3 + 5 bits across three bytes.
        test_bitmap_count_binary_invoke!(
            Some(vec![0x0Au8, 0xB0u8, 0xCDu8]),
            Ok(Some(10))
        );
        Ok(())
    }
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub mod bitmap_count;
19+
20+
use datafusion_expr::ScalarUDF;
21+
use datafusion_functions::make_udf_function;
22+
use std::sync::Arc;
23+
24+
// Wires `BitmapCount` up under the identifier `bitmap_count`.
// NOTE(review): the `bitmap_count()` accessor called in `functions()` is
// presumably generated by this macro — confirm against `make_udf_function!`.
make_udf_function!(bitmap_count::BitmapCount, bitmap_count);
25+
26+
/// Helpers exported for the bitmap functions.
// NOTE(review): `export_functions!` presumably generates an expression-building
// `bitmap_count(arg)` helper with the description below — confirm against the macro.
pub mod expr_fn {
27+
use datafusion_functions::export_functions;
28+
29+
// Entry layout: function name, user-facing description, argument name.
export_functions!((
30+
bitmap_count,
31+
"Returns the number of set bits in the input bitmap.",
32+
arg
33+
));
34+
}
35+
36+
pub fn functions() -> Vec<Arc<ScalarUDF>> {
37+
vec![bitmap_count()]
38+
}

datafusion/spark/src/function/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
pub mod aggregate;
1919
pub mod array;
20+
pub mod bitmap;
2021
pub mod bitwise;
2122
pub mod collection;
2223
pub mod conditional;

datafusion/spark/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ use std::sync::Arc;
104104
pub mod expr_fn {
105105
pub use super::function::aggregate::expr_fn::*;
106106
pub use super::function::array::expr_fn::*;
107+
pub use super::function::bitmap::expr_fn::*;
107108
pub use super::function::bitwise::expr_fn::*;
108109
pub use super::function::collection::expr_fn::*;
109110
pub use super::function::conditional::expr_fn::*;
@@ -130,6 +131,7 @@ pub mod expr_fn {
130131
pub fn all_default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
131132
function::array::functions()
132133
.into_iter()
134+
.chain(function::bitmap::functions())
133135
.chain(function::bitwise::functions())
134136
.chain(function::collection::functions())
135137
.chain(function::conditional::functions())
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
query I
19+
SELECT bitmap_count(X'1010');
20+
----
21+
2
22+
23+
query I
24+
SELECT bitmap_count(X'FFFF');
25+
----
26+
16
27+
28+
query I
29+
SELECT bitmap_count(X'0');
30+
----
31+
0
32+
33+
query I
34+
SELECT bitmap_count(a) FROM (VALUES (X'0AB0'), (X'0AB0CD'), (NULL)) AS t(a);
35+
----
36+
5
37+
10
38+
NULL
39+
40+
# Tests with different binary types
41+
query I
42+
SELECT bitmap_count(arrow_cast(a, 'LargeBinary')) FROM (VALUES (X'0AB0'), (X'0AB0CD'), (NULL)) AS t(a);
43+
----
44+
5
45+
10
46+
NULL
47+
48+
query I
49+
SELECT bitmap_count(arrow_cast(a, 'BinaryView')) FROM (VALUES (X'0AB0'), (X'0AB0CD'), (NULL)) AS t(a);
50+
----
51+
5
52+
10
53+
NULL
54+
55+
query I
56+
SELECT bitmap_count(arrow_cast(a, 'FixedSizeBinary(2)')) FROM (VALUES (X'1010'), (X'0AB0'), (X'FFFF'), (NULL)) AS t(a);
57+
----
58+
2
59+
5
60+
16
61+
NULL

0 commit comments

Comments
 (0)