Skip to content

Commit 5e57d96

Browse files
fix: roll back open transactions when returning connections to pool
When a connection with _in_transaction=True is returned to the pool (via _release() or acquire()'s exception handler), the pool now issues ROLLBACK before recycling. If the ROLLBACK fails, the connection is destroyed instead of being returned dirty. This prevents a connection with an open transaction from being handed to the next caller, who would unknowingly operate inside someone else's stale transaction. Also adds a public in_transaction property to DqliteConnection. Fixes #101 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 50a7f27 commit 5e57d96

3 files changed

Lines changed: 109 additions & 6 deletions

File tree

src/dqliteclient/connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ def is_connected(self) -> bool:
9292
"""Check if connected."""
9393
return self._protocol is not None
9494

95+
@property
96+
def in_transaction(self) -> bool:
97+
"""Check if a transaction is active."""
98+
return self._in_transaction
99+
95100
async def connect(self) -> None:
96101
"""Establish connection to the database."""
97102
self._check_in_use()

src/dqliteclient/pool.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
148148
except BaseException:
149149
if conn.is_connected and not self._closed:
150150
# Connection is healthy — user code raised a non-connection error.
151-
# Return it to the pool instead of destroying it.
151+
# Roll back any open transaction, then return to pool.
152+
if not await self._reset_connection(conn):
153+
with contextlib.suppress(Exception):
154+
await conn.close()
155+
self._size -= 1
156+
raise
152157
try:
153158
self._pool.put_nowait(conn)
154159
except asyncio.QueueFull:
@@ -165,17 +170,38 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
165170
else:
166171
await self._release(conn)
167172

173+
async def _reset_connection(self, conn: DqliteConnection) -> bool:
174+
"""Roll back any open transaction before returning to pool.
175+
176+
Returns True if the connection is clean and can be reused,
177+
False if it should be destroyed.
178+
"""
179+
if conn._in_transaction is True:
180+
try:
181+
await conn.execute("ROLLBACK")
182+
except Exception:
183+
return False
184+
conn._in_transaction = False
185+
return True
186+
168187
async def _release(self, conn: DqliteConnection) -> None:
169188
"""Return a connection to the pool or close it."""
170189
if self._closed:
171190
await conn.close()
172191
self._size -= 1
173-
else:
174-
try:
175-
self._pool.put_nowait(conn)
176-
except asyncio.QueueFull:
192+
return
193+
194+
if not await self._reset_connection(conn):
195+
with contextlib.suppress(Exception):
177196
await conn.close()
178-
self._size -= 1
197+
self._size -= 1
198+
return
199+
200+
try:
201+
self._pool.put_nowait(conn)
202+
except asyncio.QueueFull:
203+
await conn.close()
204+
self._size -= 1
179205

180206
async def execute(self, sql: str, params: Sequence[Any] | None = None) -> tuple[int, int]:
181207
"""Execute a SQL statement using a pooled connection."""

tests/test_pool.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,78 @@ async def test_pool_has_no_in_use_set(self) -> None:
568568
"Pool._in_use set should have been removed — it was dead code (written but never read)"
569569
)
570570

571+
async def test_release_rolls_back_open_transaction(self) -> None:
572+
"""Returning a connection with _in_transaction=True must issue ROLLBACK."""
573+
pool = ConnectionPool(["localhost:9001"], max_size=1)
574+
575+
mock_conn = MagicMock()
576+
mock_conn.is_connected = True
577+
mock_conn.connect = AsyncMock()
578+
mock_conn.close = AsyncMock()
579+
mock_conn._in_transaction = False
580+
mock_conn._in_use = False
581+
mock_conn._bound_loop = None
582+
mock_conn._check_in_use = MagicMock()
583+
584+
call_log: list[str] = []
585+
586+
async def mock_execute(sql, params=None):
587+
call_log.append(sql)
588+
return (0, 0)
589+
590+
mock_conn.execute = mock_execute
591+
592+
with patch.object(pool._cluster, "connect", return_value=mock_conn):
593+
await pool.initialize()
594+
595+
# Simulate: user enters transaction but the context manager exit
596+
# somehow leaves _in_transaction=True (e.g., a bug or raw BEGIN)
597+
async with pool.acquire() as conn:
598+
conn._in_transaction = True
599+
600+
# The pool should have issued ROLLBACK before returning to queue
601+
assert "ROLLBACK" in call_log, (
602+
f"Pool should issue ROLLBACK for dirty connections, calls: {call_log}"
603+
)
604+
# And the flag should be reset
605+
assert not mock_conn._in_transaction
606+
607+
await pool.close()
608+
609+
async def test_release_destroys_connection_if_rollback_fails(self) -> None:
610+
"""If ROLLBACK fails on dirty release, connection must be destroyed."""
611+
pool = ConnectionPool(["localhost:9001"], max_size=1)
612+
613+
mock_conn = MagicMock()
614+
mock_conn.is_connected = True
615+
mock_conn.connect = AsyncMock()
616+
mock_conn.close = AsyncMock()
617+
mock_conn._in_transaction = False
618+
mock_conn._in_use = False
619+
mock_conn._bound_loop = None
620+
mock_conn._check_in_use = MagicMock()
621+
622+
async def failing_execute(sql, params=None):
623+
if "ROLLBACK" in sql:
624+
raise Exception("connection lost")
625+
return (0, 0)
626+
627+
mock_conn.execute = failing_execute
628+
629+
with patch.object(pool._cluster, "connect", return_value=mock_conn):
630+
await pool.initialize()
631+
632+
initial_size = pool._size
633+
634+
async with pool.acquire() as conn:
635+
conn._in_transaction = True
636+
637+
# ROLLBACK failed, so connection should be destroyed
638+
mock_conn.close.assert_called()
639+
assert pool._size == initial_size - 1
640+
641+
await pool.close()
642+
571643

572644
class TestConnectionPoolIntegration:
573645
"""Integration tests requiring mocked connections."""

0 commit comments

Comments
 (0)