Skip to content

Commit e35f889

Browse files
committed
Replace Tokio runtime creation with spawn_stream utility in PySessionContext
1 parent c21234c commit e35f889

1 file changed

Lines changed: 2 additions & 7 deletions

File tree

src/context.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::udaf::PyAggregateUDF;
4545
use crate::udf::PyScalarUDF;
4646
use crate::udtf::PyTableFunction;
4747
use crate::udwf::PyWindowUDF;
48-
use crate::utils::{get_global_ctx, get_tokio_runtime, validate_pycapsule, wait_for_future};
48+
use crate::utils::{get_global_ctx, spawn_stream, validate_pycapsule, wait_for_future};
4949
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
5050
use datafusion::arrow::pyarrow::PyArrowType;
5151
use datafusion::arrow::record_batch::RecordBatch;
@@ -74,7 +74,6 @@ use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvid
7474
use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
7575
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
7676
use pyo3::IntoPyObjectExt;
77-
use tokio::task::JoinHandle;
7877

7978
/// Configuration options for a SessionContext
8079
#[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
@@ -1132,12 +1131,8 @@ impl PySessionContext {
11321131
py: Python,
11331132
) -> PyDataFusionResult<PyRecordBatchStream> {
11341133
let ctx: TaskContext = TaskContext::from(&self.ctx.state());
1135-
// create a Tokio runtime to run the async code
1136-
let rt = &get_tokio_runtime().0;
11371134
let plan = plan.plan.clone();
1138-
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
1139-
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
1140-
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
1135+
let stream = spawn_stream(py, async move { plan.execute(part, Arc::new(ctx)) })?;
11411136
Ok(PyRecordBatchStream::new(stream))
11421137
}
11431138
}

0 commit comments

Comments
 (0)