Skip to content

Commit ab15a82

Browse files
Extract _send_query / _drain_continuations helpers
ISSUE (cycle 6) — query_sql and query_sql_typed shared near-identical bodies (send request, read response, validate type, drain continuations with progress + row-cap checks). Extracting the shared logic into _send_query and _drain_continuations removes the drift risk without changing behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 36fff71 commit ab15a82

1 file changed

Lines changed: 41 additions & 56 deletions

File tree

src/dqliteclient/protocol.py

Lines changed: 41 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -179,19 +179,13 @@ async def exec_sql(
179179

180180
return response.last_insert_id, response.rows_affected
181181

182-
async def query_sql_typed(
183-
self, db_id: int, sql: str, params: Sequence[Any] | None = None
184-
) -> tuple[list[str], list[int], list[list[Any]]]:
185-
"""Execute a query and return (column_names, column_types, rows).
186-
187-
column_types are the wire-level ``ValueType`` integer tags from the
188-
first response frame — what DBAPI cursor.description maps into
189-
``type_code``.
182+
async def _send_query(
183+
self, db_id: int, sql: str, params: Sequence[Any] | None
184+
) -> tuple["RowsResponse", float]:
185+
"""Send a QUERY_SQL request and return the first RowsResponse + deadline.
190186
191-
Atomicity: a mid-stream server failure or an unexpected message
192-
type raises before any rows are returned to the caller; the
193-
local row list is discarded. The connection is invalidated so
194-
callers don't accidentally reuse it with torn protocol state.
187+
Raises OperationalError for server FailureResponse and ProtocolError
188+
for any other unexpected message type.
195189
"""
196190
request = QuerySqlRequest(db_id=db_id, sql=sql, params=params if params is not None else [])
197191
self._writer.write(request.encode())
@@ -203,10 +197,20 @@ async def query_sql_typed(
203197
raise OperationalError(response.code, response.message)
204198
if not isinstance(response, RowsResponse):
205199
raise ProtocolError(f"Expected RowsResponse, got {type(response).__name__}")
200+
return response, deadline
206201

207-
column_names = list(response.column_names)
208-
column_types = [int(t) for t in response.column_types]
209-
all_rows = list(response.rows)
202+
async def _drain_continuations(
203+
self, initial: "RowsResponse", deadline: float
204+
) -> list[list[Any]]:
205+
"""Drain all continuation frames, enforcing the progress + total-row caps.
206+
207+
Returns the full list of rows (initial frame first). A
208+
continuation claiming more rows with zero delivered, or a
209+
cumulative row count exceeding ``max_total_rows``, raises
210+
ProtocolError.
211+
"""
212+
all_rows = list(initial.rows)
213+
response = initial
210214
while response.has_more:
211215
next_response = await self._read_continuation(deadline=deadline)
212216
if not next_response.rows and next_response.has_more:
@@ -222,6 +226,26 @@ async def query_sql_typed(
222226
)
223227
all_rows.extend(next_response.rows)
224228
response = next_response
229+
return all_rows
230+
231+
async def query_sql_typed(
232+
self, db_id: int, sql: str, params: Sequence[Any] | None = None
233+
) -> tuple[list[str], list[int], list[list[Any]]]:
234+
"""Execute a query and return (column_names, column_types, rows).
235+
236+
column_types are the wire-level ``ValueType`` integer tags from the
237+
first response frame — what DBAPI cursor.description maps into
238+
``type_code``.
239+
240+
Atomicity: a mid-stream server failure or an unexpected message
241+
type raises before any rows are returned to the caller; the
242+
local row list is discarded. The connection is invalidated so
243+
callers don't accidentally reuse it with torn protocol state.
244+
"""
245+
response, deadline = await self._send_query(db_id, sql, params)
246+
column_names = list(response.column_names)
247+
column_types = [int(t) for t in response.column_types]
248+
all_rows = await self._drain_continuations(response, deadline)
225249
return column_names, column_types, all_rows
226250

227251
async def query_sql(
@@ -235,48 +259,9 @@ async def query_sql(
235259
Use :meth:`query_sql_typed` to also get per-column ``ValueType``
236260
tags.
237261
"""
238-
request = QuerySqlRequest(db_id=db_id, sql=sql, params=params if params is not None else [])
239-
self._writer.write(request.encode())
240-
await self._send()
241-
242-
# Single deadline spans the initial response plus every continuation
243-
# frame; otherwise a server that split a reply into N frames could
244-
# legitimately take N * self._timeout to complete.
245-
deadline = self._operation_deadline()
246-
response = await self._read_response(deadline=deadline)
247-
248-
if isinstance(response, FailureResponse):
249-
raise OperationalError(response.code, response.message)
250-
251-
if not isinstance(response, RowsResponse):
252-
raise ProtocolError(f"Expected RowsResponse, got {type(response).__name__}")
253-
262+
response, deadline = await self._send_query(db_id, sql, params)
254263
column_names = response.column_names
255-
256-
# Handle multi-part responses via decode_continuation(),
257-
# which decodes each continuation frame using the same layout
258-
# as the initial frame (column_count + column_names + rows +
259-
# marker), matching the C dqlite server's wire format.
260-
all_rows = list(response.rows)
261-
while response.has_more:
262-
next_response = await self._read_continuation(deadline=deadline)
263-
if not next_response.rows and next_response.has_more:
264-
# Server claimed "more coming" but delivered zero rows in a
265-
# continuation frame. That would spin forever (known
266-
# pathological case: column header larger than the server's
267-
# page buffer). Bail out instead of livelocking.
268-
raise ProtocolError(
269-
"ROWS continuation made no progress: frame had 0 rows and has_more=True"
270-
)
271-
if self._max_total_rows is not None and (
272-
len(all_rows) + len(next_response.rows) > self._max_total_rows
273-
):
274-
raise ProtocolError(
275-
f"Query exceeded max_total_rows cap ({self._max_total_rows}); "
276-
f"reduce result size or raise the cap on the connection/pool."
277-
)
278-
all_rows.extend(next_response.rows)
279-
response = next_response
264+
all_rows = await self._drain_continuations(response, deadline)
280265

281266
return column_names, all_rows
282267

0 commit comments

Comments (0)