Skip to content

Commit 9364008

Browse files
feat: add query_raw_typed to expose per-column wire types
query_raw() returned (names, rows) only. DBAPI cursors using it had no way to populate cursor.description[i][1] (type_code) without re-reading the RowsResponse frame themselves. Add query_sql_typed on DqliteProtocol and query_raw_typed on DqliteConnection returning (column_names, column_types, rows). Keep the existing query_raw unchanged for backward compat. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 40909eb commit 9364008

3 files changed

Lines changed: 66 additions & 0 deletions

File tree

src/dqliteclient/connection.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,17 @@ async def query_raw(
250250
"""
251251
return await self._run_protocol(lambda p, db: p.query_sql(db, sql, params))
252252

253+
async def query_raw_typed(
254+
self, sql: str, params: Sequence[Any] | None = None
255+
) -> tuple[list[str], list[int], list[list[Any]]]:
256+
"""Execute a query and return (column_names, column_types, rows).
257+
258+
``column_types`` are per-column wire ``ValueType`` integer tags
259+
from the first response frame — suitable for populating DBAPI
260+
``cursor.description[i][1]`` (``type_code``).
261+
"""
262+
return await self._run_protocol(lambda p, db: p.query_sql_typed(db, sql, params))
263+
253264
async def fetch(self, sql: str, params: Sequence[Any] | None = None) -> list[dict[str, Any]]:
254265
"""Execute a query and return results as list of dicts."""
255266
columns, rows = await self._run_protocol(lambda p, db: p.query_sql(db, sql, params))

src/dqliteclient/protocol.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,35 @@ async def exec_sql(
171171

172172
return response.last_insert_id, response.rows_affected
173173

174+
async def query_sql_typed(
175+
self, db_id: int, sql: str, params: Sequence[Any] | None = None
176+
) -> tuple[list[str], list[int], list[list[Any]]]:
177+
"""Execute a query and return (column_names, column_types, rows).
178+
179+
column_types are the wire-level ``ValueType`` integer tags from the
180+
first response frame — what DBAPI cursor.description maps into
181+
``type_code``.
182+
"""
183+
request = QuerySqlRequest(db_id=db_id, sql=sql, params=params if params is not None else [])
184+
self._writer.write(request.encode())
185+
await self._send()
186+
187+
deadline = self._operation_deadline()
188+
response = await self._read_response(deadline=deadline)
189+
if isinstance(response, FailureResponse):
190+
raise OperationalError(response.code, response.message)
191+
if not isinstance(response, RowsResponse):
192+
raise ProtocolError(f"Expected RowsResponse, got {type(response).__name__}")
193+
194+
column_names = list(response.column_names)
195+
column_types = [int(t) for t in response.column_types]
196+
all_rows = list(response.rows)
197+
while response.has_more:
198+
next_response = await self._read_continuation(deadline=deadline)
199+
all_rows.extend(next_response.rows)
200+
response = next_response
201+
return column_names, column_types, all_rows
202+
174203
async def query_sql(
175204
self, db_id: int, sql: str, params: Sequence[Any] | None = None
176205
) -> tuple[list[str], list[list[Any]]]:
@@ -179,6 +208,8 @@ async def query_sql(
179208
Returns (column_names, rows). Multi-statement SELECT is rejected
180209
by the server with OperationalError(SQLITE_ERROR, "nonempty
181210
statement tail") — there are no additional result sets to drain.
211+
Use :meth:`query_sql_typed` to also get per-column ``ValueType``
212+
tags.
182213
"""
183214
request = QuerySqlRequest(db_id=db_id, sql=sql, params=params if params is not None else [])
184215
self._writer.write(request.encode())

tests/test_protocol.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,30 @@ async def test_query_sql_raises_if_continuation_has_no_progress(
325325
with pytest.raises(ProtocolError, match="no progress|no rows"):
326326
await protocol.query_sql(1, "SELECT x FROM wide_table")
327327

328+
async def test_query_sql_typed_returns_column_types(
329+
self,
330+
protocol: DqliteProtocol,
331+
mock_reader: AsyncMock,
332+
) -> None:
333+
"""query_sql_typed returns the wire ValueType ints alongside names+rows,
334+
so DBAPI cursors can populate cursor.description[i][1] (type_code).
335+
"""
336+
from dqlitewire.constants import ValueType
337+
from dqlitewire.messages import RowsResponse
338+
339+
response = RowsResponse(
340+
column_names=["a", "b"],
341+
column_types=[ValueType.INTEGER, ValueType.TEXT],
342+
rows=[[1, "x"]],
343+
has_more=False,
344+
)
345+
mock_reader.read.return_value = response.encode()
346+
347+
names, types, rows = await protocol.query_sql_typed(1, "SELECT a, b FROM t")
348+
assert names == ["a", "b"]
349+
assert types == [int(ValueType.INTEGER), int(ValueType.TEXT)]
350+
assert rows == [[1, "x"]]
351+
328352
async def test_query_sql(
329353
self,
330354
protocol: DqliteProtocol,

0 commit comments

Comments
 (0)