Skip to content

Commit 0733758

Browse files
committed
test: add test for string view array in repartition
1 parent f2e7076 commit 0733758

File tree

1 file changed

+55
-4
lines changed
  • datafusion/physical-plan/src/repartition

1 file changed

+55
-4
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::{
4444
check_if_same_properties,
4545
};
4646

47-
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
47+
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, StringViewArray};
4848
use arrow::compute::take_arrays;
4949
use arrow::datatypes::{SchemaRef, UInt32Type};
5050
use datafusion_common::config::ConfigOptions;
@@ -614,11 +614,10 @@ impl BatchPartitioner {
614614
let new_columns = columns
615615
.into_iter()
616616
.map(|col| {
617-
use arrow::array::{Array, StringViewArray};
618617
if let Some(sv) =
619618
col.as_any().downcast_ref::<StringViewArray>()
620619
{
621-
Arc::new(sv.gc()) as Arc<dyn Array>
620+
Arc::new(sv.gc())
622621
} else {
623622
col
624623
}
@@ -1784,7 +1783,7 @@ mod tests {
17841783
{collect, expressions::col},
17851784
};
17861785

1787-
use arrow::array::{ArrayRef, StringArray, UInt32Array};
1786+
use arrow::array::{ArrayRef, StringArray, StringViewArray, UInt32Array};
17881787
use arrow::datatypes::{DataType, Field, Schema};
17891788
use datafusion_common::cast::as_string_array;
17901789
use datafusion_common::exec_err;
@@ -2558,6 +2557,33 @@ mod tests {
25582557
.collect()
25592558
}
25602559

2560+
fn test_schema_string_view() -> Arc<Schema> {
2561+
Arc::new(Schema::new(vec![
2562+
Field::new("key", DataType::UInt32, false),
2563+
Field::new("val", DataType::Utf8View, false),
2564+
]))
2565+
}
2566+
2567+
/// Create a batch with StringViewArray data for compaction tests.
2568+
/// Strings are >12 bytes to force out-of-line storage in the buffer
2569+
/// (strings <=12 bytes are inlined in the view and don't reference the buffer).
2570+
fn create_string_view_batch(num_rows: usize, num_partitions: usize) -> RecordBatch {
2571+
let schema = test_schema_string_view();
2572+
let keys: Vec<u32> = (0..num_rows).map(|i| (i % num_partitions) as u32).collect();
2573+
let vals: Vec<String> = (0..num_rows)
2574+
.map(|i| format!("string_value_{i:0>20}"))
2575+
.collect();
2576+
let val_refs: Vec<&str> = vals.iter().map(|s| s.as_str()).collect();
2577+
RecordBatch::try_new(
2578+
schema,
2579+
vec![
2580+
Arc::new(UInt32Array::from(keys)) as ArrayRef,
2581+
Arc::new(StringViewArray::from(val_refs)) as ArrayRef,
2582+
],
2583+
)
2584+
.unwrap()
2585+
}
2586+
25612587
#[tokio::test]
25622588
async fn test_repartition_ordering_with_spilling() -> Result<()> {
25632589
// Test that repartition preserves ordering when spilling occurs
@@ -2631,6 +2657,31 @@ mod tests {
26312657

26322658
Ok(())
26332659
}
2660+
2661+
#[tokio::test]
2662+
async fn hash_repartition_string_view_compaction() -> Result<()> {
2663+
let schema = test_schema_string_view();
2664+
let num_partitions = 8;
2665+
let batch = create_string_view_batch(800, num_partitions);
2666+
let partitions = vec![vec![batch]];
2667+
2668+
let output_partitions = repartition(
2669+
&schema,
2670+
partitions,
2671+
Partitioning::Hash(vec![col("key", &schema)?], num_partitions),
2672+
)
2673+
.await?;
2674+
2675+
assert_eq!(num_partitions, output_partitions.len());
2676+
2677+
let total_rows: usize = output_partitions
2678+
.iter()
2679+
.map(|x| x.iter().map(|x| x.num_rows()).sum::<usize>())
2680+
.sum();
2681+
assert_eq!(total_rows, 800);
2682+
2683+
Ok(())
2684+
}
26342685
}
26352686

26362687
#[cfg(test)]

0 commit comments

Comments
 (0)