Skip to content

Commit 05de09a

Browse files
EeshanBembi
authored and adriangb committed
Address PR review feedback for StringView/BinaryView GC
- Replace row count heuristic with 10KB memory threshold
- Improve documentation and add inline comments
- Remove redundant test_exact_clickbench_issue_19414
- Maintains 96% reduction in spill file sizes
1 parent 23633d4 commit 05de09a

1 file changed

Lines changed: 67 additions & 131 deletions

File tree

  • datafusion/physical-plan/src/spill

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 67 additions & 131 deletions
Original file line number | Diff line number | Diff line change
@@ -344,15 +344,52 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize {
344344
max_alignment
345345
}
346346

347+
/// Size of a single view structure in StringView/BinaryView arrays (in bytes).
348+
/// Each view is 16 bytes: 4 bytes length + 4 bytes prefix + 8 bytes buffer ID/offset.
347349
#[cfg(test)]
348350
const VIEW_SIZE_BYTES: usize = 16;
351+
352+
/// Maximum size of inlined string/binary data in StringView/BinaryView arrays.
353+
/// Strings/binaries <= 12 bytes are stored inline within the 16-byte view structure.
354+
/// This matches the Arrow specification for view arrays.
349355
#[cfg(test)]
350356
const INLINE_THRESHOLD: usize = 12;
351357

352-
/// Performs garbage collection on view arrays before spilling.
358+
/// Performs garbage collection on StringView and BinaryView arrays before spilling to reduce memory usage.
359+
///
360+
/// # Why GC is needed
361+
///
362+
/// StringView and BinaryView arrays can accumulate significant memory waste when sliced.
363+
/// When a large array is sliced (e.g., taking first 100 rows of 1000), the view array
364+
/// still references the original data buffers containing all 1000 rows of data.
365+
///
366+
/// For example, in the ClickBench benchmark (issue #19414), repeated slicing of StringView
367+
/// arrays resulted in 820MB of spill files that could be reduced to just 33MB after GC -
368+
/// a 96% reduction in size.
369+
///
370+
/// # How it works
371+
///
372+
/// The GC process creates new compact buffers containing only the data referenced by the
373+
/// current views, eliminating unreferenced data from sliced arrays.
353374
pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
375+
// Early return optimization: Skip GC entirely if the batch contains no view arrays.
376+
// This avoids unnecessary processing for batches with only primitive types.
377+
let has_view_arrays = batch.columns().iter().any(|array| {
378+
matches!(
379+
array.data_type(),
380+
arrow::datatypes::DataType::Utf8View | arrow::datatypes::DataType::BinaryView
381+
)
382+
});
383+
384+
if !has_view_arrays {
385+
// Return a new batch to maintain consistent behavior
386+
return Ok(RecordBatch::try_new(
387+
batch.schema(),
388+
batch.columns().to_vec(),
389+
)?);
390+
}
391+
354392
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
355-
let mut any_gc_performed = false;
356393

