Skip to content

Commit e37796b

Browse files
committed
Add spawn_stream utility for handling SendableRecordBatchStream on Tokio runtime
1 parent e35f889 commit e37796b

1 file changed

Lines changed: 15 additions & 3 deletions

File tree

src/utils.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
use crate::{
1919
common::data_type::PyScalarValue,
20-
errors::{PyDataFusionError, PyDataFusionResult},
20+
errors::{to_datafusion_err, PyDataFusionError, PyDataFusionResult},
2121
TokioRuntime,
2222
};
2323
use datafusion::{
24-
common::ScalarValue, execution::context::SessionContext, logical_expr::Volatility,
24+
common::ScalarValue, execution::context::SessionContext, execution::SendableRecordBatchStream,
25+
logical_expr::Volatility,
2526
};
2627
use pyo3::prelude::*;
2728
use pyo3::{exceptions::PyValueError, types::PyCapsule};
2829
use std::{future::Future, sync::OnceLock, time::Duration};
29-
use tokio::{runtime::Runtime, time::sleep};
30+
use tokio::{runtime::Runtime, task::JoinHandle, time::sleep};
3031
/// Utility to get the Tokio Runtime from Python
3132
#[inline]
3233
pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
@@ -84,6 +85,17 @@ where
8485
})
8586
}
8687

88+
/// Spawn a [`SendableRecordBatchStream`] on the Tokio runtime and wait for completion
89+
/// while respecting Python signal handling.
90+
pub(crate) fn spawn_stream<F>(py: Python, fut: F) -> PyDataFusionResult<SendableRecordBatchStream>
91+
where
92+
F: Future<Output = datafusion::common::Result<SendableRecordBatchStream>> + Send + 'static,
93+
{
94+
let rt = &get_tokio_runtime().0;
95+
let handle: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> = rt.spawn(fut);
96+
wait_for_future(py, async { handle.await.map_err(to_datafusion_err) })???
97+
}
98+
8799
pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
88100
Ok(match value {
89101
"immutable" => Volatility::Immutable,

0 commit comments

Comments
 (0)