Skip to content

Commit a738963

Browse files
Add has_terminate + do_terminate to the async dialect
SQLAlchemy's async pool gates the forced-disposal path on the dialect's has_terminate flag and a do_terminate(dbapi_connection) override. The reference aiosqlite dialect wires both; ours silently omitted them, so engine.dispose() on a connection whose rollback was hanging would block on graceful close. AsyncAdaptedConnection gains a terminate() that forwards straight to the underlying async close without attempting rollback first — that's the point of the path. DqliteDialect_aio pins has_terminate = True and routes do_terminate through the adapter's terminate(). Three pin tests fence the flag location and rollback-skip invariant. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 077f92b commit a738963

2 files changed

Lines changed: 145 additions & 0 deletions

File tree

src/sqlalchemydqlite/aio.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,18 @@ def close(self) -> None:
285285
finally:
286286
await_only(self._connection.close())
287287

288+
def terminate(self) -> None:
289+
"""Force-close the underlying connection without rollback.
290+
291+
SQLAlchemy's async pool calls ``dialect.do_terminate(dbapi_conn)``
292+
(which defers to this method) when ``has_terminate = True`` and
293+
a connection must be forcibly reclaimed — typically during
294+
``engine.dispose()`` under failure, or when a stuck rollback
295+
would otherwise block shutdown. Unlike ``close()`` we do NOT
296+
attempt rollback first: that's the whole point of terminate.
297+
"""
298+
await_only(self._connection.close())
299+
288300

