Skip to content

Commit 3c47ec3

Browse files
committed
test: add test for dropping stream after close to prevent crashes
1 parent 19020b3 commit 3c47ec3

2 files changed

Lines changed: 14 additions & 7 deletions

File tree

python/tests/test_dataframe_iter.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,11 @@ async def test_async_iter_dataframe(ctx):
3838
batches = [batch async for batch in df]
3939
assert len(batches) == 1
4040
assert batches[0].column(0).to_pylist() == [1, 2]
41+
42+
43+
def test_drop_after_close_does_not_crash(ctx):
44+
df = ctx.from_pydict({"a": [1, 2]})
45+
stream = df.execute_stream()
46+
stream.close()
47+
# Dropping the stream after close should not crash
48+
del stream

src/record_batch.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,12 @@ async fn close_stream(stream: Arc<Mutex<SendableRecordBatchStream>>) {
170170

171171
impl Drop for PyRecordBatchStream {
172172
fn drop(&mut self) {
173-
if let Some(stream) = self.stream.take() {
174-
// Drain the stream while the context is still alive
175-
Python::with_gil(|py| {
176-
let _ = wait_for_future(py, close_stream(stream));
177-
});
178-
}
179-
// Drop the context after the stream has been fully drained
173+
// Avoid interacting with Python during interpreter shutdown by
174+
// not attempting to drain the stream here. Any remaining
175+
// batches will be dropped and background tasks allowed to
176+
// complete naturally. Users should call `close` explicitly to
177+
// drain the stream when Python is still available.
178+
self.stream.take();
180179
self._ctx.take();
181180
}
182181
}

0 commit comments

Comments
 (0)