Skip to content

Commit a81a639

Browse files
weakref.finalize cleans up event-loop thread on Connection GC
ISSUE-10 — the dedicated event-loop thread created by _ensure_loop used to leak forever if the Connection was garbage-collected without explicit close() (exception during use, forgotten `with`, tests). Long-lived processes accumulated dozens of daemon threads. Now _ensure_loop registers a weakref.finalize; the finalizer stops the loop, joins the thread, and emits ResourceWarning (matches stdlib sqlite3 convention) so leaks are visible in dev and test. The finalizer captures a 1-element ``closed_flag`` list that close() mutates, so explicit close() suppresses the warning and detaches the finalizer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6c9c8f9 commit a81a639

File tree

2 files changed

+132
-0
lines changed

2 files changed

+132
-0
lines changed

src/dqlitedbapi/connection.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,58 @@
33
import asyncio
44
import contextlib
55
import threading
6+
import warnings
7+
import weakref
68
from typing import Any
79

810
from dqliteclient import DqliteConnection
911
from dqlitedbapi.cursor import Cursor
1012
from dqlitedbapi.exceptions import InterfaceError, OperationalError, ProgrammingError
1113

1214

15+
def _cleanup_loop_thread(
16+
loop: asyncio.AbstractEventLoop,
17+
thread: threading.Thread,
18+
closed_flag: list[bool],
19+
address: str,
20+
) -> None:
21+
"""Stop the background event loop and join its thread.
22+
23+
Called from a ``weakref.finalize`` so it must not reference the
24+
``Connection`` instance. ``closed_flag`` is a 1-element list that
25+
the Connection mutates when ``close()`` is called — we use that
26+
rather than a direct reference to self to decide whether to emit
27+
a ``ResourceWarning``.
28+
"""
29+
if closed_flag[0] is False:
30+
# User never called close() → leak warning (matches stdlib
31+
# sqlite3). Don't crash at interpreter shutdown.
32+
try:
33+
warnings.warn(
34+
f"Connection(address={address!r}) was garbage-collected "
35+
f"without close(); cleaning up event-loop thread. Call "
36+
f"Connection.close() explicitly to avoid this warning.",
37+
ResourceWarning,
38+
stacklevel=2,
39+
)
40+
except Exception:
41+
pass
42+
try:
43+
if not loop.is_closed():
44+
loop.call_soon_threadsafe(loop.stop)
45+
except Exception:
46+
pass
47+
try:
48+
thread.join(timeout=5)
49+
except Exception:
50+
pass
51+
try:
52+
if not loop.is_closed():
53+
loop.close()
54+
except Exception:
55+
pass
56+
57+
1358
class Connection:
1459
"""PEP 249 compliant database connection."""
1560

