Skip to content

Commit 6a1547d

Browse files
committed
fix: move gc to hash repartition only
The StringViewArray gc() pass is not needed for round-robin repartition.
1 parent 06cde20 commit 6a1547d

File tree

1 file changed

+15
-21
lines changed
  • datafusion/physical-plan/src/repartition

1 file changed

+15
-21
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -608,11 +608,25 @@ impl BatchPartitioner {
608608
let columns =
609609
take_arrays(batch.columns(), &indices_array, None)?;
610610

611+
let new_columns = columns
612+
.into_iter()
613+
.map(|col| {
614+
use arrow::array::{Array, StringViewArray};
615+
if let Some(sv) =
616+
col.as_any().downcast_ref::<StringViewArray>()
617+
{
618+
Arc::new(sv.gc()) as Arc<dyn Array>
619+
} else {
620+
col
621+
}
622+
})
623+
.collect();
624+
611625
let mut options = RecordBatchOptions::new();
612626
options = options.with_row_count(Some(indices_array.len()));
613627
let batch = RecordBatch::try_new_with_options(
614628
batch.schema(),
615-
columns,
629+
new_columns,
616630
&options,
617631
)
618632
.unwrap();
@@ -1359,26 +1373,6 @@ impl RepartitionExec {
13591373

13601374
for res in partitioner.partition_iter(batch)? {
13611375
let (partition, batch) = res?;
1362-
1363-
let batch = {
1364-
use arrow::array::{Array, StringViewArray};
1365-
let mut new_columns = Vec::with_capacity(batch.num_columns());
1366-
let mut mutated = false;
1367-
for col in batch.columns() {
1368-
if let Some(sv) = col.as_any().downcast_ref::<StringViewArray>() {
1369-
new_columns.push(Arc::new(sv.gc()) as Arc<dyn Array>);
1370-
mutated = true;
1371-
} else {
1372-
new_columns.push(Arc::clone(col));
1373-
}
1374-
}
1375-
if mutated {
1376-
RecordBatch::try_new(batch.schema(), new_columns)?
1377-
} else {
1378-
batch
1379-
}
1380-
};
1381-
13821376
let size = batch.get_array_memory_size();
13831377

13841378
let timer = metrics.send_time[partition].timer();

0 commit comments

Comments
 (0)