Skip to content

Commit 4385fb4

Browse files
committed
test: add test for string view array in repartition
1 parent 471de13 commit 4385fb4

1 file changed

Lines changed: 55 additions & 4 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 55 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ use crate::{
4444
check_if_same_properties,
4545
};
4646

47-
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
47+
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, StringViewArray};
4848
use arrow::compute::take_arrays;
4949
use arrow::datatypes::{SchemaRef, UInt32Type};
5050
use datafusion_common::config::ConfigOptions;
@@ -614,11 +614,10 @@ impl BatchPartitioner {
614614
let new_columns = columns
615615
.into_iter()
616616
.map(|col| {
617-
use arrow::array::{Array, StringViewArray};
618617
if let Some(sv) =
619618
col.as_any().downcast_ref::<StringViewArray>()
620619
{
621-
Arc::new(sv.gc()) as Arc<dyn Array>
620+
Arc::new(sv.gc())
622621
} else {
623622
col
624623
}
@@ -1769,7 +1768,7 @@ mod tests {
17691768
{collect, expressions::col},
17701769
};
17711770

1772-
use arrow::array::{ArrayRef, StringArray, UInt32Array};
1771+
use arrow::array::{ArrayRef, StringArray, StringViewArray, UInt32Array};
17731772
use arrow::datatypes::{DataType, Field, Schema};
17741773
use datafusion_common::cast::as_string_array;
17751774
use datafusion_common::exec_err;
@@ -2543,6 +2542,33 @@ mod tests {
25432542
.collect()
25442543
}
25452544

2545+
/// Builds the two-column schema shared by the string-view repartition tests:
/// a `UInt32` column named "key" and a `Utf8View` column named "val", both
/// declared non-nullable.
fn test_schema_string_view() -> Arc<Schema> {
2546+
Arc::new(Schema::new(vec![
2547+
Field::new("key", DataType::UInt32, false),
2548+
Field::new("val", DataType::Utf8View, false),
2549+
]))
2550+
}
2551+
2552+
/// Create a batch with StringViewArray data for compaction tests.
2553+
/// Strings are >12 bytes to force out-of-line storage in the buffer
2554+
/// (strings <=12 bytes are inlined in the view and don't reference the buffer).
///
/// Row `i` gets key `i % num_partitions` and the value `string_value_`
/// followed by `i` left-padded with zeros to 20 digits (33 bytes in total).
///
/// Panics if `num_partitions` is 0 while `num_rows` is non-zero (remainder by
/// zero), or if `RecordBatch::try_new` rejects the columns.
2555+
fn create_string_view_batch(num_rows: usize, num_partitions: usize) -> RecordBatch {
2556+
let schema = test_schema_string_view();
2557+
// Keys cycle through 0..num_partitions in row order.
let keys: Vec<u32> = (0..num_rows).map(|i| (i % num_partitions) as u32).collect();
2558+
let vals: Vec<String> = (0..num_rows)
2559+
.map(|i| format!("string_value_{i:0>20}"))
2560+
.collect();
2561+
// Borrow the owned strings as `&str` for the `StringViewArray::from` call below.
let val_refs: Vec<&str> = vals.iter().map(|s| s.as_str()).collect();
2562+
RecordBatch::try_new(
2563+
schema,
2564+
vec![
2565+
Arc::new(UInt32Array::from(keys)) as ArrayRef,
2566+
Arc::new(StringViewArray::from(val_refs)) as ArrayRef,
2567+
],
2568+
)
2569+
.unwrap()
2570+
}
2571+
25462572
#[tokio::test]
25472573
async fn test_repartition_ordering_with_spilling() -> Result<()> {
25482574
// Test that repartition preserves ordering when spilling occurs
@@ -2616,6 +2642,31 @@ mod tests {
26162642

26172643
Ok(())
26182644
}
2645+
2646+
/// Hash-repartitions one 800-row `Utf8View` batch on "key" into 8 partitions
/// and checks that 8 output partitions come back holding 800 rows in total.
///
/// NOTE(review): only the partition count and total row count are asserted;
/// nothing here inspects the output arrays' data buffers, so the test would
/// presumably still pass if string-view compaction were skipped — confirm
/// whether a buffer-size assertion is wanted.
#[tokio::test]
2647+
async fn hash_repartition_string_view_compaction() -> Result<()> {
2648+
let schema = test_schema_string_view();
2649+
let num_partitions = 8;
2650+
let batch = create_string_view_batch(800, num_partitions);
2651+
// A single input partition holding the single batch.
let partitions = vec![vec![batch]];
2652+
2653+
let output_partitions = repartition(
2654+
&schema,
2655+
partitions,
2656+
Partitioning::Hash(vec![col("key", &schema)?], num_partitions),
2657+
)
2658+
.await?;
2659+
2660+
assert_eq!(num_partitions, output_partitions.len());
2661+
2662+
// Sum the row counts of every batch across every output partition.
let total_rows: usize = output_partitions
2663+
.iter()
2664+
.map(|x| x.iter().map(|x| x.num_rows()).sum::<usize>())
2665+
.sum();
2666+
assert_eq!(total_rows, 800);
2667+
2668+
Ok(())
2669+
}
26192670
}
26202671

26212672
#[cfg(test)]

0 commit comments

Comments
 (0)