Skip to content

Commit 3769fce

Browse files
fix(sqlalchemy): clear AsyncAdaptedCursor state before execute (ISSUE-37)
Reset description/rowcount/lastrowid and the buffered rows deque at the top of execute() and executemany(), before the await_only() that might raise. On a CancelledError (or any other exception) mid-call, the adapter cursor now reliably surfaces "no active result" rather than carrying stale rows from a previous successful execute — preventing silent data-integrity bugs when user code catches the exception and re-issues a query on the same cursor. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e59de8a commit 3769fce

2 files changed

Lines changed: 125 additions & 5 deletions

File tree

src/sqlalchemydqlite/aio.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ def close(self) -> None:
4040
self._rows.clear()
4141

4242
def execute(self, operation: str, parameters: Any = None) -> Any:
43+
# Clear buffered state FIRST so a CancelledError (or any other
44+
# exception) during execute/fetchall leaves the adapter in a
45+
# "no active result" state rather than carrying stale rows
46+
# from a previous execution.
47+
self.description = None
48+
self.rowcount = -1
49+
self.lastrowid = None
50+
self._rows.clear()
51+
4352
cursor = self._connection.cursor()
4453
try:
4554
if parameters is not None:
@@ -49,24 +58,26 @@ def execute(self, operation: str, parameters: Any = None) -> Any:
4958

5059
if cursor.description:
5160
self.description = cursor.description
52-
self.lastrowid = self.rowcount = -1
5361
self._rows = deque(await_only(cursor.fetchall()))
5462
else:
55-
self.description = None
5663
self.lastrowid = cursor.lastrowid
5764
self.rowcount = cursor.rowcount
58-
self._rows.clear()
5965
finally:
6066
await_only(cursor.close())
6167

6268
def executemany(self, operation: str, seq_of_parameters: Any) -> Any:
69+
# Clear state up-front so cancellation mid-call doesn't leak
70+
# a previous execution's buffered rows.
71+
self.description = None
72+
self.rowcount = -1
73+
self.lastrowid = None
74+
self._rows.clear()
75+
6376
cursor = self._connection.cursor()
6477
try:
6578
await_only(cursor.executemany(operation, seq_of_parameters))
66-
self.description = None
6779
self.lastrowid = cursor.lastrowid
6880
self.rowcount = cursor.rowcount
69-
self._rows.clear()
7081
finally:
7182
await_only(cursor.close())
7283

tests/test_adapter_stale_rows.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""ISSUE-37: AsyncAdaptedCursor must not leak rows from a previous
2+
execute across an exception (including CancelledError)."""
3+
4+
import asyncio
5+
from collections import deque
6+
from types import SimpleNamespace
7+
from typing import Any
8+
from unittest.mock import MagicMock
9+
10+
import pytest
11+
12+
13+
class _FakeCursor:
14+
def __init__(self) -> None:
15+
self.description: Any = None
16+
self.rowcount = -1
17+
self.lastrowid: int | None = None
18+
19+
async def execute(self, *a: Any, **kw: Any) -> None:
20+
pass
21+
22+
async def executemany(self, *a: Any, **kw: Any) -> None:
23+
pass
24+
25+
async def fetchall(self) -> list[Any]:
26+
return []
27+
28+
async def close(self) -> None:
29+
pass
30+
31+
32+
class _FakeConnection:
33+
def __init__(self, cursor: _FakeCursor) -> None:
34+
self._cursor = cursor
35+
36+
def cursor(self) -> _FakeCursor:
37+
return self._cursor
38+
39+
40+
def _sync_await(coro: Any) -> Any:
41+
"""Run a coroutine synchronously; stand-in for ``await_only`` in
42+
tests so we don't need a greenlet context."""
43+
try:
44+
return asyncio.new_event_loop().run_until_complete(coro)
45+
except Exception:
46+
raise
47+
48+
49+
def test_stale_rows_cleared_when_execute_raises(monkeypatch: pytest.MonkeyPatch) -> None:
50+
from sqlalchemydqlite import aio as aio_mod
51+
from sqlalchemydqlite.aio import AsyncAdaptedCursor
52+
53+
monkeypatch.setattr(aio_mod, "await_only", _sync_await)
54+
55+
cursor = _FakeCursor()
56+
57+
async def boom(*a: Any, **kw: Any) -> None:
58+
raise RuntimeError("bang")
59+
60+
cursor.execute = boom # type: ignore[method-assign]
61+
conn_adapter = MagicMock()
62+
conn_adapter._connection = _FakeConnection(cursor)
63+
64+
adapter = AsyncAdaptedCursor(conn_adapter)
65+
# Seed stale state simulating a prior successful execute.
66+
adapter.description = [("a", None, None, None, None, None, None)]
67+
adapter._rows = deque([(1,), (2,)])
68+
adapter.rowcount = 2
69+
adapter.lastrowid = 10
70+
71+
with pytest.raises(RuntimeError):
72+
adapter.execute("SELECT 1")
73+
74+
assert adapter.description is None
75+
assert adapter.rowcount == -1
76+
assert adapter.lastrowid is None
77+
assert list(adapter._rows) == []
78+
assert adapter.fetchone() is None # Not the stale (1,).
79+
80+
81+
def test_stale_rows_cleared_when_executemany_raises(
82+
monkeypatch: pytest.MonkeyPatch,
83+
) -> None:
84+
from sqlalchemydqlite import aio as aio_mod
85+
from sqlalchemydqlite.aio import AsyncAdaptedCursor
86+
87+
monkeypatch.setattr(aio_mod, "await_only", _sync_await)
88+
89+
cursor = _FakeCursor()
90+
91+
async def boom(*a: Any, **kw: Any) -> None:
92+
raise RuntimeError("bang")
93+
94+
cursor.executemany = boom # type: ignore[method-assign]
95+
conn_adapter = SimpleNamespace(_connection=_FakeConnection(cursor))
96+
97+
adapter = AsyncAdaptedCursor(conn_adapter) # type: ignore[arg-type]
98+
adapter.description = [("a", None, None, None, None, None, None)]
99+
adapter._rows = deque([(1,)])
100+
adapter.rowcount = 3
101+
adapter.lastrowid = 99
102+
103+
with pytest.raises(RuntimeError):
104+
adapter.executemany("INSERT INTO t VALUES (?)", [[1], [2]])
105+
106+
assert adapter.description is None
107+
assert adapter.rowcount == -1
108+
assert adapter.lastrowid is None
109+
assert list(adapter._rows) == []

0 commit comments

Comments
 (0)