Skip to content

Commit d5820e1

Browse files
committed
Refactor sql methods in PySessionContext to use immutable self reference
Enhance wait_for_future function to handle current runtime context
1 parent c0a5770 commit d5820e1

2 files changed

Lines changed: 10 additions & 4 deletions

File tree

src/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,15 +429,15 @@ impl PySessionContext {
429429
}
430430

431431
/// Returns a PyDataFrame whose plan corresponds to the SQL statement.
432-
pub fn sql(&mut self, query: &str, py: Python) -> PyDataFusionResult<PyDataFrame> {
432+
pub fn sql(&self, query: &str, py: Python) -> PyDataFusionResult<PyDataFrame> {
433433
let result = self.ctx.sql(query);
434434
let df = wait_for_future(py, result)??;
435435
Ok(PyDataFrame::new(df))
436436
}
437437

438438
#[pyo3(signature = (query, options=None))]
439439
pub fn sql_with_options(
440-
&mut self,
440+
&self,
441441
query: &str,
442442
options: Option<PySQLOptions>,
443443
py: Python,

src/utils.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ where
8282
const INTERVAL_CHECK_SIGNALS: Duration = Duration::from_millis(1_000);
8383

8484
py.allow_threads(|| {
85-
runtime.block_on(async {
85+
let wait_future = || async {
8686
tokio::pin!(fut);
8787
loop {
8888
tokio::select! {
@@ -92,7 +92,13 @@ where
9292
}
9393
}
9494
}
95-
})
95+
};
96+
97+
if tokio::runtime::Handle::try_current().is_ok() {
98+
tokio::task::block_in_place(|| runtime.block_on(wait_future()))
99+
} else {
100+
runtime.block_on(wait_future())
101+
}
96102
})
97103
}
98104

0 commit comments

Comments
 (0)