Skip to content

Commit 57a7862

Browse files
damahuaclaude authored and committed
fix: gc StringView/BinaryView arrays before spilling to prevent write amplification
After operations like `take` or `interleave`, view-type arrays retain shared references to large original data buffers. When these batches are written to spill files individually, the IPC writer duplicates all referenced buffers for every batch — measured at 8.5× write amplification with 10 chunks of 1000 rows from a 10,000-row StringView dataset. Changes: - Add `gc_view_arrays()` utility in spill/mod.rs that compacts both StringViewArray and BinaryViewArray in a RecordBatch - Apply gc to hash aggregation spill path (IncrementalSortIterator output chunks share parent batch buffers via take_record_batch) - Apply gc to sort-merge join bitwise_stream spill path (inner_key_buffer contains sliced batches sharing original batch buffers) - Extend sort operator's organize_stringview_arrays to also handle BinaryViewArray (was previously StringView-only) The sort operator already had StringView gc (organize_stringview_arrays), but the hash aggregation and sort-merge join spill paths were missing it. This is the same class of bug fixed by PR #19444 for sort spilling. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2818abb commit 57a7862

4 files changed

Lines changed: 207 additions & 4 deletions

File tree

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1163,11 +1163,20 @@ impl GroupedHashAggregateStream {
11631163
self.spill_state.spill_expr.clone(),
11641164
self.batch_size,
11651165
);
1166+
// After `take_record_batch` inside IncrementalSortIterator, each output
1167+
// batch shares the same StringView/BinaryView data buffers as the
1168+
// original emitted batch. Without gc(), the IPC writer duplicates all
1169+
// referenced buffers for every chunk — potentially causing N× write
1170+
// amplification where N is the number of chunks.
1171+
let gc_iter =
1172+
sorted_iter.map(|r| r.map(|batch| crate::spill::gc_view_arrays(&batch)));
1173+
// gc_view_arrays returns Result, so flatten the nested Result
1174+
let gc_iter = gc_iter.map(|r| r.and_then(|inner| inner));
11661175
let spillfile = self
11671176
.spill_state
11681177
.spill_manager
11691178
.spill_record_batch_iter_and_return_max_batch_memory(
1170-
sorted_iter,
1179+
gc_iter,
11711180
"HashAggSpill",
11721181
)?;
11731182

datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,10 +447,19 @@ impl BitwiseSortMergeJoinStream {
447447

448448
/// Spill the in-memory inner key buffer to disk and clear it.
449449
fn spill_inner_key_buffer(&mut self) -> Result<()> {
450+
// The inner_key_buffer contains sliced batches that may share
451+
// StringView/BinaryView data buffers with the original unsliced
452+
// batches. GC each batch so the IPC writer only writes the
453+
// buffers actually referenced by each batch's live views.
454+
let gc_batches: Vec<_> = self
455+
.inner_key_buffer
456+
.iter()
457+
.map(|b| crate::spill::gc_view_arrays(b))
458+
.collect::<Result<_>>()?;
450459
let spill_file = self
451460
.spill_manager
452461
.spill_record_batch_and_finish(
453-
&self.inner_key_buffer,
462+
&gc_batches,
454463
"semi_anti_smj_inner_key_spill",
455464
)?
456465
.expect("inner_key_buffer is non-empty when spilling");

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::{
5353
Statistics,
5454
};
5555

56-
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
56+
use arrow::array::{Array, BinaryViewArray, RecordBatch, RecordBatchOptions, StringViewArray};
5757
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
5858
use arrow::datatypes::SchemaRef;
5959
use datafusion_common::config::SpillCompression;
@@ -509,6 +509,12 @@ impl ExternalSorter {
509509
let new_array = string_view_array.gc();
510510
new_columns.push(Arc::new(new_array));
511511
arr_mutated = true;
512+
} else if let Some(binary_view_array) =
513+
array.as_any().downcast_ref::<BinaryViewArray>()
514+
{
515+
let new_array = binary_view_array.gc();
516+
new_columns.push(Arc::new(new_array));
517+
arr_mutated = true;
512518
} else {
513519
new_columns.push(Arc::clone(array));
514520
}

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 180 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use std::pin::Pin;
3434
use std::sync::Arc;
3535
use std::task::{Context, Poll};
3636

37-
use arrow::array::{BufferSpec, layout};
37+
use arrow::array::{Array, BinaryViewArray, BufferSpec, StringViewArray, layout};
3838
use arrow::datatypes::{Schema, SchemaRef};
3939
use arrow::ipc::{
4040
MetadataVersion,
@@ -51,6 +51,42 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
5151
use futures::{FutureExt as _, Stream};
5252
use log::debug;
5353

54+
/// Compact view-type arrays (`StringViewArray` and `BinaryViewArray`) in a
55+
/// [`RecordBatch`] so that each batch owns only the data buffers it actually
56+
/// references.
57+
///
58+
/// After operations like `take` or `interleave`, view-type arrays may retain
59+
/// shared references to large original data buffers. When such batches are
60+
/// written to spill files individually, the IPC writer must include **all**
61+
/// referenced buffers for every batch, causing massive write amplification.
62+
/// Calling `gc()` on each view array rebuilds its buffer list to contain only
63+
/// the bytes that are live in the current array, eliminating the duplication.
64+
///
65+
/// Returns the batch unchanged (no allocation) when it contains no view-type
66+
/// columns.
67+
pub fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
68+
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
69+
let mut mutated = false;
70+
71+
for array in batch.columns() {
72+
if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
73+
new_columns.push(Arc::new(sv.gc()));
74+
mutated = true;
75+
} else if let Some(bv) = array.as_any().downcast_ref::<BinaryViewArray>() {
76+
new_columns.push(Arc::new(bv.gc()));
77+
mutated = true;
78+
} else {
79+
new_columns.push(Arc::clone(array));
80+
}
81+
}
82+
83+
if mutated {
84+
Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
85+
} else {
86+
Ok(batch.clone())
87+
}
88+
}
89+
5490
/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
5591
/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
5692
///
@@ -786,6 +822,149 @@ mod tests {
786822
})
787823
}
788824

