Skip to content

Commit 471de13

Browse files
committed
fix: move gc to hash repartition only
This is not needed for round robin repartition
1 parent c90005d commit 471de13

1 file changed

Lines changed: 15 additions & 21 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -611,11 +611,25 @@ impl BatchPartitioner {
611611
let columns =
612612
take_arrays(batch.columns(), &indices_array, None)?;
613613

614+
let new_columns = columns
615+
.into_iter()
616+
.map(|col| {
617+
use arrow::array::{Array, StringViewArray};
618+
if let Some(sv) =
619+
col.as_any().downcast_ref::<StringViewArray>()
620+
{
621+
Arc::new(sv.gc()) as Arc<dyn Array>
622+
} else {
623+
col
624+
}
625+
})
626+
.collect();
627+
614628
let mut options = RecordBatchOptions::new();
615629
options = options.with_row_count(Some(indices_array.len()));
616630
let batch = RecordBatch::try_new_with_options(
617631
batch.schema(),
618-
columns,
632+
new_columns,
619633
&options,
620634
)
621635
.unwrap();
@@ -1375,26 +1389,6 @@ impl RepartitionExec {
13751389

13761390
for res in partitioner.partition_iter(batch)? {
13771391
let (partition, batch) = res?;
1378-
1379-
let batch = {
1380-
use arrow::array::{Array, StringViewArray};
1381-
let mut new_columns = Vec::with_capacity(batch.num_columns());
1382-
let mut mutated = false;
1383-
for col in batch.columns() {
1384-
if let Some(sv) = col.as_any().downcast_ref::<StringViewArray>() {
1385-
new_columns.push(Arc::new(sv.gc()) as Arc<dyn Array>);
1386-
mutated = true;
1387-
} else {
1388-
new_columns.push(Arc::clone(col));
1389-
}
1390-
}
1391-
if mutated {
1392-
RecordBatch::try_new(batch.schema(), new_columns)?
1393-
} else {
1394-
batch
1395-
}
1396-
};
1397-
13981392
let size = batch.get_array_memory_size();
13991393

14001394
let timer = metrics.send_time[partition].timer();

0 commit comments

Comments
 (0)