media-rip/backend/tests/test_health.py
xpltd efc2ead796 M001: media.rip() v1.0 — complete application
Full-featured self-hosted yt-dlp web frontend:
- Python 3.12+ / FastAPI backend with async SQLite, SSE transport, session isolation
- Vue 3 / TypeScript / Pinia frontend with real-time progress, theme picker
- 3 built-in themes (cyberpunk/dark/light) + drop-in custom theme system
- Admin auth (bcrypt), purge system, cookie upload, file serving
- Docker multi-stage build, GitHub Actions CI/CD
- 179 backend tests, 29 frontend tests (208 total)

Slices: S01 (Foundation), S02 (SSE+Sessions), S03 (Frontend),
        S04 (Admin+Auth), S05 (Themes), S06 (Docker+CI)
2026-03-18 20:00:17 -05:00

294 lines
10 KiB
Python

"""Tests for health endpoint, public config endpoint, and session-mode query layer.
Covers:
- GET /api/health — structure, types, queue_depth accuracy
- GET /api/config/public — safe fields present, sensitive fields excluded
- get_jobs_by_mode() — isolated/shared/open dispatching
- get_queue_depth() — counts only non-terminal jobs
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from app.core.database import (
create_job,
get_all_jobs,
get_jobs_by_mode,
get_queue_depth,
)
from app.models.job import Job, JobStatus
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_job(
session_id: str,
status: str = "queued",
url: str = "https://example.com/video",
) -> Job:
"""Create a Job model with a random ID and given session/status."""
return Job(
id=str(uuid.uuid4()),
session_id=session_id,
url=url,
status=status,
created_at=datetime.now(timezone.utc).isoformat(),
)
# ===========================================================================
# Health endpoint tests
# ===========================================================================
class TestHealthEndpoint:
"""GET /api/health returns correct structure and values."""
@pytest.mark.anyio
async def test_health_returns_correct_structure(self, client):
resp = await client.get("/api/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert isinstance(data["version"], str) and len(data["version"]) > 0
assert isinstance(data["yt_dlp_version"], str) and len(data["yt_dlp_version"]) > 0
assert isinstance(data["uptime"], (int, float)) and data["uptime"] >= 0
assert isinstance(data["queue_depth"], int) and data["queue_depth"] >= 0
@pytest.mark.anyio
async def test_health_version_is_semver(self, client):
resp = await client.get("/api/health")
version = resp.json()["version"]
parts = version.split(".")
assert len(parts) == 3, f"Expected semver, got {version}"
@pytest.mark.anyio
async def test_health_queue_depth_reflects_active_jobs(self, client):
"""queue_depth counts queued + downloading + extracting, not terminal."""
# Get the db from the test app via a back-door: make requests that
# create jobs, then check health.
# Create 2 queued jobs by posting downloads
resp1 = await client.post("/api/downloads", json={"url": "https://example.com/a"})
resp2 = await client.post("/api/downloads", json={"url": "https://example.com/b"})
assert resp1.status_code == 201
assert resp2.status_code == 201
health = await client.get("/api/health")
data = health.json()
# At least 2 active jobs (might be more if worker picked them up)
assert data["queue_depth"] >= 2
@pytest.mark.anyio
async def test_health_queue_depth_excludes_completed(self, db):
"""Completed/failed/expired jobs are NOT counted in queue_depth."""
sid = str(uuid.uuid4())
await create_job(db, _make_job(sid, "completed"))
await create_job(db, _make_job(sid, "failed"))
await create_job(db, _make_job(sid, "expired"))
await create_job(db, _make_job(sid, "queued"))
depth = await get_queue_depth(db)
assert depth == 1
@pytest.mark.anyio
async def test_health_uptime_positive(self, client):
resp = await client.get("/api/health")
assert resp.json()["uptime"] >= 0
# ===========================================================================
# Public config endpoint tests
# ===========================================================================
class TestPublicConfig:
"""GET /api/config/public returns safe fields only."""
@pytest.mark.anyio
async def test_public_config_returns_expected_fields(self, client):
resp = await client.get("/api/config/public")
assert resp.status_code == 200
data = resp.json()
assert "session_mode" in data
assert "default_theme" in data
assert "purge_enabled" in data
assert "max_concurrent_downloads" in data
@pytest.mark.anyio
async def test_public_config_excludes_sensitive_fields(self, client):
resp = await client.get("/api/config/public")
raw = resp.text # Check the raw JSON string — catches nested keys too
assert "password_hash" not in raw
assert "username" not in raw
@pytest.mark.anyio
async def test_public_config_reflects_actual_config(self, tmp_path):
"""Config values in the response match what AppConfig was built with."""
import asyncio
from datetime import datetime, timezone
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from app.core.config import AppConfig
from app.core.database import close_db, init_db
from app.core.sse_broker import SSEBroker
from app.middleware.session import SessionMiddleware
from app.routers.system import router as system_router
db_path = str(tmp_path / "cfg_test.db")
config = AppConfig(
server={"db_path": db_path},
session={"mode": "shared"},
ui={"default_theme": "cyberpunk"},
purge={"enabled": True},
downloads={"max_concurrent": 5},
)
db_conn = await init_db(db_path)
test_app = FastAPI()
test_app.add_middleware(SessionMiddleware)
test_app.include_router(system_router, prefix="/api")
test_app.state.config = config
test_app.state.db = db_conn
test_app.state.start_time = datetime.now(timezone.utc)
transport = ASGITransport(app=test_app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
resp = await ac.get("/api/config/public")
await close_db(db_conn)
data = resp.json()
assert data["session_mode"] == "shared"
assert data["default_theme"] == "cyberpunk"
assert data["purge_enabled"] is True
assert data["max_concurrent_downloads"] == 5
@pytest.mark.anyio
async def test_public_config_default_values(self, client):
"""Default config should have isolated mode and dark theme."""
resp = await client.get("/api/config/public")
data = resp.json()
assert data["session_mode"] == "isolated"
assert data["default_theme"] == "dark"
assert data["purge_enabled"] is False
assert data["max_concurrent_downloads"] == 3
# ===========================================================================
# Database: get_all_jobs
# ===========================================================================
class TestGetAllJobs:
"""get_all_jobs() returns every job regardless of session."""
@pytest.mark.anyio
async def test_returns_all_sessions(self, db):
sid_a = str(uuid.uuid4())
sid_b = str(uuid.uuid4())
await create_job(db, _make_job(sid_a))
await create_job(db, _make_job(sid_b))
jobs = await get_all_jobs(db)
session_ids = {j.session_id for j in jobs}
assert sid_a in session_ids
assert sid_b in session_ids
assert len(jobs) == 2
@pytest.mark.anyio
async def test_empty_when_no_jobs(self, db):
jobs = await get_all_jobs(db)
assert jobs == []
# ===========================================================================
# Database: get_jobs_by_mode
# ===========================================================================
class TestGetJobsByMode:
"""get_jobs_by_mode() dispatches correctly for isolated/shared/open."""
@pytest.mark.anyio
async def test_isolated_filters_by_session(self, db):
sid_a = str(uuid.uuid4())
sid_b = str(uuid.uuid4())
await create_job(db, _make_job(sid_a))
await create_job(db, _make_job(sid_b))
jobs = await get_jobs_by_mode(db, sid_a, "isolated")
assert all(j.session_id == sid_a for j in jobs)
assert len(jobs) == 1
@pytest.mark.anyio
async def test_shared_returns_all(self, db):
sid_a = str(uuid.uuid4())
sid_b = str(uuid.uuid4())
await create_job(db, _make_job(sid_a))
await create_job(db, _make_job(sid_b))
jobs = await get_jobs_by_mode(db, sid_a, "shared")
assert len(jobs) == 2
@pytest.mark.anyio
async def test_open_returns_all(self, db):
sid_a = str(uuid.uuid4())
sid_b = str(uuid.uuid4())
await create_job(db, _make_job(sid_a))
await create_job(db, _make_job(sid_b))
jobs = await get_jobs_by_mode(db, sid_a, "open")
assert len(jobs) == 2
# ===========================================================================
# Database: get_queue_depth
# ===========================================================================
class TestGetQueueDepth:
"""get_queue_depth() counts only non-terminal jobs."""
@pytest.mark.anyio
async def test_counts_active_statuses(self, db):
sid = str(uuid.uuid4())
await create_job(db, _make_job(sid, "queued"))
await create_job(db, _make_job(sid, "downloading"))
await create_job(db, _make_job(sid, "extracting"))
assert await get_queue_depth(db) == 3
@pytest.mark.anyio
async def test_excludes_terminal_statuses(self, db):
sid = str(uuid.uuid4())
await create_job(db, _make_job(sid, "completed"))
await create_job(db, _make_job(sid, "failed"))
await create_job(db, _make_job(sid, "expired"))
assert await get_queue_depth(db) == 0
@pytest.mark.anyio
async def test_mixed_statuses(self, db):
sid = str(uuid.uuid4())
await create_job(db, _make_job(sid, "queued"))
await create_job(db, _make_job(sid, "completed"))
await create_job(db, _make_job(sid, "downloading"))
await create_job(db, _make_job(sid, "failed"))
assert await get_queue_depth(db) == 2
@pytest.mark.anyio
async def test_zero_when_empty(self, db):
assert await get_queue_depth(db) == 0