@@ -38,6 +83,11 @@ def __init__(
3883
self._op_lock = threading.Lock()
3984
self._connect_lock: asyncio.Lock | None = None
4085
self._creator_thread = threading.get_ident()
86+
# 1-element list (mutable, captured by the finalizer) that
87+
# close() flips to True. Using a list avoids the finalizer
88+
# closing over ``self`` and preventing GC.
89+
self._closed_flag: list[bool] = [False]
90+
self._finalizer: weakref.finalize | None = None
4191

4292
def _check_thread(self) -> None:
4393
"""Raise ProgrammingError if called from a different thread than the creator."""
@@ -54,6 +104,11 @@ def _ensure_loop(self) -> asyncio.AbstractEventLoop:
54104
55105
This allows sync methods to work even when called from within
56106
an already-running async context (e.g. uvicorn).
107+
108+
Registers a ``weakref.finalize`` the first time the loop is
109+
created so a Connection that's garbage-collected without an
110+
explicit ``close()`` still cleans up its thread. (GC'd connections
111+
used to leak daemon threads forever.)
57112
"""
58113
if self._loop is not None and not self._loop.is_closed():
59114
return self._loop
@@ -62,6 +117,18 @@ def _ensure_loop(self) -> asyncio.AbstractEventLoop:
62117
self._loop = asyncio.new_event_loop()
63118
self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
64119
self._thread.start()
120+
# Finalizer can't close over self — it'd keep the
121+
# Connection alive. Capture primitives only. The
122+
# closed-flag list is mutated by close() so the
123+
# finalizer knows whether to emit a leak warning.
124+
self._finalizer = weakref.finalize(
125+
self,
126+
_cleanup_loop_thread,
127+
self._loop,
128+
self._thread,
129+
self._closed_flag,
130+
self._address,
131+
)
65132
return self._loop
66133

67134
def _run_sync(self, coro: Any) -> Any:
@@ -134,6 +201,14 @@ def close(self) -> None:
134201
if self._closed:
135202
return
136203
self._closed = True
204+
# Flip the flag the finalizer reads so it knows this was an
205+
# explicit close (no ResourceWarning).
206+
self._closed_flag[0] = True
207+
# Detach the finalizer — it's about to do nothing useful, and
208+
# keeping it registered would double-stop the loop.
209+
if self._finalizer is not None:
210+
self._finalizer.detach()
211+
self._finalizer = None
137212
try:
138213
if self._loop is not None and not self._loop.is_closed():
139214
with contextlib.suppress(Exception):

tests/test_gc_cleanup.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""Event-loop thread is cleaned up on Connection GC (ISSUE-10).
2+
3+
Previously Connection spawned a daemon thread via _ensure_loop() and
4+
only stopped it on explicit close(). A GC'd connection leaked its
5+
thread forever, which bit long-lived processes (web servers, workers)
6+
accumulating dozens/hundreds of leaked loops over time.
7+
8+
Now _ensure_loop registers a weakref.finalize that stops the loop and
9+
joins the thread when the Connection is GC'd, and emits ResourceWarning
10+
to mirror stdlib sqlite3 conventions.
11+
"""
12+
13+
import gc
14+
import threading
15+
import warnings
16+
17+
import pytest
18+
19+
from dqlitedbapi.connection import Connection
20+
21+
22+
class TestGCCleanup:
23+
def test_gc_cleans_up_loop_thread(self) -> None:
24+
"""Connection without explicit close() gets cleaned up on GC."""
25+
baseline = threading.active_count()
26+
# Create a connection and force the background loop to exist.
27+
conn = Connection("localhost:19001", timeout=2.0)
28+
conn._ensure_loop()
29+
assert threading.active_count() == baseline + 1
30+
31+
# GC without close(). Expect ResourceWarning.
32+
with warnings.catch_warnings(record=True) as captured:
33+
warnings.simplefilter("always")
34+
del conn
35+
gc.collect()
36+
# Give the finalizer thread time to join.
37+
for t in threading.enumerate():
38+
if t.daemon and t is not threading.current_thread():
39+
t.join(timeout=1.0)
40+
41+
# At least one ResourceWarning about the leak.
42+
rw = [w for w in captured if issubclass(w.category, ResourceWarning)]
43+
assert rw, f"expected ResourceWarning; got categories={[w.category for w in captured]}"
44+
45+
def test_explicit_close_suppresses_resourcewarning(self) -> None:
46+
"""If the user calls close(), finalizer shouldn't emit a warning."""
47+
with warnings.catch_warnings(record=True) as captured:
48+
warnings.simplefilter("always")
49+
conn = Connection("localhost:19001", timeout=2.0)
50+
conn._ensure_loop()
51+
conn.close()
52+
del conn
53+
gc.collect()
54+
55+
# No ResourceWarning since the user did the right thing.
56+
rw = [w for w in captured if issubclass(w.category, ResourceWarning)]
57+
assert not rw, f"unexpected ResourceWarning: {[str(w.message) for w in rw]}"

0 commit comments

Comments
 (0)