Skip to content

Commit 7e04974

Browse files
kazantsev-maksimKazantsev Maksim
andauthored
feat: Implement Spark function space (#19610)
## Which issue does this PR close? - N/A ## Rationale for this change Add a new function: https://spark.apache.org/docs/latest/api/sql/index.html#space ## What changes are included in this PR? - Implementation - Unit Tests - SLT tests ## Are these changes tested? Yes, tests added as part of this PR. ## Are there any user-facing changes? No, this is a new function. --------- Co-authored-by: Kazantsev Maksim <mn.kazantsev@gmail.com>
1 parent 955fd41 commit 7e04974

5 files changed

Lines changed: 354 additions & 0 deletions

File tree

datafusion/spark/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,7 @@ criterion = { workspace = true }
6161
[[bench]]
6262
harness = false
6363
name = "char"
64+
65+
[[bench]]
66+
harness = false
67+
name = "space"

datafusion/spark/benches/space.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
extern crate criterion;
19+
20+
use arrow::array::PrimitiveArray;
21+
use arrow::datatypes::{DataType, Field, Int32Type};
22+
use criterion::{Criterion, criterion_group, criterion_main};
23+
use datafusion_common::config::ConfigOptions;
24+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
25+
use datafusion_spark::function::string::space;
26+
use rand::prelude::StdRng;
27+
use rand::{Rng, SeedableRng};
28+
use std::hint::black_box;
29+
use std::sync::Arc;
30+
31+
fn criterion_benchmark(c: &mut Criterion) {
32+
let space_func = space();
33+
let size = 1024;
34+
let input: PrimitiveArray<Int32Type> = {
35+
let null_density = 0.2;
36+
let mut rng = StdRng::seed_from_u64(42);
37+
(0..size)
38+
.map(|_| {
39+
if rng.random::<f32>() < null_density {
40+
None
41+
} else {
42+
Some(rng.random_range::<i32, _>(1i32..10))
43+
}
44+
})
45+
.collect()
46+
};
47+
let input = Arc::new(input);
48+
let args = vec![ColumnarValue::Array(input)];
49+
let arg_fields = args
50+
.iter()
51+
.enumerate()
52+
.map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into())
53+
.collect::<Vec<_>>();
54+
let config_options = Arc::new(ConfigOptions::default());
55+
c.bench_function("space", |b| {
56+
b.iter(|| {
57+
black_box(
58+
space_func
59+
.invoke_with_args(ScalarFunctionArgs {
60+
args: args.clone(),
61+
arg_fields: arg_fields.clone(),
62+
number_rows: size,
63+
return_field: Arc::new(Field::new("f", DataType::Utf8, true)),
64+
config_options: Arc::clone(&config_options),
65+
})
66+
.unwrap(),
67+
)
68+
})
69+
});
70+
}
71+
72+
criterion_group!(benches, criterion_benchmark);
73+
criterion_main!(benches);

datafusion/spark/src/function/string/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod ilike;
2424
pub mod length;
2525
pub mod like;
2626
pub mod luhn_check;
27+
pub mod space;
2728

2829
use datafusion_expr::ScalarUDF;
2930
use datafusion_functions::make_udf_function;
@@ -38,6 +39,7 @@ make_udf_function!(elt::SparkElt, elt);
3839
make_udf_function!(like::SparkLike, like);
3940
make_udf_function!(luhn_check::SparkLuhnCheck, luhn_check);
4041
make_udf_function!(format_string::FormatStringFunc, format_string);
42+
make_udf_function!(space::SparkSpace, space);
4143

4244
pub mod expr_fn {
4345
use datafusion_functions::export_functions;
@@ -87,6 +89,7 @@ pub mod expr_fn {
8789
"Returns a formatted string from printf-style format strings.",
8890
strfmt args
8991
));
92+
export_functions!((space, "Returns a string consisting of n spaces.", arg1));
9093
}
9194

