Skip to content

Commit 9bf6304

Browse files
Cumulative row cap + atomicity docstring on query paths
ISSUE-05 — DqliteProtocol now enforces a max_total_rows cap (default 10_000_000) across continuation frames. A server that drip-feeds 1 row per frame inside the per-operation deadline could previously cause the client to accumulate arbitrarily many rows; the cap bounds that accumulation in addition to the existing wall-clock deadline. Set max_total_rows=None to disable. ISSUE-03 — document the atomicity post-condition of query_sql_typed: a mid-stream failure raises before any rows are returned; the local row list is discarded; the connection is invalidated by the enclosing _run_protocol path. No code change — the invariant already held; we're making it explicit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 152ca5b commit 9bf6304

1 file changed

Lines changed: 30 additions & 0 deletions

File tree

src/dqliteclient/protocol.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,20 @@ def __init__(
3636
reader: asyncio.StreamReader,
3737
writer: asyncio.StreamWriter,
3838
timeout: float = 15.0,
39+
max_total_rows: int | None = 10_000_000,
3940
) -> None:
4041
self._reader = reader
4142
self._writer = writer
4243
self._decoder = MessageDecoder(is_request=False)
4344
self._client_id = 0
4445
self._heartbeat_timeout = 0
4546
self._timeout = timeout
47+
# Cumulative cap across continuation frames for a single query.
48+
# A hostile or buggy server can drip-feed 1-row-per-frame inside
49+
# the per-operation deadline; without a cumulative cap, clients
50+
# could legitimately allocate hundreds of millions of rows over
51+
# the full deadline. None disables the cap.
52+
self._max_total_rows = max_total_rows
4653

4754
async def handshake(self, client_id: int | None = None) -> int:
4855
"""Perform protocol handshake.
@@ -180,6 +187,11 @@ async def query_sql_typed(
180187
column_types are the wire-level ``ValueType`` integer tags from the
181188
first response frame — what DBAPI cursor.description maps into
182189
``type_code``.
190+
191+
Atomicity: a mid-stream server failure or an unexpected message
192+
type raises before any rows are returned to the caller; the
193+
local row list is discarded. The connection is invalidated so
194+
callers don't accidentally reuse it with torn protocol state.
183195
"""
184196
request = QuerySqlRequest(db_id=db_id, sql=sql, params=params if params is not None else [])
185197
self._writer.write(request.encode())
@@ -197,6 +209,17 @@ async def query_sql_typed(
197209
all_rows = list(response.rows)
198210
while response.has_more:
199211
next_response = await self._read_continuation(deadline=deadline)
212+
if not next_response.rows and next_response.has_more:
213+
raise ProtocolError(
214+
"ROWS continuation made no progress: frame had 0 rows and has_more=True"
215+
)
216+
if self._max_total_rows is not None and (
217+
len(all_rows) + len(next_response.rows) > self._max_total_rows
218+
):
219+
raise ProtocolError(
220+
f"Query exceeded max_total_rows cap ({self._max_total_rows}); "
221+
f"reduce result size or raise the cap on the connection/pool."
222+
)
200223
all_rows.extend(next_response.rows)
201224
response = next_response
202225
return column_names, column_types, all_rows
@@ -245,6 +268,13 @@ async def query_sql(
245268
raise ProtocolError(
246269
"ROWS continuation made no progress: frame had 0 rows and has_more=True"
247270
)
271+
if self._max_total_rows is not None and (
272+
len(all_rows) + len(next_response.rows) > self._max_total_rows
273+
):
274+
raise ProtocolError(
275+
f"Query exceeded max_total_rows cap ({self._max_total_rows}); "
276+
f"reduce result size or raise the cap on the connection/pool."
277+
)
248278
all_rows.extend(next_response.rows)
249279
response = next_response
250280

0 commit comments

Comments
 (0)