Skip to content

Commit 701850d

Browse files
committed
fix: UTF-32 to match Spark output (no BOM) and document target version
Spark 3.5 and 4.1 both emit UTF-32 as UTF-32BE without a BOM. Our previous implementation prepended a 0000FEFF BOM, which didn't match any Spark version. Fix this so encode('A', 'UTF-32') produces 00000041 (4 bytes), matching Spark. Also add a doc comment clarifying: - Target Spark version (3.5 charset behavior, accepts aliases) - UTF-32 semantics (alias for UTF-32BE) - ANSI mode mapping to Spark 3.5 vs 4.0 unmappable-char behavior
1 parent b6dcd45 commit 701850d

2 files changed

Lines changed: 65 additions & 130 deletions

File tree

datafusion/spark/src/function/string/encode.rs

Lines changed: 59 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,27 @@ use datafusion_expr::{
3232
/// Encodes a string or binary value into binary using the specified character encoding.
3333
/// Binary input is interpreted as UTF-8 with lossy conversion (invalid bytes become U+FFFD).
3434
///
35+
/// # Target Spark version
36+
/// Emulates Spark 3.5 semantics:
37+
/// - Accepts canonical charset names and common aliases (`UTF8`, `LATIN1`,
38+
/// `ISO88591`, `ASCII`, `UTF-32BE`, etc.).
39+
/// - Unmappable characters (non-ASCII in `US-ASCII`, code points above
40+
/// `U+00FF` in `ISO-8859-1`) are silently replaced with `?`.
41+
/// - `UTF-32` is an alias for `UTF-32BE` (no BOM), matching both Spark 3.5
42+
/// and Spark 4.1.
43+
///
44+
/// # Spark 4.0 differences (not implemented)
45+
/// Spark 4.0 tightened `encode` in two ways, each gated by a `spark.sql.legacy.*`
46+
/// config that can restore the 3.5 behavior:
47+
///
48+
/// - Charset whitelist — rejects aliases with `INVALID_PARAMETER_VALUE.CHARSET`.
49+
/// Controlled by `spark.sql.legacy.javaCharsets`.
50+
/// - Unmappable characters — raises `MALFORMED_CHARACTER_CODING`.
51+
/// Controlled by `spark.sql.legacy.codingErrorAction`.
52+
///
53+
/// TODO: wire both configs so Spark 4.0 behavior can be selected at runtime.
54+
/// See: <https://spark.apache.org/docs/4.0.0/sql-migration-guide.html>
55+
///
3556
/// <https://spark.apache.org/docs/latest/api/sql/index.html#encode>
3657
#[derive(Debug, PartialEq, Eq, Hash)]
3758
pub struct SparkEncode {
@@ -70,46 +91,21 @@ impl SparkEncode {
7091
}
7192

7293
/// Encodes a single string value using the specified charset.
73-
/// In ANSI mode, unmappable characters cause an error.
74-
/// In legacy mode, unmappable characters are replaced with `?`.
75-
fn encode_string(s: &str, charset: &str, enable_ansi_mode: bool) -> Result<Vec<u8>> {
94+
/// Unmappable characters are silently replaced with `?` (Spark 3.5 behavior).
95+
fn encode_string(s: &str, charset: &str) -> Result<Vec<u8>> {
7696
match charset {
7797
"UTF-8" | "UTF8" => Ok(s.as_bytes().to_vec()),
78-
"US-ASCII" | "ASCII" => {
79-
let mut bytes = Vec::with_capacity(s.len());
80-
for c in s.chars() {
81-
if c.is_ascii() {
82-
bytes.push(c as u8);
83-
} else if enable_ansi_mode {
84-
return exec_err!(
85-
"cannot encode character '{}' (U+{:04X}) in US-ASCII",
86-
c,
87-
c as u32
88-
);
89-
} else {
90-
bytes.push(b'?');
91-
}
92-
}
93-
Ok(bytes)
94-
}
95-
"ISO-8859-1" | "ISO88591" | "LATIN1" => {
96-
let mut bytes = Vec::with_capacity(s.len());
97-
for c in s.chars() {
98+
"US-ASCII" | "ASCII" => Ok(s
99+
.chars()
100+
.map(|c| if c.is_ascii() { c as u8 } else { b'?' })
101+
.collect()),
102+
"ISO-8859-1" | "ISO88591" | "LATIN1" => Ok(s
103+
.chars()
104+
.map(|c| {
98105
let cp = c as u32;
99-
if cp <= 255 {
100-
bytes.push(cp as u8);
101-
} else if enable_ansi_mode {
102-
return exec_err!(
103-
"cannot encode character '{}' (U+{:04X}) in ISO-8859-1",
104-
c,
105-
c as u32
106-
);
107-
} else {
108-
bytes.push(b'?');
109-
}
110-
}
111-
Ok(bytes)
112-
}
106+
if cp <= 255 { cp as u8 } else { b'?' }
107+
})
108+
.collect()),
113109
"UTF-16BE" | "UTF16BE" => {
114110
let mut bytes = Vec::new();
115111
for code_unit in s.encode_utf16() {
@@ -132,28 +128,21 @@ fn encode_string(s: &str, charset: &str, enable_ansi_mode: bool) -> Result<Vec<u
132128
}
133129
Ok(bytes)
134130
}
135-
"UTF-32BE" | "UTF32BE" => {
136-
let mut bytes = Vec::new();
131+
// Spark treats UTF-32 as UTF-32BE (no BOM), matching Spark 3.5 and 4.1.
132+
"UTF-32" | "UTF32" | "UTF-32BE" | "UTF32BE" => {
133+
let mut bytes = Vec::with_capacity(s.len() * 4);
137134
for c in s.chars() {
138135
bytes.extend_from_slice(&(c as u32).to_be_bytes());
139136
}
140137
Ok(bytes)
141138
}
142139
"UTF-32LE" | "UTF32LE" => {
143-
let mut bytes = Vec::new();
140+
let mut bytes = Vec::with_capacity(s.len() * 4);
144141
for c in s.chars() {
145142
bytes.extend_from_slice(&(c as u32).to_le_bytes());
146143
}
147144
Ok(bytes)
148145
}
149-
"UTF-32" | "UTF32" => {
150-
// BOM (big-endian marker) followed by UTF-32BE encoded bytes
151-
let mut bytes = vec![0x00, 0x00, 0xFE, 0xFF];
152-
for c in s.chars() {
153-
bytes.extend_from_slice(&(c as u32).to_be_bytes());
154-
}
155-
Ok(bytes)
156-
}
157146
_ => exec_err!(
158147
"Unsupported charset for encode: '{}'. Supported: US-ASCII, ISO-8859-1, UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE",
159148
charset
@@ -165,7 +154,6 @@ fn encode_string(s: &str, charset: &str, enable_ansi_mode: bool) -> Result<Vec<u
165154
fn encode_array<'a, S: StringArrayType<'a>>(
166155
string_array: &S,
167156
charset: &str,
168-
enable_ansi_mode: bool,
169157
) -> Result<ArrayRef> {
170158
let mut builder =
171159
BinaryBuilder::with_capacity(string_array.len(), string_array.len() * 4);
@@ -174,7 +162,7 @@ fn encode_array<'a, S: StringArrayType<'a>>(
174162
builder.append_null();
175163
} else {
176164
let s = string_array.value(i);
177-
let encoded = encode_string(s, charset, enable_ansi_mode)?;
165+
let encoded = encode_string(s, charset)?;
178166
builder.append_value(&encoded);
179167
}
180168
}
@@ -186,7 +174,6 @@ fn encode_array<'a, S: StringArrayType<'a>>(
186174
fn encode_binary_array<'a, B: arrow::array::BinaryArrayType<'a>>(
187175
binary_array: &'a B,
188176
charset: &str,
189-
enable_ansi_mode: bool,
190177
) -> Result<ArrayRef> {
191178
let mut builder =
192179
BinaryBuilder::with_capacity(binary_array.len(), binary_array.len() * 4);
@@ -195,38 +182,22 @@ fn encode_binary_array<'a, B: arrow::array::BinaryArrayType<'a>>(
195182
builder.append_null();
196183
} else {
197184
let s = String::from_utf8_lossy(binary_array.value(i));
198-
let encoded = encode_string(&s, charset, enable_ansi_mode)?;
185+
let encoded = encode_string(&s, charset)?;
199186
builder.append_value(&encoded);
200187
}
201188
}
202189
Ok(Arc::new(builder.finish()))
203190
}
204191

205192
/// Dispatches to the correct typed array encoder based on the DataType.
206-
fn encode_dispatch(
207-
arr: &ArrayRef,
208-
charset: &str,
209-
enable_ansi_mode: bool,
210-
) -> Result<ArrayRef> {
193+
fn encode_dispatch(arr: &ArrayRef, charset: &str) -> Result<ArrayRef> {
211194
match arr.data_type() {
212-
DataType::Utf8 => {
213-
encode_array(&arr.as_string::<i32>(), charset, enable_ansi_mode)
214-
}
215-
DataType::LargeUtf8 => {
216-
encode_array(&arr.as_string::<i64>(), charset, enable_ansi_mode)
217-
}
218-
DataType::Utf8View => {
219-
encode_array(&arr.as_string_view(), charset, enable_ansi_mode)
220-
}
221-
DataType::Binary => {
222-
encode_binary_array(&arr.as_binary::<i32>(), charset, enable_ansi_mode)
223-
}
224-
DataType::LargeBinary => {
225-
encode_binary_array(&arr.as_binary::<i64>(), charset, enable_ansi_mode)
226-
}
227-
DataType::BinaryView => {
228-
encode_binary_array(&arr.as_binary_view(), charset, enable_ansi_mode)
229-
}
195+
DataType::Utf8 => encode_array(&arr.as_string::<i32>(), charset),
196+
DataType::LargeUtf8 => encode_array(&arr.as_string::<i64>(), charset),
197+
DataType::Utf8View => encode_array(&arr.as_string_view(), charset),
198+
DataType::Binary => encode_binary_array(&arr.as_binary::<i32>(), charset),
199+
DataType::LargeBinary => encode_binary_array(&arr.as_binary::<i64>(), charset),
200+
DataType::BinaryView => encode_binary_array(&arr.as_binary_view(), charset),
230201
DataType::Null => {
231202
let mut builder = BinaryBuilder::new();
232203
for _ in 0..arr.len() {
@@ -309,7 +280,6 @@ impl ScalarUDFImpl for SparkEncode {
309280
}
310281

311282
let charset = extract_charset(&args.args[1])?;
312-
let enable_ansi_mode = args.config_options.execution.enable_ansi_mode;
313283

314284
// Determine if the result should be scalar or array
315285
let len = args.args.iter().find_map(|arg| match arg {
@@ -324,7 +294,7 @@ impl ScalarUDFImpl for SparkEncode {
324294
ColumnarValue::Array(array) => Arc::clone(array),
325295
};
326296

327-
let result = encode_dispatch(&string_arr, &charset, enable_ansi_mode)?;
297+
let result = encode_dispatch(&string_arr, &charset)?;
328298

329299
if is_scalar {
330300
ScalarValue::try_from_array(&result, 0).map(ColumnarValue::Scalar)
@@ -341,16 +311,10 @@ mod tests {
341311
use datafusion_common::config::ConfigOptions;
342312

343313
/// Helper to invoke encode as a scalar with two literal string arguments.
344-
fn eval_encode_scalar_with_ansi(
345-
input: ScalarValue,
346-
charset: &str,
347-
enable_ansi_mode: bool,
348-
) -> Result<ColumnarValue> {
314+
fn eval_encode_scalar(input: ScalarValue, charset: &str) -> Result<ColumnarValue> {
349315
let func = SparkEncode::new();
350316
let input_field = Arc::new(Field::new("input", input.data_type(), true));
351317
let charset_field = Arc::new(Field::new("charset", DataType::Utf8, false));
352-
let mut config = ConfigOptions::default();
353-
config.execution.enable_ansi_mode = enable_ansi_mode;
354318
func.invoke_with_args(ScalarFunctionArgs {
355319
args: vec![
356320
ColumnarValue::Scalar(input),
@@ -359,15 +323,10 @@ mod tests {
359323
arg_fields: vec![input_field, charset_field],
360324
number_rows: 1,
361325
return_field: Arc::new(Field::new("encode", DataType::Binary, true)),
362-
config_options: Arc::new(config),
326+
config_options: Arc::new(ConfigOptions::default()),
363327
})
364328
}
365329

366-
/// Helper with ANSI mode disabled (legacy behavior).
367-
fn eval_encode_scalar(input: ScalarValue, charset: &str) -> Result<ColumnarValue> {
368-
eval_encode_scalar_with_ansi(input, charset, false)
369-
}
370-
371330
fn expect_binary_scalar(result: ColumnarValue) -> Vec<u8> {
372331
match result {
373332
ColumnarValue::Scalar(ScalarValue::Binary(Some(bytes))) => bytes,
@@ -396,8 +355,8 @@ mod tests {
396355
}
397356

398357
#[test]
399-
fn test_encode_ascii_unmappable_legacy_mode() {
400-
// Legacy mode: non-ASCII chars replaced with '?'
358+
fn test_encode_ascii_unmappable_replaced() {
359+
// Spark 3.5: non-ASCII chars replaced with '?'
401360
let result = eval_encode_scalar(
402361
ScalarValue::Utf8(Some("\u{00E9}".into())), // é
403362
"US-ASCII",
@@ -407,20 +366,8 @@ mod tests {
407366
}
408367

409368
#[test]
410-
fn test_encode_ascii_unmappable_ansi_mode() {
411-
// ANSI mode: non-ASCII chars cause an error
412-
let result = eval_encode_scalar_with_ansi(
413-
ScalarValue::Utf8(Some("\u{00E9}".into())),
414-
"US-ASCII",
415-
true,
416-
);
417-
assert!(result.is_err());
418-
assert!(result.unwrap_err().to_string().contains("cannot encode"));
419-
}
420-
421-
#[test]
422-
fn test_encode_iso8859_unmappable_legacy_mode() {
423-
// Legacy mode: chars > U+00FF replaced with '?'
369+
fn test_encode_iso8859_unmappable_replaced() {
370+
// Spark 3.5: chars > U+00FF replaced with '?'
424371
let result = eval_encode_scalar(
425372
ScalarValue::Utf8(Some("\u{0100}".into())), // Ā (U+0100)
426373
"ISO-8859-1",
@@ -429,18 +376,6 @@ mod tests {
429376
assert_eq!(expect_binary_scalar(result), vec![b'?']);
430377
}
431378

432-
#[test]
433-
fn test_encode_iso8859_unmappable_ansi_mode() {
434-
// ANSI mode: chars > U+00FF cause an error
435-
let result = eval_encode_scalar_with_ansi(
436-
ScalarValue::Utf8(Some("\u{0100}".into())),
437-
"ISO-8859-1",
438-
true,
439-
);
440-
assert!(result.is_err());
441-
assert!(result.unwrap_err().to_string().contains("cannot encode"));
442-
}
443-
444379
#[test]
445380
fn test_encode_utf8view_column() {
446381
let func = SparkEncode::new();
@@ -584,14 +519,11 @@ mod tests {
584519
}
585520

586521
#[test]
587-
fn test_encode_utf32_with_bom() {
588-
// UTF-32 = BOM (0000FEFF) + UTF-32BE
522+
fn test_encode_utf32_no_bom() {
523+
// Spark's UTF-32 = UTF-32BE, no BOM prefix. 'A' = U+0041 → 00 00 00 41
589524
let result =
590525
eval_encode_scalar(ScalarValue::Utf8(Some("A".into())), "UTF-32").unwrap();
591-
assert_eq!(
592-
expect_binary_scalar(result),
593-
vec![0x00, 0x00, 0xFE, 0xFF, 0x00, 0x00, 0x00, 0x41]
594-
);
526+
assert_eq!(expect_binary_scalar(result), vec![0x00, 0x00, 0x00, 0x41]);
595527
}
596528

597529
#[test]
@@ -611,14 +543,11 @@ mod tests {
611543
}
612544

613545
#[test]
614-
fn test_encode_emoji_utf32_with_bom() {
615-
// UTF-32 = BOM (0000FEFF) + UTF-32BE: 00 01 F6 00
546+
fn test_encode_emoji_utf32_no_bom() {
547+
// Spark's UTF-32 = UTF-32BE, no BOM prefix. U+1F600 (😀) → 00 01 F6 00
616548
let result =
617549
eval_encode_scalar(ScalarValue::Utf8(Some("😀".into())), "UTF-32").unwrap();
618-
assert_eq!(
619-
expect_binary_scalar(result),
620-
vec![0x00, 0x00, 0xFE, 0xFF, 0x00, 0x01, 0xF6, 0x00]
621-
);
550+
assert_eq!(expect_binary_scalar(result), vec![0x00, 0x01, 0xF6, 0x00]);
622551
}
623552

624553
/// Simple hex encoding for test assertions.

datafusion/sqllogictest/test_files/spark/string/encode.slt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ SELECT encode('A'::string, 'utf-32be'::string);
4545
----
4646
00000041
4747

48+
# UTF-32 (Spark 3.5 / 4.1: no BOM, identical to UTF-32BE)
49+
query ?
50+
SELECT encode('A'::string, 'utf-32'::string);
51+
----
52+
00000041
53+
4854
# Case-insensitive charset
4955
query ?
5056
SELECT encode('hello'::string, 'Utf-8'::string);

0 commit comments

Comments
 (0)