Skip to content

Commit a2d0808

Browse files
committed
Refactor record batch conversion to PyArrow into a separate function for improved readability and maintainability
1 parent 12fc7ab commit a2d0808

1 file changed

Lines changed: 38 additions & 52 deletions

File tree

src/dataframe.rs

Lines changed: 38 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,42 @@ impl PyDataFrame {
357357
}
358358
}
359359

360+
/// Convert a vector of `RecordBatch` into PyArrow `RecordBatch` objects.
361+
///
362+
/// This performs the FFI conversion in parallel while releasing the GIL.
363+
fn record_batches_to_pyarrow(
364+
py: Python<'_>,
365+
record_batch_class: &Bound<'_, PyAny>,
366+
batches: Vec<RecordBatch>,
367+
) -> PyResult<Vec<PyObject>> {
368+
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
369+
.allow_threads(|| {
370+
batches
371+
.into_par_iter()
372+
.map(|rb| {
373+
let sa: StructArray = rb.into();
374+
ffi::to_ffi(&sa.to_data())
375+
})
376+
.collect::<Result<Vec<_>, ArrowError>>()
377+
})
378+
.map_err(PyDataFusionError::from)?;
379+
380+
ffi_batches
381+
.into_iter()
382+
.map(|(array, schema)| {
383+
record_batch_class
384+
.call_method1(
385+
"_import_from_c",
386+
(
387+
addr_of!(array) as Py_uintptr_t,
388+
addr_of!(schema) as Py_uintptr_t,
389+
),
390+
)
391+
.map(Into::into)
392+
})
393+
.collect()
394+
}
395+
360396
#[pymethods]
361397
impl PyDataFrame {
362398
/// Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]`
@@ -531,32 +567,7 @@ impl PyDataFrame {
531567
// Fetch pyarrow.RecordBatch class once per call and reuse it
532568
let record_batch_class = py.import("pyarrow")?.getattr("RecordBatch")?;
533569

534-
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
535-
.allow_threads(|| {
536-
batches
537-
.into_par_iter()
538-
.map(|rb| {
539-
let sa: StructArray = rb.into();
540-
ffi::to_ffi(&sa.to_data())
541-
})
542-
.collect::<Result<Vec<_>, ArrowError>>()
543-
})
544-
.map_err(PyDataFusionError::from)?;
545-
546-
ffi_batches
547-
.into_iter()
548-
.map(|(array, schema)| {
549-
record_batch_class
550-
.call_method1(
551-
"_import_from_c",
552-
(
553-
addr_of!(array) as Py_uintptr_t,
554-
addr_of!(schema) as Py_uintptr_t,
555-
),
556-
)
557-
.map(Into::into)
558-
})
559-
.collect()
570+
record_batches_to_pyarrow(py, &record_batch_class, batches)
560571
}
561572

562573
/// Cache DataFrame.
@@ -576,32 +587,7 @@ impl PyDataFrame {
576587

577588
batches
578589
.into_iter()
579-
.map(|rbs| {
580-
let ffi_batches: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = py
581-
.allow_threads(|| {
582-
rbs.into_par_iter()
583-
.map(|rb| {
584-
let sa: StructArray = rb.into();
585-
ffi::to_ffi(&sa.to_data())
586-
})
587-
.collect::<Result<Vec<_>, ArrowError>>()
588-
})
589-
.map_err(PyDataFusionError::from)?;
590-
ffi_batches
591-
.into_iter()
592-
.map(|(array, schema)| {
593-
record_batch_class
594-
.call_method1(
595-
"_import_from_c",
596-
(
597-
addr_of!(array) as Py_uintptr_t,
598-
addr_of!(schema) as Py_uintptr_t,
599-
),
600-
)
601-
.map(Into::into)
602-
})
603-
.collect::<PyResult<Vec<_>>>()
604-
})
590+
.map(|rbs| record_batches_to_pyarrow(py, &record_batch_class, rbs))
605591
.collect()
606592
}
607593

0 commit comments

Comments (0)