Skip to content

Commit 06cde20

Browse files
committed
fix: gc string view arrays in RepartitionExec
Fixes #20491.
- Takes the fix from `ExternalSorter` introduced in #14823.
- If any `StringViewArray` columns are present in the repartitioned input, we gc them to reduce duplicate tracking of the same string view buffer.
- Fixes over-counting when there is a `RepartitionExec` above a partial aggregation on a `StringViewArray` column.
1 parent 9660c98 commit 06cde20

1 file changed

Lines changed: 20 additions & 0 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,26 @@ impl RepartitionExec {
13591359

13601360
for res in partitioner.partition_iter(batch)? {
13611361
let (partition, batch) = res?;
1362+
1363+
let batch = {
1364+
use arrow::array::{Array, StringViewArray};
1365+
let mut new_columns = Vec::with_capacity(batch.num_columns());
1366+
let mut mutated = false;
1367+
for col in batch.columns() {
1368+
if let Some(sv) = col.as_any().downcast_ref::<StringViewArray>() {
1369+
new_columns.push(Arc::new(sv.gc()) as Arc<dyn Array>);
1370+
mutated = true;
1371+
} else {
1372+
new_columns.push(Arc::clone(col));
1373+
}
1374+
}
1375+
if mutated {
1376+
RecordBatch::try_new(batch.schema(), new_columns)?
1377+
} else {
1378+
batch
1379+
}
1380+
};
1381+
13621382
let size = batch.get_array_memory_size();
13631383

13641384
let timer = metrics.send_time[partition].timer();

0 commit comments

Comments
 (0)