289301
class DqliteDialect_aio(DqliteDialect): # noqa: N801
290302
"""Async SQLAlchemy dialect for dqlite.
@@ -312,10 +324,28 @@ class DqliteDialect_aio(DqliteDialect): # noqa: N801
312324
# through an SS-cursor code path the adapter does not implement.
313325
supports_server_side_cursors = False
314326

327+
# SQLAlchemy's async pool gates its forced-disposal path on
328+
# ``has_terminate`` (see ``pool/base.py`` docs for
329+
# ``_ConnectionRecord.invalidate``). The reference aiosqlite
330+
# dialect sets this True; our ``AsyncAdaptedConnection`` now
331+
# provides a ``terminate()`` that skips rollback and closes
332+
# directly, so pin True locally to defend against an MRO flip
333+
# from the DefaultDialect default (``False``).
334+
has_terminate = True
335+
315336
@classmethod
316337
def get_pool_class(cls, url: URL) -> type[pool.Pool]:
317338
return AsyncAdaptedQueuePool
318339

340+
def do_terminate(self, dbapi_connection: Any) -> None:
341+
"""Integration point SQLAlchemy's async pool calls for forced
342+
disposal. Defers to ``AsyncAdaptedConnection.terminate()``,
343+
which closes without the usual pre-close rollback so a stuck
344+
rollback on a half-dead connection cannot block
345+
``engine.dispose()``.
346+
"""
347+
dbapi_connection.terminate()
348+
319349
@classmethod
320350
def import_dbapi(cls) -> types.ModuleType:
321351
from dqlitedbapi import aio

tests/test_aio_terminate.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""Pin ``has_terminate = True`` and the terminate integration path.
2+
3+
SQLAlchemy's async pool gates its forced-disposal path on the dialect's
4+
``has_terminate`` flag plus a working ``do_terminate(dbapi_connection)``
5+
override. Without both, ``engine.dispose()`` on a stuck connection
6+
defers to the full graceful-close path and can block on a hanging
7+
rollback. These tests fence three invariants:
8+
9+
1. The flag is pinned locally on ``DqliteDialect_aio`` (not merely
10+
inherited from ``DefaultDialect`` / parent MRO).
11+
2. ``do_terminate`` delegates to the adapter's ``terminate()``.
12+
3. ``terminate()`` closes the underlying connection without first
13+
invoking rollback — the whole point of ``terminate`` is to skip
14+
any graceful teardown that could hang.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
from sqlalchemydqlite.aio import AsyncAdaptedConnection, DqliteDialect_aio
20+
21+
22+
class _FakeAsyncConn:
23+
def __init__(self) -> None:
24+
self.rollback_calls = 0
25+
self.close_calls = 0
26+
27+
async def rollback(self) -> None: # pragma: no cover — must not be called
28+
self.rollback_calls += 1
29+
30+
async def close(self) -> None:
31+
self.close_calls += 1
32+
33+
34+
class TestHasTerminatePinned:
35+
def test_flag_is_true(self) -> None:
36+
assert DqliteDialect_aio.has_terminate is True
37+
38+
def test_flag_is_local_to_class(self) -> None:
39+
"""The pin must live in ``DqliteDialect_aio.__dict__`` so an
40+
upstream DefaultDialect default flip cannot silently revert it.
41+
"""
42+
assert "has_terminate" in DqliteDialect_aio.__dict__
43+
44+
def test_do_terminate_is_local_override(self) -> None:
45+
assert "do_terminate" in DqliteDialect_aio.__dict__
46+
47+
48+
class TestAsyncAdaptedConnectionTerminate:
49+
def test_terminate_skips_rollback(self) -> None:
50+
"""``terminate()`` must NOT call rollback — that's the whole
51+
point of the forced-disposal path. Stub ``await_only`` so the
52+
sync-driven call resolves deterministically without a real
53+
event loop.
54+
"""
55+
fake = _FakeAsyncConn()
56+
adapter = AsyncAdaptedConnection(fake) # type: ignore[arg-type]
57+
58+
from sqlalchemydqlite import aio as aio_module
59+
60+
def _fake_await_only(coro: object) -> object:
61+
import asyncio
62+
63+
return asyncio.new_event_loop().run_until_complete(coro) # type: ignore[arg-type]
64+
65+
orig = aio_module.await_only
66+
aio_module.await_only = _fake_await_only # type: ignore[assignment]
67+
try:
68+
adapter.terminate()
69+
finally:
70+
aio_module.await_only = orig # type: ignore[assignment]
71+
72+
assert fake.rollback_calls == 0
73+
assert fake.close_calls == 1
74+
75+
76+
class TestDoTerminateDelegatesToAdapter:
77+
def test_do_terminate_calls_terminate_on_dbapi_connection(self) -> None:
78+
"""``dialect.do_terminate(dbapi_conn)`` is SQLAlchemy's single
79+
integration point; verify it forwards to the adapter's
80+
``terminate()`` rather than reaching into private state.
81+
"""
82+
calls: list[str] = []
83+
84+
class _DbapiConn:
85+
def terminate(self) -> None:
86+
calls.append("terminate")
87+
88+
dialect = DqliteDialect_aio()
89+
dialect.do_terminate(_DbapiConn())
90+
91+
assert calls == ["terminate"]
92+
93+
def test_do_terminate_leaves_rollback_alone(self) -> None:
94+
"""Under a mocked connection, do_terminate must not invoke
95+
rollback even as a side-effect of teardown logic elsewhere."""
96+
fake = _FakeAsyncConn()
97+
adapter = AsyncAdaptedConnection(fake) # type: ignore[arg-type]
98+
dialect = DqliteDialect_aio()
99+
100+
from sqlalchemydqlite import aio as aio_module
101+
102+
def _fake_await_only(coro: object) -> object:
103+
import asyncio
104+
105+
return asyncio.new_event_loop().run_until_complete(coro) # type: ignore[arg-type]
106+
107+
orig = aio_module.await_only
108+
aio_module.await_only = _fake_await_only # type: ignore[assignment]
109+
try:
110+
dialect.do_terminate(adapter)
111+
finally:
112+
aio_module.await_only = orig # type: ignore[assignment]
113+
114+
assert fake.rollback_calls == 0
115+
assert fake.close_calls == 1

0 commit comments

Comments
 (0)