825+
#[test]
fn test_gc_view_arrays_reduces_spill_size() -> Result<()> {
    use arrow::array::{BinaryViewArray, StringViewArray};

    // StringView column whose values exceed the 12-byte inline limit, so
    // they live out-of-line in shared data buffers.
    let string_values: Vec<String> = (0..1000)
        .map(|i| format!("this_is_a_long_string_value_{:06}", i))
        .collect();
    let string_col = StringViewArray::from(
        string_values.iter().map(String::as_str).collect::<Vec<_>>(),
    );

    // BinaryView column with similarly non-inline payloads.
    let binary_values: Vec<Vec<u8>> = (0..1000)
        .map(|i| format!("binary_payload_data_value_{:06}", i).into_bytes())
        .collect();
    let binary_col = BinaryViewArray::from(
        binary_values.iter().map(Vec::as_slice).collect::<Vec<_>>(),
    );

    let schema = Arc::new(Schema::new(vec![
        Field::new("sv", DataType::Utf8View, false),
        Field::new("bv", DataType::BinaryView, false),
    ]));
    let full_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(string_col) as ArrayRef,
            Arc::new(binary_col) as ArrayRef,
        ],
    )?;

    // Mimic take_record_batch: select a handful of rows. The taken batch
    // still references the full 1000-row data buffers.
    let indices = arrow::array::UInt32Array::from(vec![0, 1, 2, 3, 4]);
    let taken_batch = arrow::compute::take_record_batch(&full_batch, &indices)?;
    let taken_size = get_record_batch_memory_size(&taken_batch);

    // After gc, only the 5 live rows' data (plus minor overhead) remains.
    let gc_batch = gc_view_arrays(&taken_batch)?;
    let gc_size = get_record_batch_memory_size(&gc_batch);

    assert!(
        gc_size < taken_size / 2,
        "gc_view_arrays should reduce memory: gc_size={gc_size}, taken_size={taken_size}"
    );

    // Compaction must not lose rows or columns.
    assert_eq!(gc_batch.num_rows(), 5);
    assert_eq!(gc_batch.num_columns(), 2);

    Ok(())
}
887+
888+
/// Demonstrates the write amplification that gc_view_arrays prevents.
/// When IncrementalSortIterator produces chunks via take(), each chunk
/// retains all original StringView data buffers. Without gc(), spilling
/// N chunks writes N copies of the shared buffers.
#[test]
fn test_gc_view_arrays_write_amplification() -> Result<()> {
    use arrow::array::StringViewArray;

    // 10,000 rows of long strings (~50 bytes each = ~500 KB of string data)
    let values: Vec<String> = (0..10_000)
        .map(|i| format!("long_string_for_gc_amplification_test_{:08}", i))
        .collect();
    let column =
        StringViewArray::from(values.iter().map(String::as_str).collect::<Vec<_>>());
    let schema = Arc::new(Schema::new(vec![Field::new(
        "sv",
        DataType::Utf8View,
        false,
    )]));
    let full_batch =
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(column) as ArrayRef])?;

    // Mimic IncrementalSortIterator: carve the batch into 10 chunks of
    // 1000 rows via take, accumulating spill sizes with and without gc.
    let chunk_size = 1000u32;
    let n_chunks = 10;
    let mut total_without_gc = 0usize;
    let mut total_with_gc = 0usize;

    for chunk_idx in 0..n_chunks {
        let first_row = chunk_idx * chunk_size;
        let indices = arrow::array::UInt32Array::from(
            (first_row..first_row + chunk_size).collect::<Vec<_>>(),
        );
        let chunk = arrow::compute::take_record_batch(&full_batch, &indices)?;
        total_without_gc += get_record_batch_memory_size(&chunk);
        total_with_gc += get_record_batch_memory_size(&gc_view_arrays(&chunk)?);
    }

    // Without gc each chunk drags along all 10,000 rows' buffers (~10x);
    // with gc each chunk carries only its own 1,000 rows (~1x).
    let amplification = total_without_gc as f64 / total_with_gc as f64;
    eprintln!(
        "Write amplification: {amplification:.1}x (without_gc={total_without_gc} bytes, with_gc={total_with_gc} bytes)"
    );

    assert!(
        amplification > 3.0,
        "Expected significant write amplification, got {amplification:.1}x"
    );

    // The gc'd total should approximate the one full batch, not N× it.
    let full_size = get_record_batch_memory_size(&full_batch);
    assert!(
        total_with_gc < full_size * 2,
        "With gc, total ({total_with_gc}) should be close to full batch ({full_size})"
    );

    Ok(())
}
951+
952+
#[test]
fn test_gc_view_arrays_noop_for_non_view_types() -> Result<()> {
    // A batch containing only primitive (non-view) columns must pass
    // through gc_view_arrays untouched: same rows, same memory footprint.
    let batch = build_table_i32(
        ("a", &vec![1, 2, 3]),
        ("b", &vec![4, 5, 6]),
        ("c", &vec![7, 8, 9]),
    );
    let original_size = get_record_batch_memory_size(&batch);

    let gc_batch = gc_view_arrays(&batch)?;

    assert_eq!(batch.num_rows(), gc_batch.num_rows());
    assert_eq!(original_size, get_record_batch_memory_size(&gc_batch));
    Ok(())
}
967+
789968
#[test]
790969
fn test_alignment_for_schema() -> Result<()> {
791970
let schema = Schema::new(vec![Field::new("strings", DataType::Utf8View, false)]);

0 commit comments

Comments
 (0)