Skip to content

Commit d7d6461

Browse files
kazantsev-maksim (Kazantsev Maksim) and a co-author authored
feat: Implement Spark bin function (#20479)
## Which issue does this PR close? N/A ## Rationale for this change Add new function: https://spark.apache.org/docs/latest/api/sql/index.html#bin ## What changes are included in this PR? - Implementation - Unit Tests - SLT tests ## Are these changes tested? Yes, tests added as part of this PR. ## Are there any user-facing changes? No, these are new function. --------- Co-authored-by: Kazantsev Maksim <mn.kazantsev@gmail.com>
1 parent e937cad commit d7d6461

3 files changed

Lines changed: 176 additions & 20 deletions

File tree

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, AsArray, StringArray};
19+
use arrow::datatypes::{DataType, Field, FieldRef, Int64Type};
20+
use datafusion::logical_expr::{ColumnarValue, Signature, TypeSignature, Volatility};
21+
use datafusion_common::types::{NativeType, logical_int64};
22+
use datafusion_common::utils::take_function_args;
23+
use datafusion_common::{Result, internal_err};
24+
use datafusion_expr::{Coercion, ScalarFunctionArgs, ScalarUDFImpl, TypeSignatureClass};
25+
use datafusion_functions::utils::make_scalar_function;
26+
use std::any::Any;
27+
use std::sync::Arc;
28+
29+
/// Spark-compatible `bin` expression
30+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#bin>
31+
#[derive(Debug, PartialEq, Eq, Hash)]
32+
pub struct SparkBin {
33+
signature: Signature,
34+
}
35+
36+
impl Default for SparkBin {
37+
fn default() -> Self {
38+
Self::new()
39+
}
40+
}
41+
42+
impl SparkBin {
43+
pub fn new() -> Self {
44+
Self {
45+
signature: Signature::one_of(
46+
vec![TypeSignature::Coercible(vec![Coercion::new_implicit(
47+
TypeSignatureClass::Native(logical_int64()),
48+
vec![TypeSignatureClass::Numeric],
49+
NativeType::Int64,
50+
)])],
51+
Volatility::Immutable,
52+
),
53+
}
54+
}
55+
}
56+
57+
impl ScalarUDFImpl for SparkBin {
58+
fn as_any(&self) -> &dyn Any {
59+
self
60+
}
61+
62+
fn name(&self) -> &str {
63+
"bin"
64+
}
65+
66+
fn signature(&self) -> &Signature {
67+
&self.signature
68+
}
69+
70+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
71+
internal_err!("return_field_from_args should be used instead")
72+
}
73+
74+
fn return_field_from_args(
75+
&self,
76+
args: datafusion_expr::ReturnFieldArgs,
77+
) -> Result<FieldRef> {
78+
Ok(Arc::new(Field::new(
79+
self.name(),
80+
DataType::Utf8,
81+
args.arg_fields[0].is_nullable(),
82+
)))
83+
}
84+
85+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
86+
make_scalar_function(spark_bin_inner, vec![])(&args.args)
87+
}
88+
}
89+
90+
fn spark_bin_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
91+
let [array] = take_function_args("bin", arg)?;
92+
match &array.data_type() {
93+
DataType::Int64 => {
94+
let result: StringArray = array
95+
.as_primitive::<Int64Type>()
96+
.iter()
97+
.map(|opt| opt.map(spark_bin))
98+
.collect();
99+
Ok(Arc::new(result))
100+
}
101+
data_type => {
102+
internal_err!("bin does not support: {data_type}")
103+
}
104+
}
105+
}
106+
107+
/// Formats an `i64` as its base-2 string without leading zeros.
///
/// Rust's `{:b}` renders negative signed integers in two's-complement
/// form (64 bits for `i64`), which matches Spark's `bin` semantics.
fn spark_bin(value: i64) -> String {
    format!("{:b}", value)
}

