@@ -231,3 +231,158 @@ async def test_connection_invalidated_after_protocol_error(self) -> None:
231231
232232 # Connection should be invalidated
233233 assert not conn .is_connected
234+
235+ async def test_fetchone_returns_first_row (self ) -> None :
236+ conn = DqliteConnection ("localhost:9001" )
237+
238+ mock_reader = AsyncMock ()
239+ mock_writer = MagicMock ()
240+ mock_writer .drain = AsyncMock ()
241+ mock_writer .close = MagicMock ()
242+ mock_writer .wait_closed = AsyncMock ()
243+
244+ from dqlitewire .constants import ValueType
245+ from dqlitewire .messages import DbResponse , RowsResponse , WelcomeResponse
246+
247+ responses = [
248+ WelcomeResponse (heartbeat_timeout = 15000 ).encode (),
249+ DbResponse (db_id = 1 ).encode (),
250+ RowsResponse (
251+ column_names = ["id" , "name" ],
252+ column_types = [ValueType .INTEGER , ValueType .TEXT ],
253+ rows = [[1 , "first" ], [2 , "second" ]],
254+ has_more = False ,
255+ ).encode (),
256+ ]
257+ mock_reader .read .side_effect = responses
258+
259+ with patch ("asyncio.open_connection" , return_value = (mock_reader , mock_writer )):
260+ await conn .connect ()
261+
262+ mock_reader .read .side_effect = [responses [2 ]]
263+ result = await conn .fetchone ("SELECT * FROM t" )
264+ assert result == {"id" : 1 , "name" : "first" }
265+
266+ async def test_fetchone_returns_none_for_empty (self ) -> None :
267+ conn = DqliteConnection ("localhost:9001" )
268+
269+ mock_reader = AsyncMock ()
270+ mock_writer = MagicMock ()
271+ mock_writer .drain = AsyncMock ()
272+ mock_writer .close = MagicMock ()
273+ mock_writer .wait_closed = AsyncMock ()
274+
275+ from dqlitewire .constants import ValueType
276+ from dqlitewire .messages import DbResponse , RowsResponse , WelcomeResponse
277+
278+ responses = [
279+ WelcomeResponse (heartbeat_timeout = 15000 ).encode (),
280+ DbResponse (db_id = 1 ).encode (),
281+ ]
282+ mock_reader .read .side_effect = responses
283+
284+ with patch ("asyncio.open_connection" , return_value = (mock_reader , mock_writer )):
285+ await conn .connect ()
286+
287+ empty_response = RowsResponse (
288+ column_names = ["id" ],
289+ column_types = [ValueType .INTEGER ],
290+ rows = [],
291+ has_more = False ,
292+ ).encode ()
293+ mock_reader .read .side_effect = [empty_response ]
294+ result = await conn .fetchone ("SELECT * FROM t WHERE 1=0" )
295+ assert result is None
296+
297+ async def test_fetchall_returns_lists (self ) -> None :
298+ conn = DqliteConnection ("localhost:9001" )
299+
300+ mock_reader = AsyncMock ()
301+ mock_writer = MagicMock ()
302+ mock_writer .drain = AsyncMock ()
303+ mock_writer .close = MagicMock ()
304+ mock_writer .wait_closed = AsyncMock ()
305+
306+ from dqlitewire .constants import ValueType
307+ from dqlitewire .messages import DbResponse , RowsResponse , WelcomeResponse
308+
309+ responses = [
310+ WelcomeResponse (heartbeat_timeout = 15000 ).encode (),
311+ DbResponse (db_id = 1 ).encode (),
312+ ]
313+ mock_reader .read .side_effect = responses
314+
315+ with patch ("asyncio.open_connection" , return_value = (mock_reader , mock_writer )):
316+ await conn .connect ()
317+
318+ rows_response = RowsResponse (
319+ column_names = ["id" , "name" ],
320+ column_types = [ValueType .INTEGER , ValueType .TEXT ],
321+ rows = [[1 , "a" ], [2 , "b" ]],
322+ has_more = False ,
323+ ).encode ()
324+ mock_reader .read .side_effect = [rows_response ]
325+ result = await conn .fetchall ("SELECT * FROM t" )
326+ assert result == [[1 , "a" ], [2 , "b" ]]
327+
328+ async def test_fetchval_returns_first_column (self ) -> None :
329+ conn = DqliteConnection ("localhost:9001" )
330+
331+ mock_reader = AsyncMock ()
332+ mock_writer = MagicMock ()
333+ mock_writer .drain = AsyncMock ()
334+ mock_writer .close = MagicMock ()
335+ mock_writer .wait_closed = AsyncMock ()
336+
337+ from dqlitewire .constants import ValueType
338+ from dqlitewire .messages import DbResponse , RowsResponse , WelcomeResponse
339+
340+ responses = [
341+ WelcomeResponse (heartbeat_timeout = 15000 ).encode (),
342+ DbResponse (db_id = 1 ).encode (),
343+ ]
344+ mock_reader .read .side_effect = responses
345+
346+ with patch ("asyncio.open_connection" , return_value = (mock_reader , mock_writer )):
347+ await conn .connect ()
348+
349+ rows_response = RowsResponse (
350+ column_names = ["count" ],
351+ column_types = [ValueType .INTEGER ],
352+ rows = [[42 ]],
353+ has_more = False ,
354+ ).encode ()
355+ mock_reader .read .side_effect = [rows_response ]
356+ result = await conn .fetchval ("SELECT count(*) FROM t" )
357+ assert result == 42
358+
359+ async def test_fetchval_returns_none_for_empty (self ) -> None :
360+ conn = DqliteConnection ("localhost:9001" )
361+
362+ mock_reader = AsyncMock ()
363+ mock_writer = MagicMock ()
364+ mock_writer .drain = AsyncMock ()
365+ mock_writer .close = MagicMock ()
366+ mock_writer .wait_closed = AsyncMock ()
367+
368+ from dqlitewire .constants import ValueType
369+ from dqlitewire .messages import DbResponse , RowsResponse , WelcomeResponse
370+
371+ responses = [
372+ WelcomeResponse (heartbeat_timeout = 15000 ).encode (),
373+ DbResponse (db_id = 1 ).encode (),
374+ ]
375+ mock_reader .read .side_effect = responses
376+
377+ with patch ("asyncio.open_connection" , return_value = (mock_reader , mock_writer )):
378+ await conn .connect ()
379+
380+ empty_response = RowsResponse (
381+ column_names = ["id" ],
382+ column_types = [ValueType .INTEGER ],
383+ rows = [],
384+ has_more = False ,
385+ ).encode ()
386+ mock_reader .read .side_effect = [empty_response ]
387+ result = await conn .fetchval ("SELECT id FROM t WHERE 1=0" )
388+ assert result is None
0 commit comments