|
4 | 4 | from unittest.mock import AsyncMock |
5 | 5 |
|
6 | 6 | import pytest |
| 7 | +from fastapi import Response |
7 | 8 | from redis.asyncio import Redis |
8 | 9 | from sqlalchemy.ext.asyncio import AsyncSession |
9 | 10 |
|
10 | | -from app.routes.health import db_health, redis_health |
| 11 | +from app.routes.health import ( |
| 12 | + _check_db, |
| 13 | + _check_redis, |
| 14 | + db_health, |
| 15 | + readiness, |
| 16 | + redis_health, |
| 17 | +) |
11 | 18 |
|
12 | 19 |
|
13 | 20 | @pytest.fixture |
@@ -53,3 +60,56 @@ async def test_db_health_executes_ping_query(fake_session: AsyncSession) -> None |
53 | 60 |
|
54 | 61 | assert result == {"db": "ok"} |
55 | 62 | cast(AsyncMock, fake_session.execute).assert_awaited_once() |
| 63 | + |
| 64 | + |
| 65 | +@pytest.mark.asyncio |
| 66 | +async def test_check_db_returns_down_on_exception(fake_session: AsyncSession) -> None: |
| 67 | + """_check_db возвращает db=down при исключении.""" |
| 68 | + cast(AsyncMock, fake_session.execute).side_effect = Exception("connection lost") |
| 69 | + result = await _check_db(fake_session) |
| 70 | + assert result == {"db": "down"} |
| 71 | + |
| 72 | + |
| 73 | +@pytest.mark.asyncio |
| 74 | +async def test_check_redis_returns_down_on_exception(fake_redis: Redis) -> None: |
| 75 | + """_check_redis возвращает redis=down при исключении.""" |
| 76 | + cast(AsyncMock, fake_redis.ping).side_effect = Exception("connection refused") |
| 77 | + result = await _check_redis(fake_redis) |
| 78 | + assert result == {"redis": "down"} |
| 79 | + |
| 80 | + |
| 81 | +@pytest.mark.asyncio |
| 82 | +async def test_readiness_200_when_both_ok( |
| 83 | + fake_redis: Redis, fake_session: AsyncSession |
| 84 | +) -> None: |
| 85 | + """Readiness возвращает 200 и ok для db и redis, когда оба источника доступны.""" |
| 86 | + cast(AsyncMock, fake_redis.ping).return_value = True |
| 87 | + response = Response() |
| 88 | + result = await readiness(redis=fake_redis, session=fake_session, response=response) |
| 89 | + assert result == {"db": "ok", "redis": "ok"} |
| 90 | + assert response.status_code == 200 |
| 91 | + |
| 92 | + |
| 93 | +@pytest.mark.asyncio |
| 94 | +async def test_readiness_503_when_db_down( |
| 95 | + fake_redis: Redis, fake_session: AsyncSession |
| 96 | +) -> None: |
| 97 | + """Readiness возвращает 503, когда БД недоступна.""" |
| 98 | + cast(AsyncMock, fake_redis.ping).return_value = True |
| 99 | + cast(AsyncMock, fake_session.execute).side_effect = Exception("db down") |
| 100 | + response = Response() |
| 101 | + result = await readiness(redis=fake_redis, session=fake_session, response=response) |
| 102 | + assert result == {"db": "down", "redis": "ok"} |
| 103 | + assert response.status_code == 503 |
| 104 | + |
| 105 | + |
| 106 | +@pytest.mark.asyncio |
| 107 | +async def test_readiness_503_when_redis_down( |
| 108 | + fake_redis: Redis, fake_session: AsyncSession |
| 109 | +) -> None: |
| 110 | + """Readiness возвращает 503, когда Redis недоступен.""" |
| 111 | + cast(AsyncMock, fake_redis.ping).side_effect = Exception("redis down") |
| 112 | + response = Response() |
| 113 | + result = await readiness(redis=fake_redis, session=fake_session, response=response) |
| 114 | + assert result == {"db": "ok", "redis": "down"} |
| 115 | + assert response.status_code == 503 |
0 commit comments