Skip to content

Commit 863732b

Browse files
fix(dbapi): accumulate rows across executemany iterations (ISSUE-57)
For DML statements with a RETURNING clause (or any result-producing iteration), executemany previously overwrote _rows / _description on every param-set iteration — so fetchall() after executemany only returned rows from the LAST parameter set, silently dropping the rest. This broke SQLAlchemy's insert_executemany_returning optimization for bulk inserts. Accumulate rows and preserve description across iterations. Add an integration test that verifies executemany+RETURNING returns every row from every parameter set in insert order, for both sync and async cursors. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9424361 commit 863732b

File tree

3 files changed

+75
-0
lines changed

3 files changed

+75
-0
lines changed

src/dqlitedbapi/aio/cursor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,32 @@ async def executemany(
140140
An empty ``seq_of_parameters`` must not leave stale SELECT
141141
state around: reset description / rows so callers can't
142142
confuse an empty executemany with a preceding SELECT.
143+
144+
For statements with a RETURNING clause, rows produced by each
145+
iteration are accumulated into ``_rows`` so a subsequent
146+
``fetchall`` yields every returned row across parameter sets.
143147
"""
144148
self._check_closed()
145149

146150
self._description = None
147151
self._rows = []
148152
self._row_index = 0
149153
total_affected = 0
154+
accumulated_rows: list[tuple[Any, ...]] = []
155+
accumulated_desc: list[tuple[str, int | None, None, None, None, None, None]] | None = None
150156
for params in seq_of_parameters:
151157
await self.execute(operation, params)
152158
if self._rowcount >= 0:
153159
total_affected += self._rowcount
160+
if self._description is not None:
161+
if accumulated_desc is None:
162+
accumulated_desc = self._description
163+
accumulated_rows.extend(self._rows)
154164
self._rowcount = total_affected
165+
if accumulated_desc is not None:
166+
self._description = accumulated_desc
167+
self._rows = accumulated_rows
168+
self._row_index = 0
155169
return self
156170

157171
def _check_result_set(self) -> None:

src/dqlitedbapi/cursor.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,16 +265,33 @@ async def _executemany_async(
265265
state around: reset description / rows to None / empty so
266266
callers can't confuse an empty executemany with a preceding
267267
SELECT.
268+
269+
For statements with a RETURNING clause (or any result-producing
270+
DML), each iteration's rows are accumulated so ``fetchall`` at
271+
the end yields every returned row across all parameter sets.
272+
Without the accumulation, ``_execute_async`` would overwrite
273+
``_rows`` on each iteration and only the rows from the last
274+
parameter set would survive.
268275
"""
269276
self._description = None
270277
self._rows = []
271278
self._row_index = 0
272279
total_affected = 0
280+
accumulated_rows: list[tuple[Any, ...]] = []
281+
accumulated_desc: list[tuple[str, int | None, None, None, None, None, None]] | None = None
273282
for params in seq_of_parameters:
274283
await self._execute_async(operation, params)
275284
if self._rowcount >= 0:
276285
total_affected += self._rowcount
286+
if self._description is not None:
287+
if accumulated_desc is None:
288+
accumulated_desc = self._description
289+
accumulated_rows.extend(self._rows)
277290
self._rowcount = total_affected
291+
if accumulated_desc is not None:
292+
self._description = accumulated_desc
293+
self._rows = accumulated_rows
294+
self._row_index = 0
278295

279296
def _check_result_set(self) -> None:
280297
if self._description is None:
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""Integration test for ISSUE-57: executemany must accumulate rows from
2+
every parameter set when the statement has a RETURNING clause."""
3+
4+
import pytest
5+
6+
import dqlitedbapi
7+
from dqlitedbapi.aio import aconnect
8+
9+
10+
@pytest.mark.integration
11+
def test_sync_executemany_returning_accumulates_rows(cluster_address: str) -> None:
12+
with dqlitedbapi.connect(cluster_address) as conn:
13+
cur = conn.cursor()
14+
cur.execute("DROP TABLE IF EXISTS emany_ret")
15+
cur.execute("CREATE TABLE emany_ret (id INTEGER PRIMARY KEY, x TEXT)")
16+
params = [("a",), ("b",), ("c",)]
17+
cur.executemany("INSERT INTO emany_ret (x) VALUES (?) RETURNING id", params)
18+
assert cur.rowcount == 3
19+
rows = cur.fetchall()
20+
assert len(rows) == 3
21+
# ids must be 1, 2, 3 in insert order.
22+
assert [r[0] for r in rows] == [1, 2, 3]
23+
conn.rollback()
24+
25+
26+
@pytest.mark.integration
27+
@pytest.mark.asyncio
28+
async def test_async_executemany_returning_accumulates_rows(
29+
cluster_address: str,
30+
) -> None:
31+
conn = await aconnect(cluster_address)
32+
try:
33+
cur = conn.cursor()
34+
await cur.execute("DROP TABLE IF EXISTS emany_ret_async")
35+
await cur.execute("CREATE TABLE emany_ret_async (id INTEGER PRIMARY KEY, x TEXT)")
36+
params = [("alpha",), ("beta",), ("gamma",), ("delta",)]
37+
await cur.executemany("INSERT INTO emany_ret_async (x) VALUES (?) RETURNING id, x", params)
38+
assert cur.rowcount == 4
39+
rows = await cur.fetchall()
40+
assert len(rows) == 4
41+
assert [r[1] for r in rows] == ["alpha", "beta", "gamma", "delta"]
42+
await conn.rollback()
43+
finally:
44+
await conn.close()

0 commit comments

Comments
 (0)