|
| 1 | +"""Tests for async adapter cursor behavior.""" |
| 2 | + |
| 3 | +import ast |
| 4 | +import inspect |
| 5 | +import textwrap |
| 6 | +from collections import deque |
| 7 | +from unittest.mock import MagicMock, patch |
| 8 | + |
| 9 | +from sqlalchemydqlite.aio import AsyncAdaptedConnection, AsyncAdaptedCursor |
| 10 | + |
| 11 | + |
| 12 | +def _make_cursor() -> AsyncAdaptedCursor: |
| 13 | + """Create an AsyncAdaptedCursor with a mocked connection.""" |
| 14 | + mock_conn = MagicMock() |
| 15 | + adapted_conn = AsyncAdaptedConnection.__new__(AsyncAdaptedConnection) |
| 16 | + adapted_conn._connection = mock_conn |
| 17 | + cursor = AsyncAdaptedCursor(adapted_conn) |
| 18 | + return cursor |
| 19 | + |
| 20 | + |
| 21 | +def _run_sync(coro_or_value: object) -> object: |
| 22 | + """Replacement for await_only that resolves coroutines synchronously.""" |
| 23 | + if hasattr(coro_or_value, "__await__") or hasattr(coro_or_value, "cr_await"): |
| 24 | + # It's a coroutine -- we can't actually await it in sync context, |
| 25 | + # but our mocks return plain values, so send(None) is enough. |
| 26 | + try: |
| 27 | + coro_or_value.send(None) # type: ignore[union-attr] |
| 28 | + except StopIteration as e: |
| 29 | + return e.value |
| 30 | + return coro_or_value |
| 31 | + |
| 32 | + |
| 33 | +class TestAsyncAdaptedCursorRowsCleared: |
| 34 | + def test_rows_cleared_after_non_query_execute(self) -> None: |
| 35 | + """After a SELECT then an INSERT, fetchone() must return None.""" |
| 36 | + cursor = _make_cursor() |
| 37 | + |
| 38 | + # Simulate that a previous SELECT populated _rows |
| 39 | + cursor._rows = deque([(1, "alice")]) |
| 40 | + |
| 41 | + # Set up a mock inner cursor that returns no description (DML) |
| 42 | + mock_inner = MagicMock() |
| 43 | + mock_inner.description = None |
| 44 | + mock_inner.lastrowid = 1 |
| 45 | + mock_inner.rowcount = 1 |
| 46 | + mock_inner.execute.return_value = None |
| 47 | + mock_inner.close.return_value = None |
| 48 | + cursor._connection.cursor.return_value = mock_inner |
| 49 | + |
| 50 | + with patch("sqlalchemydqlite.aio.await_only", side_effect=_run_sync): |
| 51 | + cursor.execute("INSERT INTO t VALUES (1)") |
| 52 | + |
| 53 | + result = cursor.fetchone() |
| 54 | + assert result is None, f"Expected None after non-query execute, got {result}" |
| 55 | + |
| 56 | + def test_rows_cleared_after_executemany(self) -> None: |
| 57 | + """After executemany(), fetchone() must return None.""" |
| 58 | + cursor = _make_cursor() |
| 59 | + |
| 60 | + # Simulate that a previous SELECT populated _rows |
| 61 | + cursor._rows = deque([(1, "alice"), (2, "bob")]) |
| 62 | + |
| 63 | + mock_inner = MagicMock() |
| 64 | + mock_inner.lastrowid = 3 |
| 65 | + mock_inner.rowcount = 2 |
| 66 | + mock_inner.executemany.return_value = None |
| 67 | + mock_inner.close.return_value = None |
| 68 | + cursor._connection.cursor.return_value = mock_inner |
| 69 | + |
| 70 | + with patch("sqlalchemydqlite.aio.await_only", side_effect=_run_sync): |
| 71 | + cursor.executemany("INSERT INTO t VALUES (?)", [(1,), (2,)]) |
| 72 | + |
| 73 | + result = cursor.fetchone() |
| 74 | + assert result is None, f"Expected None after executemany, got {result}" |
| 75 | + |
| 76 | + |
| 77 | +def _has_finally_with_close(func: object) -> bool: |
| 78 | + """Check if a function has cursor.close() inside a finally block.""" |
| 79 | + source = textwrap.dedent(inspect.getsource(func)) |
| 80 | + tree = ast.parse(source) |
| 81 | + |
| 82 | + for node in ast.walk(tree): |
| 83 | + if isinstance(node, ast.Try) and node.finalbody: |
| 84 | + for stmt in ast.walk(node): |
| 85 | + if isinstance(stmt, ast.Call): |
| 86 | + func_node = stmt.func |
| 87 | + if isinstance(func_node, ast.Attribute) and func_node.attr == "close": |
| 88 | + return True |
| 89 | + return False |
| 90 | + |
| 91 | + |
| 92 | +class TestAsyncAdaptedCursorCleanup: |
| 93 | + def test_cursor_closed_on_execute_error(self) -> None: |
| 94 | + """Underlying cursor must be closed even if execute() raises.""" |
| 95 | + assert _has_finally_with_close(AsyncAdaptedCursor.execute), ( |
| 96 | + "cursor.close() should be in a finally block to prevent leaks on error" |
| 97 | + ) |
| 98 | + |
| 99 | + def test_executemany_cursor_closed_on_error(self) -> None: |
| 100 | + """Underlying cursor must be closed even if executemany() raises.""" |
| 101 | + assert _has_finally_with_close(AsyncAdaptedCursor.executemany), ( |
| 102 | + "cursor.close() should be in a finally block to prevent leaks on error" |
| 103 | + ) |
0 commit comments