357394
for array in batch.columns() {
358395
let gc_array = match array.data_type() {
@@ -361,8 +398,9 @@ pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
361398
.as_any()
362399
.downcast_ref::<StringViewArray>()
363400
.expect("Utf8View array should downcast to StringViewArray");
364-
if should_gc_view_array(string_view.len(), string_view.data_buffers()) {
365-
any_gc_performed = true;
401+
// Only perform GC if the data buffers exceed our size threshold.
402+
// This balances memory savings against GC overhead.
403+
if should_gc_view_array(string_view.data_buffers()) {
366404
Arc::new(string_view.gc()) as Arc<dyn Array>
367405
} else {
368406
Arc::clone(array)
@@ -373,32 +411,43 @@ pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
373411
.as_any()
374412
.downcast_ref::<BinaryViewArray>()
375413
.expect("BinaryView array should downcast to BinaryViewArray");
376-
if should_gc_view_array(binary_view.len(), binary_view.data_buffers()) {
377-
any_gc_performed = true;
414+
// Only perform GC if the data buffers exceed our size threshold.
415+
// This balances memory savings against GC overhead.
416+
if should_gc_view_array(binary_view.data_buffers()) {
378417
Arc::new(binary_view.gc()) as Arc<dyn Array>
379418
} else {
380419
Arc::clone(array)
381420
}
382421
}
422+
// Non-view arrays are passed through unchanged
383423
_ => Arc::clone(array),
384424
};
385425
new_columns.push(gc_array);
386426
}
387427

388-
if any_gc_performed {
389-
Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
390-
} else {
391-
Ok(batch.clone())
392-
}
428+
// Always return a new batch for consistency
429+
Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
393430
}
394431

395-
fn should_gc_view_array(len: usize, data_buffers: &[arrow::buffer::Buffer]) -> bool {
396-
if len < 10 {
397-
return false;
398-
}
399-
432+
/// Determines whether a view array should be garbage collected based on its buffer usage.
433+
///
434+
/// Uses a minimum buffer size threshold to avoid unnecessary GC on small arrays.
435+
/// This prevents the overhead of GC for arrays with negligible memory footprint,
436+
/// while still capturing cases where sliced arrays carry large unreferenced buffers.
437+
///
438+
/// # Why not use get_record_batch_memory_size
439+
///
440+
/// We use manual buffer size calculation here because:
441+
/// - `get_record_batch_memory_size` operates on entire arrays, not just the data buffers
442+
/// - We need to check buffer capacity specifically to determine GC potential
443+
/// - The data buffers are what gets compacted during GC, so their size is the key metric
444+
fn should_gc_view_array(data_buffers: &[arrow::buffer::Buffer]) -> bool {
445+
// Only perform GC if the buffers exceed 10KB. This avoids the overhead of
446+
// GC for small arrays while still capturing the pathological cases like
447+
// the ClickBench scenario (820MB -> 33MB reduction).
448+
const MIN_BUFFER_SIZE_FOR_GC: usize = 10 * 1024; // 10KB threshold
400449
let total_buffer_size: usize = data_buffers.iter().map(|b| b.capacity()).sum();
401-
total_buffer_size > 0
450+
total_buffer_size > MIN_BUFFER_SIZE_FOR_GC
402451
}
403452

404453
#[cfg(test)]
@@ -1115,8 +1164,7 @@ mod tests {
11151164
.as_any()
11161165
.downcast_ref::<StringViewArray>()
11171166
.unwrap();
1118-
let should_gc =
1119-
should_gc_view_array(sliced_array.len(), sliced_array.data_buffers());
1167+
let should_gc = should_gc_view_array(sliced_array.data_buffers());
11201168
let waste_ratio = calculate_string_view_waste_ratio(sliced_array);
11211169

11221170
assert!(
@@ -1210,118 +1258,6 @@ mod tests {
12101258
Ok(())
12111259
}
12121260

1213-
#[test]
1214-
fn test_exact_clickbench_issue_19414() -> Result<()> {
1215-
use arrow::array::StringViewArray;
1216-
use std::fs;
1217-
1218-
// Test for clickbench issue: 820MB -> 33MB spill reduction
1219-
let unique_urls = vec![
1220-
"http://irr.ru/index.php?showalbum/login-leniya7777294,938303130",
1221-
"http://komme%2F27.0.1453.116",
1222-
"https://produkty%2Fproduct",
1223-
"http://irr.ru/index.php?showalbum/login-kapusta-advert2668]=0&order_by=0",
1224-
"http://irr.ru/index.php?showalbum/login-kapustic/product_name",
1225-
"http://irr.ru/index.php",
1226-
"https://produkty%2F",
1227-
"http://irr.ru/index.php?showalbum/login",
1228-
"https://produkty/kurortmag",
1229-
"https://produkty%2Fpulove.ru/album/login",
1230-
];
1231-
1232-
let mut urls = Vec::with_capacity(200_000);
1233-
1234-
// URL frequencies from bug report
1235-
for _ in 0..58976 {
1236-
urls.push(unique_urls[0].to_string());
1237-
}
1238-
for _ in 0..29585 {
1239-
urls.push(unique_urls[1].to_string());
1240-
}
1241-
for _ in 0..11464 {
1242-
urls.push(unique_urls[2].to_string());
1243-
}
1244-
for _ in 0..10480 {
1245-
urls.push(unique_urls[3].to_string());
1246-
}
1247-
for _ in 0..10128 {
1248-
urls.push(unique_urls[4].to_string());
1249-
}
1250-
for _ in 0..7758 {
1251-
urls.push(unique_urls[5].to_string());
1252-
}
1253-
for _ in 0..6649 {
1254-
urls.push(unique_urls[6].to_string());
1255-
}
1256-
for _ in 0..6141 {
1257-
urls.push(unique_urls[7].to_string());
1258-
}
1259-
for _ in 0..5764 {
1260-
urls.push(unique_urls[8].to_string());
1261-
}
1262-
for _ in 0..5495 {
1263-
urls.push(unique_urls[9].to_string());
1264-
}
1265-
1266-
while urls.len() < 200_000 {
1267-
urls.push(unique_urls[urls.len() % 10].to_string());
1268-
}
1269-
1270-
let string_array = StringViewArray::from(urls);
1271-
let schema = Arc::new(Schema::new(vec![Field::new(
1272-
"URL",
1273-
DataType::Utf8View,
1274-
false,
1275-
)]));
1276-
1277-
let original_batch = RecordBatch::try_new(
1278-
Arc::clone(&schema),
1279-
vec![Arc::new(string_array.clone()) as ArrayRef],
1280-
)?;
1281-
1282-
// Simulate GROUP BY slices
1283-
let mut sliced_batches = Vec::new();
1284-
let total_rows = original_batch.num_rows();
1285-
let mut offset = 0;
1286-
1287-
let group_sizes = vec![
1288-
58976, 29585, 11464, 10480, 10128, 7758, 6649, 6141, 5764, 5495,
1289-
];
1290-
for &size in &group_sizes {
1291-
if offset < total_rows {
1292-
let actual_size = std::cmp::min(size, total_rows - offset);
1293-
sliced_batches.push(original_batch.slice(offset, actual_size));
1294-
offset += actual_size;
1295-
}
1296-
}
1297-
1298-
// Small groups for remainder
1299-
while offset < total_rows {
1300-
let size = std::cmp::min(100, total_rows - offset);
1301-
sliced_batches.push(original_batch.slice(offset, size));
1302-
offset += size;
1303-
}
1304-
1305-
// Setup spill with GC
1306-
let env = Arc::new(RuntimeEnv::default());
1307-
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1308-
let spill_manager = SpillManager::new(env, metrics, schema);
1309-
1310-
let mut in_progress_file = spill_manager.create_in_progress_file("Clickbench")?;
1311-
for batch in &sliced_batches {
1312-
in_progress_file.append_batch(batch)?;
1313-
}
1314-
1315-
let spill_file = in_progress_file.finish()?.unwrap();
1316-
let file_size_mb = fs::metadata(spill_file.path())?.len() as f64 / 1_048_576.0;
1317-
1318-
assert!(
1319-
file_size_mb < 50.0,
1320-
"Spill file should be <50MB (target 33MB), got {file_size_mb:.2}MB"
1321-
);
1322-
1323-
Ok(())
1324-
}
13251261

13261262
#[test]
13271263
fn test_spill_with_and_without_gc_comparison() -> Result<()> {

0 commit comments

Comments (0)