media-rip/backend/tests/test_purge.py
xpltd 43ddf43951 Purge intervals: hours→minutes, default ON at 1440min (24h)
- PurgeConfig: max_age_hours→max_age_minutes (default 1440)
- PurgeConfig: privacy_retention_hours→privacy_retention_minutes (default 1440)
- PurgeConfig: enabled default False→True
- PurgeConfig: cron default every minute (was daily 3am)
- Purge scheduler runs every minute for minute-granularity testing
- All API fields renamed: purge_max_age_minutes, privacy_retention_minutes
- Frontend admin panel inputs show minutes with updated labels
- Updated test assertions for new defaults
2026-03-21 20:33:13 -05:00

136 lines
4.2 KiB
Python

"""Tests for the purge service."""
from __future__ import annotations
import uuid
from datetime import datetime, timezone, timedelta
import pytest
from app.core.config import AppConfig
from app.core.database import create_job
from app.models.job import Job
from app.services.purge import run_purge
def _make_job(
session_id: str,
status: str = "completed",
filename: str | None = None,
hours_ago: int = 0,
) -> Job:
completed_at = (
(datetime.now(timezone.utc) - timedelta(hours=hours_ago)).isoformat()
if status in ("completed", "failed", "expired")
else None
)
return Job(
id=str(uuid.uuid4()),
session_id=session_id,
url="https://example.com/video",
status=status,
filename=filename,
created_at=datetime.now(timezone.utc).isoformat(),
completed_at=completed_at,
)
class TestPurge:
"""Purge service tests."""
@pytest.mark.anyio
async def test_purge_deletes_old_completed_jobs(self, db, tmp_path):
config = AppConfig(
downloads={"output_dir": str(tmp_path)},
purge={"max_age_minutes": 1440},
)
sid = str(uuid.uuid4())
# Create an old completed job (48 hours ago)
job = _make_job(sid, "completed", hours_ago=48)
await create_job(db, job)
result = await run_purge(db, config)
assert result["rows_deleted"] == 1
@pytest.mark.anyio
async def test_purge_skips_recent_completed(self, db, tmp_path):
config = AppConfig(
downloads={"output_dir": str(tmp_path)},
purge={"max_age_minutes": 1440},
)
sid = str(uuid.uuid4())
# Create a recent completed job (1 hour ago)
job = _make_job(sid, "completed", hours_ago=1)
await create_job(db, job)
result = await run_purge(db, config)
assert result["rows_deleted"] == 0
@pytest.mark.anyio
async def test_purge_skips_active_jobs(self, db, tmp_path):
config = AppConfig(
downloads={"output_dir": str(tmp_path)},
purge={"max_age_minutes": 0}, # purge everything terminal
)
sid = str(uuid.uuid4())
# Active jobs should never be purged regardless of age
await create_job(db, _make_job(sid, "queued", hours_ago=0))
await create_job(db, _make_job(sid, "downloading", hours_ago=0))
result = await run_purge(db, config)
assert result["rows_deleted"] == 0
assert result["active_skipped"] == 2
@pytest.mark.anyio
async def test_purge_deletes_files(self, db, tmp_path):
config = AppConfig(
downloads={"output_dir": str(tmp_path)},
purge={"max_age_minutes": 0},
)
sid = str(uuid.uuid4())
# Create a file on disk
test_file = tmp_path / "video.mp4"
test_file.write_text("fake video data")
job = _make_job(sid, "completed", filename="video.mp4", hours_ago=1)
await create_job(db, job)
result = await run_purge(db, config)
assert result["files_deleted"] == 1
assert not test_file.exists()
@pytest.mark.anyio
async def test_purge_handles_missing_files(self, db, tmp_path):
config = AppConfig(
downloads={"output_dir": str(tmp_path)},
purge={"max_age_minutes": 0},
)
sid = str(uuid.uuid4())
# Job references a file that doesn't exist on disk
job = _make_job(sid, "completed", filename="gone.mp4", hours_ago=1)
await create_job(db, job)
result = await run_purge(db, config)
assert result["rows_deleted"] == 1
assert result["files_missing"] == 1
@pytest.mark.anyio
async def test_purge_mixed_statuses(self, db, tmp_path):
config = AppConfig(
downloads={"output_dir": str(tmp_path)},
purge={"max_age_minutes": 0},
)
sid = str(uuid.uuid4())
await create_job(db, _make_job(sid, "completed", hours_ago=1))
await create_job(db, _make_job(sid, "failed", hours_ago=1))
await create_job(db, _make_job(sid, "queued", hours_ago=0))
result = await run_purge(db, config)
assert result["rows_deleted"] == 2
assert result["active_skipped"] == 1