Skip to content

Commit 4b8c1d9

Browse files
Fix massive spill files for StringView/BinaryView columns II (#21633)
- Replaces #19444, which seems stuck. - fixes #19414 - closes #19444 FYI @EeshanBembi --------- Co-authored-by: Eeshan Bembi <bembieeshan@gmail.com>
1 parent 1f0faf9 commit 4b8c1d9

7 files changed

Lines changed: 599 additions & 90 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ arrow-avro = { version = "58.1.0", default-features = false, features = [
100100
"xz",
101101
] }
102102
arrow-buffer = { version = "58.1.0", default-features = false }
103+
arrow-data = { version = "58.1.0", default-features = false }
103104
arrow-flight = { version = "58.1.0", features = [
104105
"flight-sql-experimental",
105106
] }

datafusion/physical-plan/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ name = "datafusion_physical_plan"
4848

4949
[dependencies]
5050
arrow = { workspace = true }
51+
arrow-data = { workspace = true }
5152
arrow-ord = { workspace = true }
5253
arrow-schema = { workspace = true }
5354
async-trait = { workspace = true }
@@ -72,6 +73,7 @@ pin-project-lite = "^0.2.7"
7273
tokio = { workspace = true }
7374

7475
[dev-dependencies]
76+
arrow-data = { workspace = true }
7577
criterion = { workspace = true, features = ["async_futures"] }
7678
datafusion-functions-aggregate = { workspace = true }
7779
datafusion-functions-window = { workspace = true }

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 3 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use crate::{
5555
Statistics,
5656
};
5757

58-
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
58+
use arrow::array::{RecordBatch, RecordBatchOptions};
5959
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
6060
use arrow::datatypes::SchemaRef;
6161
use datafusion_common::config::SpillCompression;
@@ -420,8 +420,6 @@ impl ExternalSorter {
420420
Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
421421
}
422422

423-
Self::organize_stringview_arrays(globally_sorted_batches)?;
424-
425423
debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
426424

427425
let batches_to_spill = std::mem::take(globally_sorted_batches);
@@ -433,10 +431,9 @@ impl ExternalSorter {
433431
})?;
434432

435433
for batch in batches_to_spill {
436-
in_progress_file.append_batch(&batch)?;
434+
let gc_sliced_size = in_progress_file.append_batch(&batch)?;
437435

438-
*max_record_batch_size =
439-
(*max_record_batch_size).max(batch.get_sliced_size()?);
436+
*max_record_batch_size = (*max_record_batch_size).max(gc_sliced_size);
440437
}
441438

442439
assert_or_internal_err!(
@@ -465,71 +462,6 @@ impl ExternalSorter {
465462
Ok(())
466463
}
467464

468-
/// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
469-
/// `StringViewArray` in sequential order by calling `gc()` on them.
470-
///
471-
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
472-
/// available
473-
///
474-
/// # Rationale
475-
/// After (merge-based) sorting, all batches will be sorted into a single run,
476-
/// but physically this sorted run is chunked into many small batches. For
477-
/// `StringViewArray`s inside each sorted run, their inner buffers are not
478-
/// re-constructed by default, leading to non-sequential payload locations
479-
/// (permutated by `interleave()` Arrow kernel). A single payload buffer might
480-
/// be shared by multiple `RecordBatch`es.
481-
/// When writing each batch to disk, the writer has to write all referenced buffers,
482-
/// because they have to be read back one by one to reduce memory usage. This
483-
/// causes extra disk reads and writes, and potentially execution failure.
484-
///
485-
/// # Example
486-
/// Before sorting:
487-
/// batch1 -> buffer1
488-
/// batch2 -> buffer2
489-
///
490-
/// sorted_batch1 -> buffer1
491-
/// -> buffer2
492-
/// sorted_batch2 -> buffer1
493-
/// -> buffer2
494-
///
495-
/// Then when spilling each batch, the writer has to write all referenced buffers
496-
/// repeatedly.
497-
fn organize_stringview_arrays(
498-
globally_sorted_batches: &mut Vec<RecordBatch>,
499-
) -> Result<()> {
500-
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
501-
502-
for batch in globally_sorted_batches.drain(..) {
503-
let mut new_columns: Vec<Arc<dyn Array>> =
504-
Vec::with_capacity(batch.num_columns());
505-
506-
let mut arr_mutated = false;
507-
for array in batch.columns() {
508-
if let Some(string_view_array) =
509-
array.as_any().downcast_ref::<StringViewArray>()
510-
{
511-
let new_array = string_view_array.gc();
512-
new_columns.push(Arc::new(new_array));
513-
arr_mutated = true;
514-
} else {
515-
new_columns.push(Arc::clone(array));
516-
}
517-
}
518-
519-
let organized_batch = if arr_mutated {
520-
RecordBatch::try_new(batch.schema(), new_columns)?
521-
} else {
522-
batch
523-
};
524-
525-
organized_batches.push(organized_batch);
526-
}
527-
528-
*globally_sorted_batches = organized_batches;
529-
530-
Ok(())
531-
}
532-
533465
/// Sorts the in-memory batches and merges them into a single sorted run, then writes
534466
/// the result to spill files.
535467
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ use arrow::array::RecordBatch;
2424
use datafusion_common::exec_datafusion_err;
2525
use datafusion_execution::disk_manager::RefCountedTempFile;
2626

27-
use super::{IPCStreamWriter, spill_manager::SpillManager};
27+
use super::{
28+
IPCStreamWriter, gc_view_arrays,
29+
spill_manager::{GetSlicedSize, SpillManager},
30+
};
2831

2932
/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
3033
/// Caller is able to use this struct to incrementally append in-memory batches to
@@ -51,16 +54,25 @@ impl InProgressSpillFile {
5154

5255
/// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
5356
///
57+
/// Before writing, performs GC on StringView/BinaryView arrays to compact backing
58+
/// buffers. When a view array is sliced, it still references the original full buffers,
59+
/// causing massive spill files without GC (see issue #19414: 820MB → 33MB after GC).
60+
///
61+
/// Returns the post-GC sliced memory size of the batch for memory accounting.
62+
///
5463
/// # Errors
5564
/// - Returns an error if the file is not active (has been finalized)
5665
/// - Returns an error if appending would exceed the disk usage limit configured
5766
/// by `max_temp_directory_size` in `DiskManager`
58-
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
67+
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<usize> {
5968
if self.in_progress_file.is_none() {
6069
return Err(exec_datafusion_err!(
6170
"Append operation failed: No active in-progress file. The file may have already been finalized."
6271
));
6372
}
73+
74+
let gc_batch = gc_view_arrays(batch)?;
75+
6476
if self.writer.is_none() {
6577
// Use the SpillManager's declared schema rather than the batch's schema.
6678
// Individual batches may have different schemas (e.g., different nullability)
@@ -87,7 +99,7 @@ impl InProgressSpillFile {
8799
}
88100
}
89101
if let Some(writer) = &mut self.writer {
90-
let (spilled_rows, _) = writer.write(batch)?;
102+
let (spilled_rows, _) = writer.write(&gc_batch)?;
91103
if let Some(in_progress_file) = &mut self.in_progress_file {
92104
let pre_size = in_progress_file.current_disk_usage();
93105
in_progress_file.update_disk_usage()?;
@@ -102,7 +114,7 @@ impl InProgressSpillFile {
102114
unreachable!() // Already checked inside current function
103115
}
104116
}
105-
Ok(())
117+
gc_batch.get_sliced_size()
106118
}
107119

108120
pub fn flush(&mut self) -> Result<()> {

0 commit comments

Comments (0)