Skip to content

Commit 8c0bfc0

Browse files
committed
test: add garbage collection tests for arrow C stream reader
1 parent 9c362aa commit 8c0bfc0

1 file changed

Lines changed: 49 additions & 0 deletions

File tree

python/tests/test_dataframe.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,6 +1874,55 @@ def test_arrow_streams_context_drop_no_segfault():
18741874
gc.collect()
18751875

18761876

1877+
def test_arrow_c_stream_reader_discard_gc():
1878+
"""Capsule imported into a reader and immediately discarded."""
1879+
1880+
for _ in range(50):
1881+
ctx = SessionContext()
1882+
df = ctx.sql("SELECT 1 AS a")
1883+
reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
1884+
del reader
1885+
del df
1886+
del ctx
1887+
gc.collect()
1888+
1889+
1890+
def test_arrow_c_stream_unconsumed_gc():
1891+
"""Capsule left unconsumed and only garbage-collected."""
1892+
1893+
for _ in range(50):
1894+
ctx = SessionContext()
1895+
df = ctx.sql("SELECT 1 AS a")
1896+
df.__arrow_c_stream__()
1897+
del df
1898+
del ctx
1899+
gc.collect()
1900+
1901+
1902+
def test_arrow_c_stream_concurrent_context_drop():
1903+
"""Consume capsule while context is deleted concurrently."""
1904+
1905+
for _ in range(50):
1906+
ctx = SessionContext()
1907+
df = ctx.sql("SELECT 1 AS a")
1908+
capsule = df.__arrow_c_stream__()
1909+
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
1910+
result: list[pa.Table] = []
1911+
1912+
def consume(reader=reader, result=result) -> None:
1913+
result.append(reader.read_all())
1914+
1915+
thread = threading.Thread(target=consume)
1916+
thread.start()
1917+
del df
1918+
del ctx
1919+
thread.join()
1920+
assert result[0].column("a").to_pylist() == [1]
1921+
del reader
1922+
del capsule
1923+
gc.collect()
1924+
1925+
18771926
def test_arrow_stream_to_pylist(df):
18781927
capsule = df.__arrow_c_stream__()
18791928
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)

0 commit comments

Comments
 (0)