mirror of
https://github.com/xpltdco/media-rip.git
synced 2026-04-03 10:54:00 -06:00
Full-featured self-hosted yt-dlp web frontend:
- Python 3.12+ / FastAPI backend with async SQLite, SSE transport, session isolation
- Vue 3 / TypeScript / Pinia frontend with real-time progress, theme picker
- 3 built-in themes (cyberpunk/dark/light) + drop-in custom theme system
- Admin auth (bcrypt), purge system, cookie upload, file serving
- Docker multi-stage build, GitHub Actions CI/CD
- 179 backend tests, 29 frontend tests (208 total)
Slices: S01 (Foundation), S02 (SSE+Sessions), S03 (Frontend),
S04 (Admin+Auth), S05 (Themes), S06 (Docker+CI)
138 lines
4.3 KiB
Python
138 lines
4.3 KiB
Python
"""Tests for the purge service."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone, timedelta
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from app.core.config import AppConfig
|
|
from app.core.database import create_job, init_db, close_db
|
|
from app.models.job import Job
|
|
from app.services.purge import run_purge
|
|
|
|
|
|
def _make_job(
|
|
session_id: str,
|
|
status: str = "completed",
|
|
filename: str | None = None,
|
|
hours_ago: int = 0,
|
|
) -> Job:
|
|
completed_at = (
|
|
(datetime.now(timezone.utc) - timedelta(hours=hours_ago)).isoformat()
|
|
if status in ("completed", "failed", "expired")
|
|
else None
|
|
)
|
|
return Job(
|
|
id=str(uuid.uuid4()),
|
|
session_id=session_id,
|
|
url="https://example.com/video",
|
|
status=status,
|
|
filename=filename,
|
|
created_at=datetime.now(timezone.utc).isoformat(),
|
|
completed_at=completed_at,
|
|
)
|
|
|
|
|
|
class TestPurge:
|
|
"""Purge service tests."""
|
|
|
|
@pytest.mark.anyio
|
|
async def test_purge_deletes_old_completed_jobs(self, db, tmp_path):
|
|
config = AppConfig(
|
|
downloads={"output_dir": str(tmp_path)},
|
|
purge={"max_age_hours": 24},
|
|
)
|
|
sid = str(uuid.uuid4())
|
|
|
|
# Create an old completed job (48 hours ago)
|
|
job = _make_job(sid, "completed", hours_ago=48)
|
|
await create_job(db, job)
|
|
|
|
result = await run_purge(db, config)
|
|
assert result["rows_deleted"] == 1
|
|
|
|
@pytest.mark.anyio
|
|
async def test_purge_skips_recent_completed(self, db, tmp_path):
|
|
config = AppConfig(
|
|
downloads={"output_dir": str(tmp_path)},
|
|
purge={"max_age_hours": 24},
|
|
)
|
|
sid = str(uuid.uuid4())
|
|
|
|
# Create a recent completed job (1 hour ago)
|
|
job = _make_job(sid, "completed", hours_ago=1)
|
|
await create_job(db, job)
|
|
|
|
result = await run_purge(db, config)
|
|
assert result["rows_deleted"] == 0
|
|
|
|
@pytest.mark.anyio
|
|
async def test_purge_skips_active_jobs(self, db, tmp_path):
|
|
config = AppConfig(
|
|
downloads={"output_dir": str(tmp_path)},
|
|
purge={"max_age_hours": 0}, # purge everything terminal
|
|
)
|
|
sid = str(uuid.uuid4())
|
|
|
|
# Active jobs should never be purged regardless of age
|
|
await create_job(db, _make_job(sid, "queued", hours_ago=0))
|
|
await create_job(db, _make_job(sid, "downloading", hours_ago=0))
|
|
|
|
result = await run_purge(db, config)
|
|
assert result["rows_deleted"] == 0
|
|
assert result["active_skipped"] == 2
|
|
|
|
@pytest.mark.anyio
|
|
async def test_purge_deletes_files(self, db, tmp_path):
|
|
config = AppConfig(
|
|
downloads={"output_dir": str(tmp_path)},
|
|
purge={"max_age_hours": 0},
|
|
)
|
|
sid = str(uuid.uuid4())
|
|
|
|
# Create a file on disk
|
|
test_file = tmp_path / "video.mp4"
|
|
test_file.write_text("fake video data")
|
|
|
|
job = _make_job(sid, "completed", filename="video.mp4", hours_ago=1)
|
|
await create_job(db, job)
|
|
|
|
result = await run_purge(db, config)
|
|
assert result["files_deleted"] == 1
|
|
assert not test_file.exists()
|
|
|
|
@pytest.mark.anyio
|
|
async def test_purge_handles_missing_files(self, db, tmp_path):
|
|
config = AppConfig(
|
|
downloads={"output_dir": str(tmp_path)},
|
|
purge={"max_age_hours": 0},
|
|
)
|
|
sid = str(uuid.uuid4())
|
|
|
|
# Job references a file that doesn't exist on disk
|
|
job = _make_job(sid, "completed", filename="gone.mp4", hours_ago=1)
|
|
await create_job(db, job)
|
|
|
|
result = await run_purge(db, config)
|
|
assert result["rows_deleted"] == 1
|
|
assert result["files_missing"] == 1
|
|
|
|
@pytest.mark.anyio
|
|
async def test_purge_mixed_statuses(self, db, tmp_path):
|
|
config = AppConfig(
|
|
downloads={"output_dir": str(tmp_path)},
|
|
purge={"max_age_hours": 0},
|
|
)
|
|
sid = str(uuid.uuid4())
|
|
|
|
await create_job(db, _make_job(sid, "completed", hours_ago=1))
|
|
await create_job(db, _make_job(sid, "failed", hours_ago=1))
|
|
await create_job(db, _make_job(sid, "queued", hours_ago=0))
|
|
|
|
result = await run_purge(db, config)
|
|
assert result["rows_deleted"] == 2
|
|
assert result["active_skipped"] == 1
|