Skip to content

Commit c90005d

Browse files
committed
fix: gc string view arrays in RepartitionExec
Fixes #20491

- Reuses the `ExternalSorter` fix introduced in #14823.
- If any `StringViewArray` columns are present in the repartitioned input, we garbage-collect them (via `gc()`) to reduce duplicate tracking of the same string view buffer.
- Fixes memory over-counting when there is a `RepartitionExec` above a partial aggregation on a `StringViewArray` column.
1 parent 2347306 commit c90005d

1 file changed

Lines changed: 20 additions & 0 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,26 @@ impl RepartitionExec {
13751375

13761376
for res in partitioner.partition_iter(batch)? {
13771377
let (partition, batch) = res?;
1378+
1379+
let batch = {
1380+
use arrow::array::{Array, StringViewArray};
1381+
let mut new_columns = Vec::with_capacity(batch.num_columns());
1382+
let mut mutated = false;
1383+
for col in batch.columns() {
1384+
if let Some(sv) = col.as_any().downcast_ref::<StringViewArray>() {
1385+
new_columns.push(Arc::new(sv.gc()) as Arc<dyn Array>);
1386+
mutated = true;
1387+
} else {
1388+
new_columns.push(Arc::clone(col));
1389+
}
1390+
}
1391+
if mutated {
1392+
RecordBatch::try_new(batch.schema(), new_columns)?
1393+
} else {
1394+
batch
1395+
}
1396+
};
1397+
13781398
let size = batch.get_array_memory_size();
13791399

13801400
let timer = metrics.send_time[partition].timer();

0 commit comments

Comments
 (0)