Skip to content

Commit 667ebb5

Browse files
committed
refactor: add gc_stringview_arrays util and use that
1 parent 0733758 commit 667ebb5

3 files changed

Lines changed: 43 additions & 46 deletions

File tree

datafusion/physical-plan/src/common.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use super::SendableRecordBatchStream;
2525
use crate::stream::RecordBatchReceiverStream;
2626
use crate::{ColumnStatistics, Statistics};
2727

28-
use arrow::array::Array;
28+
use arrow::array::{Array, StringViewArray};
2929
use arrow::datatypes::Schema;
30+
use arrow::error::ArrowError;
3031
use arrow::record_batch::RecordBatch;
3132
use datafusion_common::stats::Precision;
3233
use datafusion_common::{Result, plan_err};
@@ -190,7 +191,7 @@ pub fn can_project(
190191
.max()
191192
.is_some_and(|&i| i >= schema.fields().len())
192193
{
193-
Err(arrow::error::ArrowError::SchemaError(format!(
194+
Err(ArrowError::SchemaError(format!(
194195
"project index {} out of bounds, max field {}",
195196
columns.iter().max().unwrap(),
196197
schema.fields().len()
@@ -204,6 +205,31 @@ pub fn can_project(
204205
}
205206
}
206207

208+
/// Return a new `RecordBatch` with [`StringViewArray::gc`] called on such columns (if any).
209+
pub(crate) fn gc_stringview_arrays(
210+
batch: RecordBatch,
211+
) -> Result<RecordBatch, ArrowError> {
212+
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
213+
214+
let mut arr_mutated = false;
215+
for array in batch.columns() {
216+
if let Some(string_view_array) = array.as_any().downcast_ref::<StringViewArray>()
217+
{
218+
let new_array = string_view_array.gc();
219+
new_columns.push(Arc::new(new_array));
220+
arr_mutated = true;
221+
} else {
222+
new_columns.push(Arc::clone(array));
223+
}
224+
}
225+
226+
if arr_mutated {
227+
RecordBatch::try_new(batch.schema(), new_columns)
228+
} else {
229+
Ok(batch)
230+
}
231+
}
232+
207233
#[cfg(test)]
208234
mod tests {
209235
use super::*;

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use super::{
3131
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
3232
};
3333
use crate::coalesce::LimitedBatchCoalescer;
34+
use crate::common::gc_stringview_arrays;
3435
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
3536
use crate::hash_utils::create_hashes;
3637
use crate::metrics::{BaselineMetrics, SpillMetrics};
@@ -611,27 +612,20 @@ impl BatchPartitioner {
611612
let columns =
612613
take_arrays(batch.columns(), &indices_array, None)?;
613614

614-
let new_columns = columns
615-
.into_iter()
616-
.map(|col| {
617-
if let Some(sv) =
618-
col.as_any().downcast_ref::<StringViewArray>()
619-
{
620-
Arc::new(sv.gc())
621-
} else {
622-
col
623-
}
624-
})
625-
.collect();
626-
627615
let mut options = RecordBatchOptions::new();
628616
options = options.with_row_count(Some(indices_array.len()));
629617
let batch = RecordBatch::try_new_with_options(
630618
batch.schema(),
631-
new_columns,
619+
columns,
632620
&options,
633-
)
634-
.unwrap();
621+
)?;
622+
623+
// When `StringViewArray`s are present, the `take_arrays` call above
624+
// re-uses data buffers from the original array. This causes the memory
625+
// pool to count the same data buffers multiple times, once for each
626+
// consumer of the repartition.
627+
// So we gc the output arrays, which creates new data buffers.
628+
let batch = gc_stringview_arrays(batch)?;
635629

636630
partitioned_batches.push(Ok((partition, batch)));
637631

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::sync::Arc;
2626

2727
use parking_lot::RwLock;
2828

29-
use crate::common::spawn_buffered;
29+
use crate::common::{gc_stringview_arrays, spawn_buffered};
3030
use crate::execution_plan::{
3131
Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
3232
};
@@ -483,33 +483,10 @@ impl ExternalSorter {
483483
fn organize_stringview_arrays(
484484
globally_sorted_batches: &mut Vec<RecordBatch>,
485485
) -> Result<()> {
486-
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
487-
488-
for batch in globally_sorted_batches.drain(..) {
489-
let mut new_columns: Vec<Arc<dyn Array>> =
490-
Vec::with_capacity(batch.num_columns());
491-
492-
let mut arr_mutated = false;
493-
for array in batch.columns() {
494-
if let Some(string_view_array) =
495-
array.as_any().downcast_ref::<StringViewArray>()
496-
{
497-
let new_array = string_view_array.gc();
498-
new_columns.push(Arc::new(new_array));
499-
arr_mutated = true;
500-
} else {
501-
new_columns.push(Arc::clone(array));
502-
}
503-
}
504-
505-
let organized_batch = if arr_mutated {
506-
RecordBatch::try_new(batch.schema(), new_columns)?
507-
} else {
508-
batch
509-
};
510-
511-
organized_batches.push(organized_batch);
512-
}
486+
let organized_batches = globally_sorted_batches
487+
.drain(..)
488+
.map(gc_stringview_arrays)
489+
.collect::<Result<_, _>>()?;
513490

514491
*globally_sorted_batches = organized_batches;
515492

0 commit comments

Comments
 (0)