Skip to content

Commit 8510df3

Browse files
committed
fix: gc string view arrays in RepartitionExec
Fixes #20491.
- Took the fix from `ExternalSorter` that was introduced in #14823.
- If any `StringViewArray` columns are present in the repartitioned input, we gc them so that the same string view buffer is not tracked more than once.
- Fixes over-counting when there is a `RepartitionExec` above a partial aggregation on a `StringViewArray` column.
1 parent cdaecf0 commit 8510df3

1 file changed

Lines changed: 20 additions & 0 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,26 @@ impl RepartitionExec {
13901390

13911391
for res in partitioner.partition_iter(batch)? {
13921392
let (partition, batch) = res?;
1393+
1394+
let batch = {
1395+
use arrow::array::{Array, StringViewArray};
1396+
let mut new_columns = Vec::with_capacity(batch.num_columns());
1397+
let mut mutated = false;
1398+
for col in batch.columns() {
1399+
if let Some(sv) = col.as_any().downcast_ref::<StringViewArray>() {
1400+
new_columns.push(Arc::new(sv.gc()) as Arc<dyn Array>);
1401+
mutated = true;
1402+
} else {
1403+
new_columns.push(Arc::clone(col));
1404+
}
1405+
}
1406+
if mutated {
1407+
RecordBatch::try_new(batch.schema(), new_columns)?
1408+
} else {
1409+
batch
1410+
}
1411+
};
1412+
13931413
let size = batch.get_array_memory_size();
13941414

13951415
let timer = metrics.send_time[partition].timer();

0 commit comments

Comments
 (0)