Skip to content

Commit f9e791f

Browse files
committed
refactor: add gc_stringview_arrays util and use that
1 parent cf80341 commit f9e791f

3 files changed

Lines changed: 43 additions & 46 deletions

File tree

datafusion/physical-plan/src/common.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use super::SendableRecordBatchStream;
2525
use crate::stream::RecordBatchReceiverStream;
2626
use crate::{ColumnStatistics, Statistics};
2727

28-
use arrow::array::Array;
28+
use arrow::array::{Array, StringViewArray};
2929
use arrow::datatypes::Schema;
30+
use arrow::error::ArrowError;
3031
use arrow::record_batch::RecordBatch;
3132
use datafusion_common::stats::Precision;
3233
use datafusion_common::{Result, plan_err};
@@ -190,7 +191,7 @@ pub fn can_project(
190191
.max()
191192
.is_some_and(|&i| i >= schema.fields().len())
192193
{
193-
Err(arrow::error::ArrowError::SchemaError(format!(
194+
Err(ArrowError::SchemaError(format!(
194195
"project index {} out of bounds, max field {}",
195196
columns.iter().max().unwrap(),
196197
schema.fields().len()
@@ -204,6 +205,31 @@ pub fn can_project(
204205
}
205206
}
206207

208+
/// Return a new `RecordBatch` with [`StringViewArray::gc`] called on such columns (if any).
209+
pub(crate) fn gc_stringview_arrays(
210+
batch: RecordBatch,
211+
) -> Result<RecordBatch, ArrowError> {
212+
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
213+
214+
let mut arr_mutated = false;
215+
for array in batch.columns() {
216+
if let Some(string_view_array) = array.as_any().downcast_ref::<StringViewArray>()
217+
{
218+
let new_array = string_view_array.gc();
219+
new_columns.push(Arc::new(new_array));
220+
arr_mutated = true;
221+
} else {
222+
new_columns.push(Arc::clone(array));
223+
}
224+
}
225+
226+
if arr_mutated {
227+
RecordBatch::try_new(batch.schema(), new_columns)
228+
} else {
229+
Ok(batch)
230+
}
231+
}
232+
207233
#[cfg(test)]
208234
mod tests {
209235
use super::*;

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use super::{
3131
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
3232
};
3333
use crate::coalesce::LimitedBatchCoalescer;
34+
use crate::common::gc_stringview_arrays;
3435
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
3536
use crate::hash_utils::create_hashes;
3637
use crate::metrics::{BaselineMetrics, SpillMetrics};
@@ -611,27 +612,20 @@ impl BatchPartitioner {
611612
let columns =
612613
take_arrays(batch.columns(), &indices_array, None)?;
613614

614-
let new_columns = columns
615-
.into_iter()
616-
.map(|col| {
617-
if let Some(sv) =
618-
col.as_any().downcast_ref::<StringViewArray>()
619-
{
620-
Arc::new(sv.gc())
621-
} else {
622-
col
623-
}
624-
})
625-
.collect();
626-
627615
let mut options = RecordBatchOptions::new();
628616
options = options.with_row_count(Some(indices_array.len()));
629617
let batch = RecordBatch::try_new_with_options(
630618
batch.schema(),
631-
new_columns,
619+
columns,
632620
&options,
633-
)
634-
.unwrap();
621+
)?;
622+
623+
// When `StringViewArray`s are present, the `take_arrays` call above
624+
// re-uses data buffers from the original array. This causes the memory
625+
// pool to count the same data buffers multiple times, once for each
626+
// consumer of the repartition.
627+
// So we gc the output arrays, which creates new data buffers.
628+
let batch = gc_stringview_arrays(batch)?;
635629

636630
partitioned_batches.push(Ok((partition, batch)));
637631

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::sync::Arc;
2626

2727
use parking_lot::RwLock;
2828

29-
use crate::common::spawn_buffered;
29+
use crate::common::{gc_stringview_arrays, spawn_buffered};
3030
use crate::execution_plan::{
3131
Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
3232
};
@@ -496,33 +496,10 @@ impl ExternalSorter {
496496
fn organize_stringview_arrays(
497497
globally_sorted_batches: &mut Vec<RecordBatch>,
498498
) -> Result<()> {
499-
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
500-
501-
for batch in globally_sorted_batches.drain(..) {
502-
let mut new_columns: Vec<Arc<dyn Array>> =
503-
Vec::with_capacity(batch.num_columns());
504-
505-
let mut arr_mutated = false;
506-
for array in batch.columns() {
507-
if let Some(string_view_array) =
508-
array.as_any().downcast_ref::<StringViewArray>()
509-
{
510-
let new_array = string_view_array.gc();
511-
new_columns.push(Arc::new(new_array));
512-
arr_mutated = true;
513-
} else {
514-
new_columns.push(Arc::clone(array));
515-
}
516-
}
517-
518-
let organized_batch = if arr_mutated {
519-
RecordBatch::try_new(batch.schema(), new_columns)?
520-
} else {
521-
batch
522-
};
523-
524-
organized_batches.push(organized_batch);
525-
}
499+
let organized_batches = globally_sorted_batches
500+
.drain(..)
501+
.map(gc_stringview_arrays)
502+
.collect::<Result<_, _>>()?;
526503

527504
*globally_sorted_batches = organized_batches;
528505

0 commit comments

Comments
 (0)