Skip to content

Commit f2e7076

Browse files
committed
fix: move gc to hash repartition only
This is not needed for round robin repartition
1 parent 76434f6 commit f2e7076

1 file changed

Lines changed: 15 additions & 21 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -611,11 +611,25 @@ impl BatchPartitioner {
611611
let columns =
612612
take_arrays(batch.columns(), &indices_array, None)?;
613613

614+
let new_columns = columns
615+
.into_iter()
616+
.map(|col| {
617+
use arrow::array::{Array, StringViewArray};
618+
if let Some(sv) =
619+
col.as_any().downcast_ref::<StringViewArray>()
620+
{
621+
Arc::new(sv.gc()) as Arc<dyn Array>
622+
} else {
623+
col
624+
}
625+
})
626+
.collect();
627+
614628
let mut options = RecordBatchOptions::new();
615629
options = options.with_row_count(Some(indices_array.len()));
616630
let batch = RecordBatch::try_new_with_options(
617631
batch.schema(),
618-
columns,
632+
new_columns,
619633
&options,
620634
)
621635
.unwrap();
@@ -1390,26 +1404,6 @@ impl RepartitionExec {
13901404

13911405
for res in partitioner.partition_iter(batch)? {
13921406
let (partition, batch) = res?;
1393-
1394-
let batch = {
1395-
use arrow::array::{Array, StringViewArray};
1396-
let mut new_columns = Vec::with_capacity(batch.num_columns());
1397-
let mut mutated = false;
1398-
for col in batch.columns() {
1399-
if let Some(sv) = col.as_any().downcast_ref::<StringViewArray>() {
1400-
new_columns.push(Arc::new(sv.gc()) as Arc<dyn Array>);
1401-
mutated = true;
1402-
} else {
1403-
new_columns.push(Arc::clone(col));
1404-
}
1405-
}
1406-
if mutated {
1407-
RecordBatch::try_new(batch.schema(), new_columns)?
1408-
} else {
1409-
batch
1410-
}
1411-
};
1412-
14131407
let size = batch.get_array_memory_size();
14141408

14151409
let timer = metrics.send_time[partition].timer();

0 commit comments

Comments
 (0)