9295
pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -100,5 +103,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
100103
like(),
101104
luhn_check(),
102105
format_string(),
106+
space(),
103107
]
104108
}
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
20+
as_dictionary_array,
21+
};
22+
use arrow::datatypes::{DataType, Int32Type};
23+
use datafusion_common::cast::as_int32_array;
24+
use datafusion_common::{Result, ScalarValue, exec_err};
25+
use datafusion_expr::{
26+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27+
};
28+
use std::any::Any;
29+
use std::sync::Arc;
30+
31+
/// Spark-compatible `space` expression
32+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#space>
33+
#[derive(Debug, PartialEq, Eq, Hash)]
34+
pub struct SparkSpace {
35+
signature: Signature,
36+
}
37+
38+
impl Default for SparkSpace {
39+
fn default() -> Self {
40+
Self::new()
41+
}
42+
}
43+
44+
impl SparkSpace {
45+
pub fn new() -> Self {
46+
Self {
47+
signature: Signature::uniform(
48+
1,
49+
vec![
50+
DataType::Int32,
51+
DataType::Dictionary(
52+
Box::new(DataType::Int32),
53+
Box::new(DataType::Int32),
54+
),
55+
],
56+
Volatility::Immutable,
57+
),
58+
}
59+
}
60+
}
61+
62+
impl ScalarUDFImpl for SparkSpace {
63+
fn as_any(&self) -> &dyn Any {
64+
self
65+
}
66+
67+
fn name(&self) -> &str {
68+
"space"
69+
}
70+
71+
fn signature(&self) -> &Signature {
72+
&self.signature
73+
}
74+
75+
fn return_type(&self, args: &[DataType]) -> Result<DataType> {
76+
let return_type = match &args[0] {
77+
DataType::Dictionary(key_type, _) => {
78+
DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
79+
}
80+
_ => DataType::Utf8,
81+
};
82+
Ok(return_type)
83+
}
84+
85+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
86+
spark_space(&args.args)
87+
}
88+
}
89+
90+
pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
91+
if args.len() != 1 {
92+
return exec_err!("space function takes exactly one argument");
93+
}
94+
match &args[0] {
95+
ColumnarValue::Array(array) => {
96+
let result = spark_space_array(array)?;
97+
Ok(ColumnarValue::Array(result))
98+
}
99+
ColumnarValue::Scalar(scalar) => {
100+
let result = spark_space_scalar(scalar)?;
101+
Ok(ColumnarValue::Scalar(result))
102+
}
103+
}
104+
}
105+
106+
fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
107+
match array.data_type() {
108+
DataType::Int32 => {
109+
let array = as_int32_array(array)?;
110+
Ok(Arc::new(spark_space_array_inner(array)))
111+
}
112+
DataType::Dictionary(_, _) => {
113+
let dict = as_dictionary_array::<Int32Type>(array);
114+
let values = spark_space_array(dict.values())?;
115+
let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
116+
Ok(Arc::new(result))
117+
}
118+
other => {
119+
exec_err!("Unsupported data type {other:?} for function `space`")
120+
}
121+
}
122+
}
123+
124+
fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
125+
match scalar {
126+
ScalarValue::Int32(value) => {
127+
let result = value.map(|v| {
128+
if v <= 0 {
129+
String::new()
130+
} else {
131+
" ".repeat(v as usize)
132+
}
133+
});
134+
Ok(ScalarValue::Utf8(result))
135+
}
136+
other => {
137+
exec_err!("Unsupported data type {other:?} for function `space`")
138+
}
139+
}
140+
}
141+
142+
/// Builds the string array for an `Int32` input: nulls stay null,
/// non-positive counts become empty strings, and a positive count `n`
/// becomes `n` spaces.
fn spark_space_array_inner(array: &Int32Array) -> StringArray {
    let row_count = array.len();
    let mut builder = StringBuilder::with_capacity(row_count, row_count * 16);
    // Grow-only run of spaces; each row appends a prefix slice of it, so a
    // new string is allocated only when a row needs more than any before it.
    let mut spaces = String::new();
    for item in array.iter() {
        let Some(requested) = item else {
            builder.append_null();
            continue;
        };
        // Negative counts fail the conversion and are treated like zero.
        let width = usize::try_from(requested).unwrap_or(0);
        if width > spaces.len() {
            spaces = " ".repeat(width);
        }
        builder.append_value(&spaces[..width]);
    }
    builder.finish()
}
160+
161+
#[cfg(test)]
mod tests {
    use crate::function::string::space::spark_space;
    use arrow::array::{Array, Int32Array, Int32DictionaryArray};
    use arrow::datatypes::Int32Type;
    use datafusion_common::cast::{as_dictionary_array, as_string_array};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use std::sync::Arc;

    /// Plain `Int32` array: positive, negative, zero and null inputs.
    #[test]
    fn test_spark_space_int32_array() -> Result<()> {
        let int32_array = ColumnarValue::Array(Arc::new(Int32Array::from(vec![
            Some(1),
            Some(-3),
            Some(0),
            Some(5),
            None,
        ])));
        let ColumnarValue::Array(result) = spark_space(&[int32_array])? else {
            unreachable!()
        };
        let result = as_string_array(&result)?;

        // Expected runs of spaces are built with `repeat` so their length is
        // explicit rather than hidden in a whitespace-only literal.
        assert_eq!(result.value(0), " ".repeat(1));
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        assert_eq!(result.value(3), " ".repeat(5));
        assert!(result.is_null(4));
        Ok(())
    }

    /// Dictionary input: the dictionary values are transformed and checked
    /// in the same order as the plain-array case.
    #[test]
    fn test_spark_space_dictionary() -> Result<()> {
        let dictionary = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new(
            Int32Array::from(vec![0, 1, 2, 3, 4]),
            Arc::new(Int32Array::from(vec![
                Some(1),
                Some(-3),
                Some(0),
                Some(5),
                None,
            ])),
        )));
        let ColumnarValue::Array(result) = spark_space(&[dictionary])? else {
            unreachable!()
        };
        let result =
            as_string_array(as_dictionary_array::<Int32Type>(&result)?.values())?;
        assert_eq!(result.value(0), " ".repeat(1));
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        assert_eq!(result.value(3), " ".repeat(5));
        assert!(result.is_null(4));
        Ok(())
    }

    /// Scalar input: a negative count yields an empty, non-null string.
    #[test]
    fn test_spark_space_scalar() -> Result<()> {
        let scalar = ColumnarValue::Scalar(ScalarValue::Int32(Some(-5)));
        let ColumnarValue::Scalar(result) = spark_space(&[scalar])? else {
            unreachable!()
        };
        match result {
            ScalarValue::Utf8(Some(result)) => {
                assert_eq!(result, "");
            }
            _ => unreachable!(),
        }
        Ok(())
    }
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
query T
19+
SELECT concat(space(1::INT), 'Spark');
20+
----
21+
 Spark
22+
23+
query T
24+
SELECT concat(space(5::INT), 'Spark');
25+
----
26+
     Spark
27+
28+
query T
29+
SELECT space(0::INT);
30+
----
31+
(empty)
32+
33+
query T
34+
SELECT space(-1::INT);
35+
----
36+
(empty)
37+
38+
query T
39+
SELECT space(NULL);
40+
----
41+
NULL

0 commit comments

Comments
 (0)