Skip to content

Commit 06f27e9

Browse files
shifluxxcalamb
andauthored
fix : correct nullability propagation for spark.bitwise_not (#19224)
## Which issue does this PR close? - Closes #19150. ## Rationale for this change The Spark `bitwise_not` UDF always appeared as **nullable** in logical plans, even when its input column was **non-nullable**. This happened because the UDF implemented only `return_type()`, which returns a `DataType` but **does not propagate nullability**. DataFusion requires UDFs to implement `return_field_from_args()` when nullability depends on input fields. As a result: - `bitwise_not(non_nullable_col)` incorrectly produced a **nullable** output. - Downstream query planning and schema inference became inconsistent. - This differed from both **Spark semantics** and **Arrow kernel behavior**, where nullability is preserved. This PR corrects the nullability inference. ## What changes are included in this PR? - Implemented `return_field_from_args()` for the Spark `bitwise_not` UDF. - Output type = input type - Output nullability = input nullability - Updated `return_type()` to return an error, per DataFusion API guidelines when overriding nullability. - Added unit tests verifying: - Non-nullable input → non-nullable output - Nullable input → nullable output - Behavior across multiple integer types (`Int32`, `Int64`) - Code comments and minor cleanup. ## Are these changes tested? Yes. This PR includes new unit tests that validate: - correct nullability propagation - correct output types - consistent behavior across supported integer types ## Are there any user-facing changes? Yes, but they are **behavior-correcting**, not breaking: - The `spark.bitwise_not` UDF now correctly reports nullability in schemas and logical plans. - No API changes. - No behavioral change for actual runtime values — Arrow kernels already preserved null bitmaps; only planner metadata was incorrect. This is not considered a breaking change. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 10c933c commit 06f27e9

1 file changed

Lines changed: 124 additions & 6 deletions

File tree

datafusion/spark/src/function/bitwise/bitwise_not.rs

Lines changed: 124 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::array::*;
1819
use arrow::compute::kernels::bitwise;
19-
use arrow::datatypes::{Int16Type, Int32Type, Int64Type, Int8Type};
20-
use arrow::{array::*, datatypes::DataType};
21-
use datafusion_common::{plan_err, Result};
20+
use arrow::datatypes::{
21+
DataType, Field, FieldRef, Int16Type, Int32Type, Int64Type, Int8Type,
22+
};
23+
use datafusion_common::{internal_err, plan_err, Result};
2224
use datafusion_expr::{ColumnarValue, TypeSignature, Volatility};
23-
use datafusion_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature};
25+
use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature};
2426
use datafusion_functions::utils::make_scalar_function;
2527
use std::{any::Any, sync::Arc};
2628

@@ -64,8 +66,32 @@ impl ScalarUDFImpl for SparkBitwiseNot {
6466
&self.signature
6567
}
6668

67-
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
68-
Ok(arg_types[0].clone())
69+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
70+
internal_err!(
71+
"SparkBitwiseNot: return_type() is not used; return_field_from_args() is implemented"
72+
)
73+
}
74+
75+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
76+
if args.arg_fields.len() != 1 {
77+
return plan_err!("bitwise_not expects exactly 1 argument");
78+
}
79+
80+
let input_field = &args.arg_fields[0];
81+
82+
let out_dt = input_field.data_type().clone();
83+
let mut out_nullable = input_field.is_nullable();
84+
85+
let scalar_null_present = args
86+
.scalar_arguments
87+
.iter()
88+
.any(|opt_s| opt_s.is_some_and(|sv| sv.is_null()));
89+
90+
if scalar_null_present {
91+
out_nullable = true;
92+
}
93+
94+
Ok(Arc::new(Field::new(self.name(), out_dt, out_nullable)))
6995
}
7096

