Skip to content

Commit 8a53db1

Browse files
Committed
Optimize pyarrow.RecordBatch class fetching by reusing it across calls
1 parent 00f16df commit 8a53db1

1 file changed

Lines changed: 11 additions & 9 deletions

File tree

src/dataframe.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::ffi::CString;
2020
use std::ptr::addr_of;
2121
use std::sync::Arc;
2222

23-
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader, StructArray};
23+
use arrow::array::{new_null_array, Array, RecordBatch, RecordBatchReader, StructArray};
2424
use arrow::compute::can_cast_types;
2525
use arrow::error::ArrowError;
2626
use arrow::ffi::{self, FFI_ArrowArray, FFI_ArrowSchema};
@@ -528,6 +528,9 @@ impl PyDataFrame {
528528
let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
529529
.map_err(PyDataFusionError::from)?;
530530

531+
// Fetch pyarrow.RecordBatch class once per call and reuse it
532+
let record_batch_class = py.import("pyarrow")?.getattr("RecordBatch")?;
533+
531534
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
532535
.allow_threads(|| {
533536
batches
@@ -536,16 +539,14 @@ impl PyDataFrame {
536539
let sa: StructArray = rb.into();
537540
ffi::to_ffi(&sa.to_data())
538541
})
539-
.collect()
542+
.collect::<Result<Vec<_>, ArrowError>>()
540543
})
541544
.map_err(PyDataFusionError::from)?;
542545

543-
let module = py.import("pyarrow")?;
544-
let class = module.getattr("RecordBatch")?;
545546
ffi_batches
546547
.into_iter()
547548
.map(|(array, schema)| {
548-
class
549+
record_batch_class
549550
.call_method1(
550551
"_import_from_c",
551552
(
@@ -570,6 +571,9 @@ impl PyDataFrame {
570571
let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?
571572
.map_err(PyDataFusionError::from)?;
572573

574+
// Fetch pyarrow.RecordBatch class once and reuse it for all partitions
575+
let record_batch_class = py.import("pyarrow")?.getattr("RecordBatch")?;
576+
573577
batches
574578
.into_iter()
575579
.map(|rbs| {
@@ -580,15 +584,13 @@ impl PyDataFrame {
580584
let sa: StructArray = rb.into();
581585
ffi::to_ffi(&sa.to_data())
582586
})
583-
.collect()
587+
.collect::<Result<Vec<_>, ArrowError>>()
584588
})
585589
.map_err(PyDataFusionError::from)?;
586-
let module = py.import("pyarrow")?;
587-
let class = module.getattr("RecordBatch")?;
588590
ffi_batches
589591
.into_iter()
590592
.map(|(array, schema)| {
591-
class
593+
record_batch_class
592594
.call_method1(
593595
"_import_from_c",
594596
(

0 commit comments

Comments (0)