mirror of
https://github.com/xpltdco/media-rip.git
synced 2026-04-03 02:53:58 -06:00
Full-featured self-hosted yt-dlp web frontend:
- Python 3.12+ / FastAPI backend with async SQLite, SSE transport, session isolation
- Vue 3 / TypeScript / Pinia frontend with real-time progress, theme picker
- 3 built-in themes (cyberpunk/dark/light) + drop-in custom theme system
- Admin auth (bcrypt), purge system, cookie upload, file serving
- Docker multi-stage build, GitHub Actions CI/CD
- 179 backend tests, 29 frontend tests (208 total)
Slices: S01 (Foundation), S02 (SSE+Sessions), S03 (Frontend),
S04 (Admin+Auth), S05 (Themes), S06 (Docker+CI)
294 lines
10 KiB
Python
294 lines
10 KiB
Python
"""Tests for health endpoint, public config endpoint, and session-mode query layer.
|
|
|
|
Covers:
|
|
- GET /api/health — structure, types, queue_depth accuracy
|
|
- GET /api/config/public — safe fields present, sensitive fields excluded
|
|
- get_jobs_by_mode() — isolated/shared/open dispatching
|
|
- get_queue_depth() — counts only non-terminal jobs
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from app.core.database import (
|
|
create_job,
|
|
get_all_jobs,
|
|
get_jobs_by_mode,
|
|
get_queue_depth,
|
|
)
|
|
from app.models.job import Job, JobStatus
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_job(
|
|
session_id: str,
|
|
status: str = "queued",
|
|
url: str = "https://example.com/video",
|
|
) -> Job:
|
|
"""Create a Job model with a random ID and given session/status."""
|
|
return Job(
|
|
id=str(uuid.uuid4()),
|
|
session_id=session_id,
|
|
url=url,
|
|
status=status,
|
|
created_at=datetime.now(timezone.utc).isoformat(),
|
|
)
|
|
|
|
|
|
# ===========================================================================
|
|
# Health endpoint tests
|
|
# ===========================================================================
|
|
|
|
|
|
class TestHealthEndpoint:
|
|
"""GET /api/health returns correct structure and values."""
|
|
|
|
@pytest.mark.anyio
|
|
async def test_health_returns_correct_structure(self, client):
|
|
resp = await client.get("/api/health")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
|
|
assert data["status"] == "ok"
|
|
assert isinstance(data["version"], str) and len(data["version"]) > 0
|
|
assert isinstance(data["yt_dlp_version"], str) and len(data["yt_dlp_version"]) > 0
|
|
assert isinstance(data["uptime"], (int, float)) and data["uptime"] >= 0
|
|
assert isinstance(data["queue_depth"], int) and data["queue_depth"] >= 0
|
|
|
|
@pytest.mark.anyio
|
|
async def test_health_version_is_semver(self, client):
|
|
resp = await client.get("/api/health")
|
|
version = resp.json()["version"]
|
|
parts = version.split(".")
|
|
assert len(parts) == 3, f"Expected semver, got {version}"
|
|
|
|
@pytest.mark.anyio
|
|
async def test_health_queue_depth_reflects_active_jobs(self, client):
|
|
"""queue_depth counts queued + downloading + extracting, not terminal."""
|
|
# Get the db from the test app via a back-door: make requests that
|
|
# create jobs, then check health.
|
|
# Create 2 queued jobs by posting downloads
|
|
resp1 = await client.post("/api/downloads", json={"url": "https://example.com/a"})
|
|
resp2 = await client.post("/api/downloads", json={"url": "https://example.com/b"})
|
|
assert resp1.status_code == 201
|
|
assert resp2.status_code == 201
|
|
|
|
health = await client.get("/api/health")
|
|
data = health.json()
|
|
# At least 2 active jobs (might be more if worker picked them up)
|
|
assert data["queue_depth"] >= 2
|
|
|
|
@pytest.mark.anyio
|
|
async def test_health_queue_depth_excludes_completed(self, db):
|
|
"""Completed/failed/expired jobs are NOT counted in queue_depth."""
|
|
sid = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid, "completed"))
|
|
await create_job(db, _make_job(sid, "failed"))
|
|
await create_job(db, _make_job(sid, "expired"))
|
|
await create_job(db, _make_job(sid, "queued"))
|
|
|
|
depth = await get_queue_depth(db)
|
|
assert depth == 1
|
|
|
|
@pytest.mark.anyio
|
|
async def test_health_uptime_positive(self, client):
|
|
resp = await client.get("/api/health")
|
|
assert resp.json()["uptime"] >= 0
|
|
|
|
|
|
# ===========================================================================
|
|
# Public config endpoint tests
|
|
# ===========================================================================
|
|
|
|
|
|
class TestPublicConfig:
|
|
"""GET /api/config/public returns safe fields only."""
|
|
|
|
@pytest.mark.anyio
|
|
async def test_public_config_returns_expected_fields(self, client):
|
|
resp = await client.get("/api/config/public")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
|
|
assert "session_mode" in data
|
|
assert "default_theme" in data
|
|
assert "purge_enabled" in data
|
|
assert "max_concurrent_downloads" in data
|
|
|
|
@pytest.mark.anyio
|
|
async def test_public_config_excludes_sensitive_fields(self, client):
|
|
resp = await client.get("/api/config/public")
|
|
raw = resp.text # Check the raw JSON string — catches nested keys too
|
|
assert "password_hash" not in raw
|
|
assert "username" not in raw
|
|
|
|
@pytest.mark.anyio
|
|
async def test_public_config_reflects_actual_config(self, tmp_path):
|
|
"""Config values in the response match what AppConfig was built with."""
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import FastAPI
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.core.config import AppConfig
|
|
from app.core.database import close_db, init_db
|
|
from app.core.sse_broker import SSEBroker
|
|
from app.middleware.session import SessionMiddleware
|
|
from app.routers.system import router as system_router
|
|
|
|
db_path = str(tmp_path / "cfg_test.db")
|
|
config = AppConfig(
|
|
server={"db_path": db_path},
|
|
session={"mode": "shared"},
|
|
ui={"default_theme": "cyberpunk"},
|
|
purge={"enabled": True},
|
|
downloads={"max_concurrent": 5},
|
|
)
|
|
|
|
db_conn = await init_db(db_path)
|
|
test_app = FastAPI()
|
|
test_app.add_middleware(SessionMiddleware)
|
|
test_app.include_router(system_router, prefix="/api")
|
|
test_app.state.config = config
|
|
test_app.state.db = db_conn
|
|
test_app.state.start_time = datetime.now(timezone.utc)
|
|
|
|
transport = ASGITransport(app=test_app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
|
resp = await ac.get("/api/config/public")
|
|
|
|
await close_db(db_conn)
|
|
|
|
data = resp.json()
|
|
assert data["session_mode"] == "shared"
|
|
assert data["default_theme"] == "cyberpunk"
|
|
assert data["purge_enabled"] is True
|
|
assert data["max_concurrent_downloads"] == 5
|
|
|
|
@pytest.mark.anyio
|
|
async def test_public_config_default_values(self, client):
|
|
"""Default config should have isolated mode and dark theme."""
|
|
resp = await client.get("/api/config/public")
|
|
data = resp.json()
|
|
assert data["session_mode"] == "isolated"
|
|
assert data["default_theme"] == "dark"
|
|
assert data["purge_enabled"] is False
|
|
assert data["max_concurrent_downloads"] == 3
|
|
|
|
|
|
# ===========================================================================
|
|
# Database: get_all_jobs
|
|
# ===========================================================================
|
|
|
|
|
|
class TestGetAllJobs:
|
|
"""get_all_jobs() returns every job regardless of session."""
|
|
|
|
@pytest.mark.anyio
|
|
async def test_returns_all_sessions(self, db):
|
|
sid_a = str(uuid.uuid4())
|
|
sid_b = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid_a))
|
|
await create_job(db, _make_job(sid_b))
|
|
|
|
jobs = await get_all_jobs(db)
|
|
session_ids = {j.session_id for j in jobs}
|
|
assert sid_a in session_ids
|
|
assert sid_b in session_ids
|
|
assert len(jobs) == 2
|
|
|
|
@pytest.mark.anyio
|
|
async def test_empty_when_no_jobs(self, db):
|
|
jobs = await get_all_jobs(db)
|
|
assert jobs == []
|
|
|
|
|
|
# ===========================================================================
|
|
# Database: get_jobs_by_mode
|
|
# ===========================================================================
|
|
|
|
|
|
class TestGetJobsByMode:
|
|
"""get_jobs_by_mode() dispatches correctly for isolated/shared/open."""
|
|
|
|
@pytest.mark.anyio
|
|
async def test_isolated_filters_by_session(self, db):
|
|
sid_a = str(uuid.uuid4())
|
|
sid_b = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid_a))
|
|
await create_job(db, _make_job(sid_b))
|
|
|
|
jobs = await get_jobs_by_mode(db, sid_a, "isolated")
|
|
assert all(j.session_id == sid_a for j in jobs)
|
|
assert len(jobs) == 1
|
|
|
|
@pytest.mark.anyio
|
|
async def test_shared_returns_all(self, db):
|
|
sid_a = str(uuid.uuid4())
|
|
sid_b = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid_a))
|
|
await create_job(db, _make_job(sid_b))
|
|
|
|
jobs = await get_jobs_by_mode(db, sid_a, "shared")
|
|
assert len(jobs) == 2
|
|
|
|
@pytest.mark.anyio
|
|
async def test_open_returns_all(self, db):
|
|
sid_a = str(uuid.uuid4())
|
|
sid_b = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid_a))
|
|
await create_job(db, _make_job(sid_b))
|
|
|
|
jobs = await get_jobs_by_mode(db, sid_a, "open")
|
|
assert len(jobs) == 2
|
|
|
|
|
|
# ===========================================================================
|
|
# Database: get_queue_depth
|
|
# ===========================================================================
|
|
|
|
|
|
class TestGetQueueDepth:
|
|
"""get_queue_depth() counts only non-terminal jobs."""
|
|
|
|
@pytest.mark.anyio
|
|
async def test_counts_active_statuses(self, db):
|
|
sid = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid, "queued"))
|
|
await create_job(db, _make_job(sid, "downloading"))
|
|
await create_job(db, _make_job(sid, "extracting"))
|
|
|
|
assert await get_queue_depth(db) == 3
|
|
|
|
@pytest.mark.anyio
|
|
async def test_excludes_terminal_statuses(self, db):
|
|
sid = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid, "completed"))
|
|
await create_job(db, _make_job(sid, "failed"))
|
|
await create_job(db, _make_job(sid, "expired"))
|
|
|
|
assert await get_queue_depth(db) == 0
|
|
|
|
@pytest.mark.anyio
|
|
async def test_mixed_statuses(self, db):
|
|
sid = str(uuid.uuid4())
|
|
await create_job(db, _make_job(sid, "queued"))
|
|
await create_job(db, _make_job(sid, "completed"))
|
|
await create_job(db, _make_job(sid, "downloading"))
|
|
await create_job(db, _make_job(sid, "failed"))
|
|
|
|
assert await get_queue_depth(db) == 2
|
|
|
|
@pytest.mark.anyio
|
|
async def test_zero_when_empty(self, db):
|
|
assert await get_queue_depth(db) == 0
|