Skip to content

Commit 9c362aa

Browse files
committed
refactor: improve session context management in DataFrame stream readers
1 parent 22e213b commit 9c362aa

2 files changed

Lines changed: 33 additions & 5 deletions

File tree

python/datafusion/dataframe.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,8 +1118,9 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
11181118
11191119
The returned capsule holds a reference to the originating
11201120
:class:`SessionContext`, keeping it alive until the stream is fully
1121-
consumed. This makes it safe to drop the original context after obtaining
1122-
the stream.
1121+
consumed. The stream is explicitly closed before the context is
1122+
released, so it is safe to drop the original context after obtaining the
1123+
stream.
11231124
11241125
Args:
11251126
requested_schema: Attempt to provide the DataFrame using this schema.

src/dataframe.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,13 +366,19 @@ impl PyDataFrame {
366366
/// converted via `record_batch_into_schema` to apply schema changes per batch.
367367
struct PartitionedDataFrameStreamReader {
368368
streams: Vec<Arc<Mutex<SendableRecordBatchStream>>>,
369-
// Hold a reference to the session context to keep it alive
370-
_ctx: Arc<SessionContext>,
371369
schema: SchemaRef,
372370
projection: Option<SchemaRef>,
373371
current: usize,
374372
}
375373

374+
/// Wrapper that keeps the [`SessionContext`] alive while a
375+
/// [`PartitionedDataFrameStreamReader`] is exported through the Arrow C Stream
376+
/// interface.
377+
struct StreamWithContext {
378+
reader: PartitionedDataFrameStreamReader,
379+
ctx: Arc<SessionContext>,
380+
}
381+
376382
impl Iterator for PartitionedDataFrameStreamReader {
377383
type Item = Result<RecordBatch, ArrowError>;
378384

@@ -419,6 +425,27 @@ impl RecordBatchReader for PartitionedDataFrameStreamReader {
419425
}
420426
}
421427

428+
impl Iterator for StreamWithContext {
429+
type Item = Result<RecordBatch, ArrowError>;
430+
431+
fn next(&mut self) -> Option<Self::Item> {
432+
self.reader.next()
433+
}
434+
}
435+
436+
impl RecordBatchReader for StreamWithContext {
437+
fn schema(&self) -> SchemaRef {
438+
self.reader.schema()
439+
}
440+
}
441+
442+
impl Drop for StreamWithContext {
443+
fn drop(&mut self) {
444+
// Explicitly close streams before the context is released
445+
self.reader.streams.clear();
446+
}
447+
}
448+
422449
#[pymethods]
423450
impl PyDataFrame {
424451
/// Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]`
@@ -984,12 +1011,12 @@ impl PyDataFrame {
9841011
let schema_ref = Arc::new(schema.clone());
9851012

9861013
let reader = PartitionedDataFrameStreamReader {
987-
_ctx: ctx,
9881014
streams,
9891015
schema: schema_ref,
9901016
projection,
9911017
current: 0,
9921018
};
1019+
let reader = StreamWithContext { reader, ctx };
9931020
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
9941021

9951022
let stream = FFI_ArrowArrayStream::new(reader);

0 commit comments

Comments (0)