7197
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
@@ -107,3 +133,95 @@ pub fn spark_bitwise_not(args: &[ArrayRef]) -> Result<ArrayRef> {
107133
}
108134
}
109135
}
136+
137+
#[cfg(test)]
138+
mod tests {
139+
use super::*;
140+
use arrow::datatypes::{DataType, Field};
141+
use std::sync::Arc;
142+
143+
use datafusion_expr::ReturnFieldArgs;
144+
145+
#[test]
146+
fn test_bitwise_not_nullability() {
147+
let bitwise_not = SparkBitwiseNot::new();
148+
149+
// --- non-nullable Int32 input ---
150+
let non_nullable_i32 = Arc::new(Field::new("c", DataType::Int32, false));
151+
let out_non_null = bitwise_not
152+
.return_field_from_args(ReturnFieldArgs {
153+
arg_fields: &[Arc::clone(&non_nullable_i32)],
154+
// single-argument function -> one scalar_argument slot (None)
155+
scalar_arguments: &[None],
156+
})
157+
.unwrap();
158+
159+
// result should be non-nullable and the same DataType as input
160+
assert!(!out_non_null.is_nullable());
161+
assert_eq!(out_non_null.data_type(), &DataType::Int32);
162+
163+
// --- nullable Int32 input ---
164+
let nullable_i32 = Arc::new(Field::new("c", DataType::Int32, true));
165+
let out_nullable = bitwise_not
166+
.return_field_from_args(ReturnFieldArgs {
167+
arg_fields: &[Arc::clone(&nullable_i32)],
168+
scalar_arguments: &[None],
169+
})
170+
.unwrap();
171+
172+
// result should be nullable and the same DataType as input
173+
assert!(out_nullable.is_nullable());
174+
assert_eq!(out_nullable.data_type(), &DataType::Int32);
175+
176+
// --- also test another integer type (Int64) for completeness ---
177+
let non_nullable_i64 = Arc::new(Field::new("c", DataType::Int64, false));
178+
let out_i64 = bitwise_not
179+
.return_field_from_args(ReturnFieldArgs {
180+
arg_fields: &[Arc::clone(&non_nullable_i64)],
181+
scalar_arguments: &[None],
182+
})
183+
.unwrap();
184+
185+
assert!(!out_i64.is_nullable());
186+
assert_eq!(out_i64.data_type(), &DataType::Int64);
187+
188+
let nullable_i64 = Arc::new(Field::new("c", DataType::Int64, true));
189+
let out_i64_null = bitwise_not
190+
.return_field_from_args(ReturnFieldArgs {
191+
arg_fields: &[Arc::clone(&nullable_i64)],
192+
scalar_arguments: &[None],
193+
})
194+
.unwrap();
195+
196+
assert!(out_i64_null.is_nullable());
197+
assert_eq!(out_i64_null.data_type(), &DataType::Int64);
198+
}
199+
200+
#[test]
201+
fn test_bitwise_not_nullability_with_null_scalar() -> Result<()> {
202+
use arrow::datatypes::{DataType, Field};
203+
use datafusion_common::ScalarValue;
204+
use std::sync::Arc;
205+
206+
let func = SparkBitwiseNot::new();
207+
208+
let non_nullable: FieldRef = Arc::new(Field::new("col", DataType::Int32, false));
209+
210+
let out = func.return_field_from_args(ReturnFieldArgs {
211+
arg_fields: &[Arc::clone(&non_nullable)],
212+
scalar_arguments: &[None],
213+
})?;
214+
assert!(!out.is_nullable());
215+
assert_eq!(out.data_type(), &DataType::Int32);
216+
217+
let null_scalar = ScalarValue::Int32(None);
218+
let out_with_null_scalar = func.return_field_from_args(ReturnFieldArgs {
219+
arg_fields: &[Arc::clone(&non_nullable)],
220+
scalar_arguments: &[Some(&null_scalar)],
221+
})?;
222+
assert!(out_with_null_scalar.is_nullable());
223+
assert_eq!(out_with_null_scalar.data_type(), &DataType::Int32);
224+
225+
Ok(())
226+
}
227+
}

0 commit comments

Comments
 (0)