chrysopedia/backend/tests/test_notifications.py
jlightner cb3a6c919c test: Added GET/PUT notification preferences endpoints, signed-token un…
- "backend/routers/notifications.py"
- "backend/main.py"
- "backend/tests/notifications/test_notifications.py"
- "frontend/src/api/notifications.ts"
- "frontend/src/pages/CreatorSettings.tsx"

GSD-Task: S01/T03
2026-04-04 12:27:18 +00:00

395 lines
14 KiB
Python

"""Integration tests for notification preferences, unsubscribe, and digest task.
Tests:
- GET/PUT notification preferences (auth required)
- PUT with invalid digest_frequency value → 422
- GET/PUT without auth → 401
- Unsubscribe with valid token
- Unsubscribe with expired token
- Unsubscribe with tampered token
- Digest task happy path (mocked SMTP)
- Digest task skips when no new content
- Digest task no-op when SMTP unconfigured
Uses a standalone ASGI test client to avoid importing the full app
(which may reference routers not present in the running container image).
"""
from __future__ import annotations
import os
import pathlib
import sys
import time
import uuid
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import jwt
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import select, update as sa_update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool
# Ensure backend/ on path
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
from database import Base, get_session # noqa: E402
from models import ( # noqa: E402
Creator,
CreatorFollow,
EmailDigestLog,
InviteCode,
Post,
User,
)
TEST_DATABASE_URL = os.getenv(
"TEST_DATABASE_URL",
"postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
)
TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace(
"postgresql+asyncpg://", "postgresql+psycopg2://"
)
_SECRET = "changeme-generate-a-real-secret"
# ── Standalone test app ──────────────────────────────────────────────────────
def _make_test_app():
"""Build a minimal FastAPI app with only the notifications router + auth."""
from fastapi import FastAPI
from routers.notifications import router as notif_router
from routers.auth import router as auth_router
test_app = FastAPI()
test_app.include_router(auth_router, prefix="/api/v1")
test_app.include_router(notif_router, prefix="/api/v1")
return test_app
# ── Fixtures ─────────────────────────────────────────────────────────────────
@pytest_asyncio.fixture()
async def db_engine():
engine = create_async_engine(TEST_DATABASE_URL, echo=False, poolclass=NullPool)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest_asyncio.fixture()
async def sync_engine_fix(db_engine):
"""Sync engine pointing at same test DB (tables created by db_engine)."""
from sqlalchemy import create_engine
engine = create_engine(TEST_DATABASE_URL_SYNC, echo=False, poolclass=NullPool)
yield engine
engine.dispose()
@pytest_asyncio.fixture()
async def client(db_engine, tmp_path):
test_app = _make_test_app()
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async def _override_get_session():
async with session_factory() as session:
yield session
test_app.dependency_overrides[get_session] = _override_get_session
# Clear settings cache so tests get fresh defaults
from config import get_settings
get_settings.cache_clear()
transport = ASGITransport(app=test_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
yield ac
test_app.dependency_overrides.clear()
get_settings.cache_clear()
@pytest_asyncio.fixture()
async def invite_code(db_engine):
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
code = InviteCode(code="NOTIF-TEST-2026", uses_remaining=10)
session.add(code)
await session.commit()
return "NOTIF-TEST-2026"
@pytest_asyncio.fixture()
async def registered_user(client, invite_code):
resp = await client.post("/api/v1/auth/register", json={
"email": "notiftest@chrysopedia.com",
"password": "testpass123",
"display_name": "Notif Test User",
"invite_code": invite_code,
})
assert resp.status_code == 201, f"Register failed: {resp.text}"
return resp.json()
@pytest_asyncio.fixture()
async def auth_headers(client, registered_user):
resp = await client.post("/api/v1/auth/login", json={
"email": "notiftest@chrysopedia.com",
"password": "testpass123",
})
assert resp.status_code == 200, f"Login failed: {resp.text}"
token = resp.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_unsubscribe_token(user_id: str, secret: str = _SECRET, iat: float | None = None) -> str:
payload = {
"sub": user_id,
"purpose": "unsubscribe",
"iat": iat or datetime.now(timezone.utc).timestamp(),
}
return jwt.encode(payload, secret, algorithm="HS256")
# ── Preference endpoint tests ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_preferences_requires_auth(client):
resp = await client.get("/api/v1/notifications/preferences")
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_put_preferences_requires_auth(client):
resp = await client.put(
"/api/v1/notifications/preferences",
json={"email_digests": False},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_get_preferences_defaults(client, auth_headers):
resp = await client.get("/api/v1/notifications/preferences", headers=auth_headers)
assert resp.status_code == 200
data = resp.json()
assert data["email_digests"] is True
assert data["digest_frequency"] == "daily"
@pytest.mark.asyncio
async def test_put_preferences_update_digests(client, auth_headers):
resp = await client.put(
"/api/v1/notifications/preferences",
json={"email_digests": False},
headers=auth_headers,
)
assert resp.status_code == 200
assert resp.json()["email_digests"] is False
assert resp.json()["digest_frequency"] == "daily"
# Verify persistence
resp2 = await client.get("/api/v1/notifications/preferences", headers=auth_headers)
assert resp2.json()["email_digests"] is False
@pytest.mark.asyncio
async def test_put_preferences_update_frequency(client, auth_headers):
resp = await client.put(
"/api/v1/notifications/preferences",
json={"digest_frequency": "weekly"},
headers=auth_headers,
)
assert resp.status_code == 200
assert resp.json()["digest_frequency"] == "weekly"
@pytest.mark.asyncio
async def test_put_preferences_invalid_frequency(client, auth_headers):
resp = await client.put(
"/api/v1/notifications/preferences",
json={"digest_frequency": "monthly"},
headers=auth_headers,
)
assert resp.status_code == 422
# ── Unsubscribe endpoint tests ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_unsubscribe_valid_token(client, auth_headers, db_engine):
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
result = await session.execute(select(User).limit(1))
user = result.scalar_one()
user_id = str(user.id)
token = _make_unsubscribe_token(user_id)
resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}")
assert resp.status_code == 200
assert "Unsubscribed" in resp.text
# Verify preferences updated
async with factory() as session:
result = await session.execute(select(User).where(User.id == user.id))
updated = result.scalar_one()
assert updated.notification_preferences.get("email_digests") is False
@pytest.mark.asyncio
async def test_unsubscribe_expired_token(client, auth_headers, db_engine):
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
result = await session.execute(select(User).limit(1))
user = result.scalar_one()
expired_iat = time.time() - (31 * 24 * 3600)
token = _make_unsubscribe_token(str(user.id), iat=expired_iat)
resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}")
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_unsubscribe_tampered_token(client):
token = _make_unsubscribe_token(str(uuid.uuid4()), secret="wrong-secret")
resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}")
assert resp.status_code == 400
# ── Digest task tests ────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_digest_task_smtp_unconfigured():
"""Digest task gracefully skips when SMTP is not configured."""
from tasks.notifications import send_digest_emails
with patch("tasks.notifications.is_smtp_configured", return_value=False):
result = send_digest_emails()
assert result["skipped"] is True
assert result["reason"] == "smtp_not_configured"
@pytest.mark.asyncio
async def test_digest_task_happy_path(client, auth_headers, db_engine, sync_engine_fix):
"""Full digest: user follows creator, new post exists, SMTP mocked → email sent."""
sync_factory = sessionmaker(bind=sync_engine_fix)
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
result = await session.execute(select(User).limit(1))
user = result.scalar_one()
user_id = user.id
user_email = user.email
# Ensure digests enabled
await session.execute(
sa_update(User).where(User.id == user_id).values(
notification_preferences={"email_digests": True, "digest_frequency": "daily"}
)
)
await session.commit()
# Create creator
creator = Creator(
name="DigestTestCreator",
slug="digest-test-creator",
folder_name="DigestTestCreator",
)
session.add(creator)
await session.flush()
creator_id = creator.id
# Follow
follow = CreatorFollow(user_id=user_id, creator_id=creator_id)
session.add(follow)
# Published post
post = Post(
creator_id=creator_id,
title="Test Digest Post",
body_json={"blocks": [{"type": "paragraph", "content": "Some content"}]},
is_published=True,
)
session.add(post)
await session.commit()
# Mock SMTP and sync session
mock_send = MagicMock(return_value=True)
with (
patch("tasks.notifications.is_smtp_configured", return_value=True),
patch("tasks.notifications.send_email", mock_send),
patch("tasks.notifications._get_sync_session") as mock_session_fn,
):
sync_session = sync_factory()
mock_session_fn.return_value = sync_session
from tasks.notifications import send_digest_emails
result = send_digest_emails()
assert result["sent"] >= 1
assert mock_send.called
call_args = mock_send.call_args
assert user_email in str(call_args)
# Verify EmailDigestLog
verify_session = sync_factory()
try:
logs = verify_session.execute(
select(EmailDigestLog).where(EmailDigestLog.user_id == user_id)
).scalars().all()
assert len(logs) >= 1
finally:
verify_session.close()
@pytest.mark.asyncio
async def test_digest_task_no_new_content(client, auth_headers, db_engine, sync_engine_fix):
"""Digest task skips users with no new content."""
sync_factory = sessionmaker(bind=sync_engine_fix)
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
result = await session.execute(select(User).limit(1))
user = result.scalar_one()
user_id = user.id
# Digests enabled but no follows
await session.execute(
sa_update(User).where(User.id == user_id).values(
notification_preferences={"email_digests": True, "digest_frequency": "daily"}
)
)
await session.commit()
mock_send = MagicMock(return_value=True)
with (
patch("tasks.notifications.is_smtp_configured", return_value=True),
patch("tasks.notifications.send_email", mock_send),
patch("tasks.notifications._get_sync_session") as mock_session_fn,
):
sync_session = sync_factory()
mock_session_fn.return_value = sync_session
from tasks.notifications import send_digest_emails
result = send_digest_emails()
assert result["sent"] == 0
assert result["skipped"] >= 1
assert not mock_send.called