
Commit 51c47bc

EeshanBembi authored and adriangb committed
fix: address PR review comments for StringView/BinaryView GC spill
- Return the post-GC sliced size from `append_batch` so callers use the correct post-GC size for memory accounting (fixes cetra3's CHANGES_REQUESTED review: `max_record_batch_size` was measured pre-GC in sort.rs and spill_manager.rs)
- Fix an incorrect comment claiming Arrow `gc()` is a no-op; it always allocates new compact buffers
- Add a comment in `should_gc_view_array` explaining why we sum `data_buffers` directly instead of using `get_buffer_memory_size()`
- Expand the `append_batch` doc comment with the GC rationale, per reviewer request
- Reduce row counts in the heavy GC tests
1 parent d9e8295 commit 51c47bc
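
For context, a minimal standalone sketch of the Arrow behavior this fix builds on (assuming arrow-rs's view-array API; the URLs and counts are illustrative, not from the patch):

```rust
use arrow::array::{Array, StringViewArray};

fn main() {
    // Strings longer than 12 bytes are stored in shared data buffers,
    // not inline in the 16-byte views.
    let array = StringViewArray::from_iter_values(
        (0..1000).map(|i| format!("http://example.com/some/long/path/{i}")),
    );

    // A small slice still references the full original buffers...
    let sliced = array.slice(0, 10);
    // ...while gc() copies the referenced bytes into new compact buffers.
    // It always allocates, which is why it is never a no-op.
    let compacted = sliced.gc();

    assert!(compacted.get_buffer_memory_size() < sliced.get_buffer_memory_size());
}
```

Measuring a batch before this compaction is what made `max_record_batch_size` overshoot.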

4 files changed

Lines changed: 34 additions & 42 deletions


datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 2 additions & 3 deletions
```diff
@@ -429,10 +429,9 @@ impl ExternalSorter {
         })?;

         for batch in batches_to_spill {
-            in_progress_file.append_batch(&batch)?;
+            let gc_sliced_size = in_progress_file.append_batch(&batch)?;

-            *max_record_batch_size =
-                (*max_record_batch_size).max(batch.get_sliced_size()?);
+            *max_record_batch_size = (*max_record_batch_size).max(gc_sliced_size);
         }

         assert_or_internal_err!(
```
datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 12 additions & 4 deletions
```diff
@@ -24,7 +24,10 @@ use arrow::array::RecordBatch;
 use datafusion_common::exec_datafusion_err;
 use datafusion_execution::disk_manager::RefCountedTempFile;

-use super::{IPCStreamWriter, gc_view_arrays, spill_manager::SpillManager};
+use super::{
+    IPCStreamWriter, gc_view_arrays,
+    spill_manager::{GetSlicedSize, SpillManager},
+};

 /// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
 /// Caller is able to use this struct to incrementally append in-memory batches to
@@ -50,13 +53,18 @@ impl InProgressSpillFile {
     }

     /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
-    /// Performs garbage collection on StringView/BinaryView arrays to reduce spill file size.
+    ///
+    /// Before writing, performs GC on StringView/BinaryView arrays to compact backing
+    /// buffers. When a view array is sliced, it still references the original full buffers,
+    /// causing massive spill files without GC (see issue #19414: 820MB → 33MB after GC).
+    ///
+    /// Returns the post-GC sliced memory size of the batch for memory accounting.
     ///
     /// # Errors
     /// - Returns an error if the file is not active (has been finalized)
     /// - Returns an error if appending would exceed the disk usage limit configured
    ///    by `max_temp_directory_size` in `DiskManager`
-    pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+    pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<usize> {
         if self.in_progress_file.is_none() {
             return Err(exec_datafusion_err!(
                 "Append operation failed: No active in-progress file. The file may have already been finalized."
@@ -106,7 +114,7 @@ impl InProgressSpillFile {
                 unreachable!() // Already checked inside current function
             }
         }
-        Ok(())
+        gc_batch.get_sliced_size()
     }

     pub fn flush(&mut self) -> Result<()> {
```
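
From a caller's perspective, the new contract looks roughly like this (a sketch; `batches` and the surrounding error handling are illustrative):

```rust
// Hypothetical caller: append_batch now returns the post-GC sliced size,
// so memory accounting tracks the bytes actually compacted and written.
let mut max_record_batch_size = 0usize;
for batch in batches {
    let gc_sliced_size = in_progress_file.append_batch(&batch)?;
    max_record_batch_size = max_record_batch_size.max(gc_sliced_size);
}
```

This is the pattern both the sort.rs and spill_manager.rs call sites adopt.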

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 16 additions & 28 deletions
```diff
@@ -440,36 +440,24 @@ pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
     Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
 }

-/// Determines whether a view array should be garbage collected.
+/// Determines whether a view array should be garbage collected before spilling.
 ///
-/// This function checks if:
-/// 1. The array has data buffers (non-inline strings/binaries)
-/// 2. The total buffer memory exceeds a threshold
-/// 3. The array appears to be sliced (has potential memory waste)
+/// Arrow's `gc()` always allocates new compact buffers (it is never a no-op), so we
+/// check here to skip the allocation cost when data buffers are small.
 ///
-/// The Arrow `gc()` method itself is a no-op for arrays that don't benefit from GC,
-/// but we still check here to avoid the overhead of calling gc() unnecessarily.
-///
-/// # Memory threshold rationale
-///
-/// We use a 10KB threshold based on:
-/// - Small arrays (< 10KB) have negligible memory impact even if sliced
-/// - GC has CPU overhead that isn't worth it for small arrays
-/// - This threshold captures pathological cases (e.g., ClickBench: 820MB -> 33MB)
-///   while avoiding unnecessary GC on small working sets
+/// We check `data_buffers` capacity rather than `array.get_buffer_memory_size()` because
+/// the latter includes the views buffer (16 bytes × n_rows), which scales with row count
+/// regardless of string content. We want to trigger GC only when non-inline string data
+/// is substantial enough to justify the copy.
 fn should_gc_view_array<T: ByteViewType>(array: &GenericByteViewArray<T>) -> bool {
     const MIN_BUFFER_SIZE_FOR_GC: usize = 10 * 1024; // 10KB threshold

     let data_buffers = array.data_buffers();
     if data_buffers.is_empty() {
-        // All strings/binaries are inlined (< 12 bytes), no GC needed
         return false;
     }

-    // Calculate total buffer memory
     let total_buffer_size: usize = data_buffers.iter().map(|b| b.capacity()).sum();
-
-    // Only GC if buffers exceed threshold
     total_buffer_size > MIN_BUFFER_SIZE_FOR_GC
 }
```
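
A small illustration of the distinction the new comment draws (a sketch assuming arrow-rs; values of 12 bytes or less are inlined into the views):

```rust
use arrow::array::{Array, StringViewArray};

// Every value here fits inline, so there are no data buffers at all...
let inline = StringViewArray::from_iter_values((0..100_000).map(|_| "tiny"));
assert!(inline.data_buffers().is_empty());

// ...yet get_buffer_memory_size() still reports at least 16 bytes per row
// for the views buffer, growing with row count regardless of string content.
// That makes it the wrong signal for deciding whether GC is worthwhile.
assert!(inline.get_buffer_memory_size() >= 16 * inline.len());
```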

```diff
@@ -1035,7 +1023,7 @@ mod tests {
     fn test_gc_string_view_before_spill() -> Result<()> {
         use arrow::array::StringViewArray;

-        let strings: Vec<String> = (0..1000)
+        let strings: Vec<String> = (0..200)
             .map(|i| {
                 if i % 2 == 0 {
                     "short_string".to_string()
@@ -1056,7 +1044,7 @@ mod tests {
             Arc::clone(&schema),
             vec![Arc::new(string_array) as ArrayRef],
         )?;
-        let sliced_batch = batch.slice(0, 100);
+        let sliced_batch = batch.slice(0, 20);
         let gc_batch = gc_view_arrays(&sliced_batch)?;

         assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
@@ -1069,7 +1057,7 @@ mod tests {
     fn test_gc_binary_view_before_spill() -> Result<()> {
         use arrow::array::BinaryViewArray;

-        let binaries: Vec<Vec<u8>> = (0..1000)
+        let binaries: Vec<Vec<u8>> = (0..200)
             .map(|i| {
                 if i % 2 == 0 {
                     vec![1, 2, 3, 4]
@@ -1091,7 +1079,7 @@ mod tests {
             Arc::clone(&schema),
             vec![Arc::new(binary_array) as ArrayRef],
         )?;
-        let sliced_batch = batch.slice(0, 100);
+        let sliced_batch = batch.slice(0, 20);
         let gc_batch = gc_view_arrays(&sliced_batch)?;

         assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
@@ -1161,7 +1149,7 @@ mod tests {

     #[test]
     fn test_verify_gc_triggers_for_sliced_arrays() -> Result<()> {
-        let strings: Vec<String> = (0..1000)
+        let strings: Vec<String> = (0..200)
             .map(|i| {
                 format!(
                     "http://example.com/very/long/path/that/exceeds/inline/threshold/{i}"
@@ -1181,7 +1169,7 @@ mod tests {
             vec![Arc::new(string_array.clone()) as ArrayRef],
         )?;

-        let sliced = batch.slice(0, 100);
+        let sliced = batch.slice(0, 20);

         let sliced_array = sliced
             .column(0)
@@ -1208,7 +1196,7 @@ mod tests {
         use arrow::array::StringViewArray;
         use std::fs;

-        let num_rows = 5000;
+        let num_rows = 1000;
         let mut strings = Vec::with_capacity(num_rows);

         for i in 0..num_rows {
@@ -1284,7 +1272,7 @@ mod tests {

     #[test]
     fn test_spill_with_and_without_gc_comparison() -> Result<()> {
-        let num_rows = 2000;
+        let num_rows = 400;
         let strings: Vec<String> = (0..num_rows)
             .map(|i| {
                 format!(
@@ -1303,7 +1291,7 @@ mod tests {
         let batch =
             RecordBatch::try_new(schema, vec![Arc::new(string_array) as ArrayRef])?;

-        let sliced_batch = batch.slice(0, 200);
+        let sliced_batch = batch.slice(0, 40);

         let array_without_gc = sliced_batch
             .column(0)
```
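
The reduced counts still appear to clear the GC threshold: at roughly 65 bytes per URL, 200 rows put about 13KB into the data buffers, above the 10KB `MIN_BUFFER_SIZE_FOR_GC` cutoff, so the slimmed-down tests should still exercise the GC path while running faster.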

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 4 additions & 7 deletions
```diff
@@ -23,7 +23,6 @@ use crate::{common::spawn_buffered, metrics::SpillMetrics};
 use arrow::array::StringViewArray;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::utils::memory::get_record_batch_memory_size;
 use datafusion_common::{DataFusionError, Result, config::SpillCompression};
 use datafusion_execution::SendableRecordBatchStream;
 use datafusion_execution::disk_manager::RefCountedTempFile;
@@ -127,10 +126,8 @@ impl SpillManager {
             if borrowed.num_rows() == 0 {
                 return Ok(());
             }
-            in_progress_file.append_batch(borrowed)?;
-
-            max_record_batch_size =
-                max_record_batch_size.max(get_record_batch_memory_size(borrowed));
+            let gc_sliced_size = in_progress_file.append_batch(borrowed)?;
+            max_record_batch_size = max_record_batch_size.max(gc_sliced_size);
             Result::<_, DataFusionError>::Ok(())
         })?;

@@ -153,9 +150,9 @@ impl SpillManager {

         while let Some(batch) = stream.next().await {
             let batch = batch?;
-            in_progress_file.append_batch(&batch)?;
+            let gc_sliced_size = in_progress_file.append_batch(&batch)?;

-            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
+            max_record_batch_size = max_record_batch_size.max(gc_sliced_size);
         }

         let file = in_progress_file.finish()?;
```
