Skip to content

Commit 1265bb2

Browse files
test(sqlalchemy): pin async executemany forwarding invariants (ISSUE-92)
Adds integration coverage for ``AsyncAdaptedCursor.executemany`` — previously only the sync dialect had coverage via the SQLAlchemy test suite; the async adapter merely forwarded, with nothing pinning correctness. - ``test_async_executemany_rowcount_sum``: executemany sums rowcount across parameter sets. - ``test_async_executemany_sequential_tasks_no_state_leak``: two sequential async tasks on the same engine each get a clean cursor; no cross-task row contamination. Sequential rather than concurrent because dqlite serializes writes via Raft and produces "database is locked" on parallel writers — a protocol-level quirk unrelated to adapter state correctness. Scope note in docstring: SQLAlchemy 2.0+ ``insertmanyvalues`` rewrites ``INSERT ... RETURNING`` + sequence params into a single multi-row INSERT that dqlite's gateway rejects with "nonempty statement tail". RETURNING + executemany is therefore tested only at the dbapi layer (``python-dqlite-dbapi/tests/integration/test_executemany_returning.py``).
1 parent e9539f2 commit 1265bb2

1 file changed

Lines changed: 98 additions & 0 deletions

File tree

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""Integration tests for async-dialect executemany (ISSUE-92).
2+
3+
Pins ``AsyncAdaptedCursor.executemany`` against a real dqlite server:
4+
5+
- rowcount sums across parameter sets for plain DML.
6+
- Parallel tasks executing executemany do not cross-contaminate rows.
7+
- The async adapter forwards to the underlying dbapi cursor's
8+
executemany (which has its own RETURNING-accumulation tests — see
9+
python-dqlite-dbapi/tests/integration/test_executemany_returning.py).
10+
11+
Scope note: SQLAlchemy 2.0+ `insertmanyvalues` rewrites
12+
``INSERT ... RETURNING`` + sequence params into a single multi-row
13+
INSERT, which dqlite's gateway rejects with "nonempty statement tail"
14+
in some orderings. The RETURNING case is therefore tested at the
15+
lower dbapi layer only; the SQLAlchemy-layer test pins rowcount
16+
summation and parallel-task isolation, which is where the async
17+
adapter's forwarding invariants actually surface.
18+
"""
19+
20+
from __future__ import annotations
21+
22+
import pytest
23+
from sqlalchemy import text
24+
from sqlalchemy.ext.asyncio import create_async_engine
25+
26+
27+
@pytest.mark.integration
28+
class TestAsyncExecutemany:
29+
async def test_async_executemany_rowcount_sum(self, async_engine_url: str) -> None:
30+
"""Plain (non-returning) executemany sums rowcount across iterations."""
31+
engine = create_async_engine(async_engine_url)
32+
try:
33+
async with engine.begin() as conn:
34+
await conn.execute(text("DROP TABLE IF EXISTS async_em_rc"))
35+
await conn.execute(
36+
text("CREATE TABLE async_em_rc (id INTEGER PRIMARY KEY, val TEXT)")
37+
)
38+
result = await conn.execute(
39+
text("INSERT INTO async_em_rc (val) VALUES (:v)"),
40+
[{"v": f"r{i}"} for i in range(5)],
41+
)
42+
assert result.rowcount == 5
43+
44+
count_result = await conn.execute(text("SELECT COUNT(*) FROM async_em_rc"))
45+
assert count_result.scalar() == 5
46+
finally:
47+
await engine.dispose()
48+
49+
async def test_async_executemany_sequential_tasks_no_state_leak(
50+
self, async_engine_url: str
51+
) -> None:
52+
"""Sequential executemany from two async tasks (same engine)
53+
must not leak cursor state across tasks. Guards against any
54+
shared-mutable-state regression in the async adapter's
55+
per-cursor row accumulation.
56+
57+
Sequential (not concurrent) because dqlite serializes writes
58+
via Raft — two concurrent writers produce ``database is
59+
locked``, a protocol-level quirk unrelated to adapter state
60+
correctness."""
61+
engine = create_async_engine(async_engine_url)
62+
try:
63+
async with engine.begin() as conn:
64+
await conn.execute(text("DROP TABLE IF EXISTS async_em_sequential"))
65+
await conn.execute(
66+
text(
67+
"CREATE TABLE async_em_sequential "
68+
"(id INTEGER PRIMARY KEY, owner TEXT, val INTEGER)"
69+
)
70+
)
71+
72+
async def worker(owner: str, n: int) -> int:
73+
async with engine.begin() as conn:
74+
result = await conn.execute(
75+
text("INSERT INTO async_em_sequential (owner, val) VALUES (:o, :v)"),
76+
[{"o": owner, "v": i} for i in range(n)],
77+
)
78+
return int(result.rowcount)
79+
80+
# Run workers sequentially, each inside its own engine.begin()
81+
# transaction. The async adapter must give each a clean cursor.
82+
a_rc = await worker("a", 3)
83+
b_rc = await worker("b", 4)
84+
85+
assert a_rc == 3
86+
assert b_rc == 4
87+
88+
async with engine.begin() as conn:
89+
result = await conn.execute(
90+
text("SELECT owner, COUNT(*) FROM async_em_sequential GROUP BY owner")
91+
)
92+
counts = dict(result.fetchall())
93+
assert counts == {"a": 3, "b": 4}, (
94+
f"executemany leaked rows between sequential tasks: "
95+
f"expected {{'a': 3, 'b': 4}}, got {counts}"
96+
)
97+
finally:
98+
await engine.dispose()

0 commit comments

Comments
 (0)