datafusion/spark/src/function/math/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
pub mod abs;
19+
pub mod bin;
1920
pub mod expm1;
2021
pub mod factorial;
2122
pub mod hex;
@@ -42,6 +43,7 @@ make_udf_function!(width_bucket::SparkWidthBucket, width_bucket);
4243
make_udf_function!(trigonometry::SparkCsc, csc);
4344
make_udf_function!(trigonometry::SparkSec, sec);
4445
make_udf_function!(negative::SparkNegative, negative);
46+
make_udf_function!(bin::SparkBin, bin);
4547

4648
pub mod expr_fn {
4749
use datafusion_functions::export_functions;
@@ -70,6 +72,11 @@ pub mod expr_fn {
7072
"Returns the negation of expr (unary minus).",
7173
arg1
7274
));
75+
export_functions!((
76+
bin,
77+
"Returns the string representation of the long value represented in binary.",
78+
arg1
79+
));
7380
}
7481

7582
pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -86,5 +93,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
8693
csc(),
8794
sec(),
8895
negative(),
96+
bin(),
8997
]
9098
}

datafusion/sqllogictest/test_files/spark/math/bin.slt

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,62 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
# This file was originally created by a porting script from:
19-
# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function
20-
# This file is part of the implementation of the datafusion-spark function library.
21-
# For more information, please see:
22-
# https://github.com/apache/datafusion/issues/15914
23-
24-
## Original Query: SELECT bin(-13);
25-
## PySpark 3.5.5 Result: {'bin(-13)': '1111111111111111111111111111111111111111111111111111111111110011', 'typeof(bin(-13))': 'string', 'typeof(-13)': 'int'}
26-
#query
27-
#SELECT bin(-13::int);
28-
29-
## Original Query: SELECT bin(13);
30-
## PySpark 3.5.5 Result: {'bin(13)': '1101', 'typeof(bin(13))': 'string', 'typeof(13)': 'int'}
31-
#query
32-
#SELECT bin(13::int);
33-
34-
## Original Query: SELECT bin(13.3);
35-
## PySpark 3.5.5 Result: {'bin(13.3)': '1101', 'typeof(bin(13.3))': 'string', 'typeof(13.3)': 'decimal(3,1)'}
36-
#query
37-
#SELECT bin(13.3::decimal(3,1));
18+
query T
19+
SELECT bin(arrow_cast(NULL, 'Int8'));
20+
----
21+
NULL
22+
23+
query T
24+
SELECT bin(arrow_cast(0, 'Int8'));
25+
----
26+
0
27+
28+
query T
29+
SELECT bin(arrow_cast(13, 'Int8'));
30+
----
31+
1101
32+
33+
query T
34+
SELECT bin(arrow_cast(13.36, 'Float16'));
35+
----
36+
1101
37+
38+
query T
39+
SELECT bin(13.3::decimal(3,1));
40+
----
41+
1101
42+
43+
query T
44+
SELECT bin(arrow_cast(-13, 'Int8'));
45+
----
46+
1111111111111111111111111111111111111111111111111111111111110011
47+
48+
query T
49+
SELECT bin(arrow_cast(256, 'Int16'));
50+
----
51+
100000000
52+
53+
query T
54+
SELECT bin(arrow_cast(-32768, 'Int16'));
55+
----
56+
1111111111111111111111111111111111111111111111111000000000000000
57+
58+
query T
59+
SELECT bin(arrow_cast(-2147483648, 'Int32'));
60+
----
61+
1111111111111111111111111111111110000000000000000000000000000000
62+
63+
query T
64+
SELECT bin(arrow_cast(1073741824, 'Int32'));
65+
----
66+
1000000000000000000000000000000
67+
68+
query T
69+
SELECT bin(arrow_cast(-9223372036854775808, 'Int64'));
70+
----
71+
1000000000000000000000000000000000000000000000000000000000000000
72+
73+
query T
74+
SELECT bin(arrow_cast(9223372036854775807, 'Int64'));
75+
----
76+
111111111111111111111111111111111111111111111111111111111111111

0 commit comments

Comments
 (0)