Skip to content

Commit f49d7b3

Browse files
EeshanBembi and adriangb
authored and committed
Remove redundant StringView GC in external sort
The SpillManager now handles GC for StringView/BinaryView arrays internally via gc_view_arrays(), making the organize_stringview_arrays() function in external sort redundant. Changes: (1) remove the organize_stringview_arrays() call and function from sort.rs; (2) use batch.clone() for the early return (cheaper than creating a new batch); (3) use the arrow_data::MAX_INLINE_VIEW_LEN constant instead of a custom constant; (4) update the comment in spill_manager.rs to reference gc_view_arrays().
1 parent 9cb3aef commit f49d7b3

6 files changed

Lines changed: 9 additions & 81 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ arrow-avro = { version = "58.1.0", default-features = false, features = [
100100
"xz",
101101
] }
102102
arrow-buffer = { version = "58.1.0", default-features = false }
103+
arrow-data = { version = "58.1.0", default-features = false }
103104
arrow-flight = { version = "58.1.0", features = [
104105
"flight-sql-experimental",
105106
] }

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pin-project-lite = "^0.2.7"
7272
tokio = { workspace = true }
7373

7474
[dev-dependencies]
75+
arrow-data = { workspace = true }
7576
criterion = { workspace = true, features = ["async_futures"] }
7677
datafusion-functions-aggregate = { workspace = true }
7778
datafusion-functions-window = { workspace = true }

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::{
5353
Statistics,
5454
};
5555

56-
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
56+
use arrow::array::{Array, RecordBatch, RecordBatchOptions};
5757
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
5858
use arrow::datatypes::SchemaRef;
5959
use datafusion_common::config::SpillCompression;
@@ -418,8 +418,6 @@ impl ExternalSorter {
418418
Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
419419
}
420420

421-
Self::organize_stringview_arrays(globally_sorted_batches)?;
422-
423421
debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
424422

425423
let batches_to_spill = std::mem::take(globally_sorted_batches);
@@ -463,71 +461,6 @@ impl ExternalSorter {
463461
Ok(())
464462
}
465463

466-
/// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
467-
/// `StringViewArray` in sequential order by calling `gc()` on them.
468-
///
469-
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
470-
/// available
471-
///
472-
/// # Rationale
473-
/// After (merge-based) sorting, all batches will be sorted into a single run,
474-
/// but physically this sorted run is chunked into many small batches. For
475-
/// `StringViewArray`s inside each sorted run, their inner buffers are not
476-
/// re-constructed by default, leading to non-sequential payload locations
477-
/// (permutated by `interleave()` Arrow kernel). A single payload buffer might
478-
/// be shared by multiple `RecordBatch`es.
479-
/// When writing each batch to disk, the writer has to write all referenced buffers,
480-
/// because they have to be read back one by one to reduce memory usage. This
481-
/// causes extra disk reads and writes, and potentially execution failure.
482-
///
483-
/// # Example
484-
/// Before sorting:
485-
/// batch1 -> buffer1
486-
/// batch2 -> buffer2
487-
///
488-
/// sorted_batch1 -> buffer1
489-
/// -> buffer2
490-
/// sorted_batch2 -> buffer1
491-
/// -> buffer2
492-
///
493-
/// Then when spilling each batch, the writer has to write all referenced buffers
494-
/// repeatedly.
495-
fn organize_stringview_arrays(
496-
globally_sorted_batches: &mut Vec<RecordBatch>,
497-
) -> Result<()> {
498-
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
499-
500-
for batch in globally_sorted_batches.drain(..) {
501-
let mut new_columns: Vec<Arc<dyn Array>> =
502-
Vec::with_capacity(batch.num_columns());
503-
504-
let mut arr_mutated = false;
505-
for array in batch.columns() {
506-
if let Some(string_view_array) =
507-
array.as_any().downcast_ref::<StringViewArray>()
508-
{
509-
let new_array = string_view_array.gc();
510-
new_columns.push(Arc::new(new_array));
511-
arr_mutated = true;
512-
} else {
513-
new_columns.push(Arc::clone(array));
514-
}
515-
}
516-
517-
let organized_batch = if arr_mutated {
518-
RecordBatch::try_new(batch.schema(), new_columns)?
519-
} else {
520-
batch
521-
};
522-
523-
organized_batches.push(organized_batch);
524-
}
525-
526-
*globally_sorted_batches = organized_batches;
527-
528-
Ok(())
529-
}
530-
531464
/// Sorts the in-memory batches and merges them into a single sorted run, then writes
532465
/// the result to spill files.
533466
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,6 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize {
349349
#[cfg(test)]
350350
const VIEW_SIZE_BYTES: usize = 16;
351351

352-
/// Maximum size of inlined string/binary data in StringView/BinaryView arrays.
353-
/// Strings/binaries <= 12 bytes are stored inline within the 16-byte view structure.
354-
/// This matches the Arrow specification for view arrays.
355-
#[cfg(test)]
356-
const INLINE_THRESHOLD: usize = 12;
357-
358352
/// Performs garbage collection on StringView and BinaryView arrays before spilling to reduce memory usage.
359353
///
360354
/// # Why GC is needed
@@ -382,11 +376,8 @@ pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
382376
});
383377

384378
if !has_view_arrays {
385-
// Return a new batch to maintain consistent behavior
386-
return Ok(RecordBatch::try_new(
387-
batch.schema(),
388-
batch.columns().to_vec(),
389-
)?);
379+
// RecordBatch::clone() is cheap - just Arc reference count bumps
380+
return Ok(batch.clone());
390381
}
391382

392383
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
@@ -452,10 +443,11 @@ fn should_gc_view_array(data_buffers: &[arrow::buffer::Buffer]) -> bool {
452443

453444
#[cfg(test)]
454445
fn calculate_string_view_waste_ratio(array: &StringViewArray) -> f64 {
446+
use arrow_data::MAX_INLINE_VIEW_LEN;
455447
calculate_view_waste_ratio(array.len(), array.data_buffers(), |i| {
456448
if !array.is_null(i) {
457449
let value = array.value(i);
458-
if value.len() > INLINE_THRESHOLD {
450+
if value.len() > MAX_INLINE_VIEW_LEN as usize {
459451
return value.len();
460452
}
461453
}

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ impl SpillManager {
197197
pub(crate) trait GetSlicedSize {
198198
/// Returns the size of the `RecordBatch` when sliced.
199199
/// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
200-
/// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method.
200+
/// Therefore, make sure we call gc() or gc_view_arrays() before using this method.
201201
fn get_sliced_size(&self) -> Result<usize>;
202202
}
203203

0 commit comments

Comments (0)