chrysopedia/backend/tests/conftest.py
jlightner bfb303860b test: Add 22 integration tests for consent endpoints covering auth, own…
- "backend/tests/test_consent.py"
- "backend/tests/conftest.py"

GSD-Task: S03/T03
2026-04-03 22:16:31 +00:00

379 lines
12 KiB
Python

"""Shared fixtures for Chrysopedia integration tests.
Provides:
- Async SQLAlchemy engine/session against a real PostgreSQL test database
- Sync SQLAlchemy engine/session for pipeline stage tests (Celery stages are sync)
- httpx.AsyncClient wired to the FastAPI app with dependency overrides
- Pre-ingest fixture for pipeline tests
- Sample transcript fixture path and temporary storage directory
Key design choice: function-scoped engine with NullPool avoids asyncpg
"another operation in progress" errors caused by session-scoped connection
reuse between the ASGI test client and verification queries.
"""
import json
import os
import pathlib
import uuid
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool
# Ensure backend/ is on sys.path so "from models import ..." works
import sys
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
from database import Base, get_session # noqa: E402
from main import app # noqa: E402
from models import ( # noqa: E402
ContentType,
Creator,
InviteCode,
ProcessingStatus,
SourceVideo,
TranscriptSegment,
User,
UserRole,
VideoConsent,
)
# Async (asyncpg) connection URL for the test database. The TEST_DATABASE_URL
# environment variable takes precedence over the local default below.
TEST_DATABASE_URL = os.environ.get(
    "TEST_DATABASE_URL",
    "postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
)

# The same URL with the driver prefix swapped to psycopg2, for fixtures that
# need a synchronous engine. A URL without the asyncpg prefix is left as-is.
TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace(
    "postgresql+asyncpg://",
    "postgresql+psycopg2://",
)
@pytest_asyncio.fixture()
async def db_engine():
    """Yield a fresh async engine whose schema is rebuilt for every test.

    The engine uses ``NullPool``. Before the test, all tables in
    ``Base.metadata`` are dropped and re-created; after the test they are
    dropped again and the engine is disposed.
    """
    test_engine = create_async_engine(
        TEST_DATABASE_URL,
        poolclass=NullPool,
        echo=False,
    )
    metadata = Base.metadata

    # Setup: start from an empty schema, then build every table.
    async with test_engine.begin() as connection:
        await connection.run_sync(metadata.drop_all)
        await connection.run_sync(metadata.create_all)

    yield test_engine

    # Teardown: remove everything the test created.
    async with test_engine.begin() as connection:
        await connection.run_sync(metadata.drop_all)
    await test_engine.dispose()
@pytest_asyncio.fixture()
async def client(db_engine, tmp_path):
    """Yield an ``httpx.AsyncClient`` wired to the FastAPI app for one test.

    While the fixture is active:

    - the ``get_session`` dependency is overridden so requests use sessions
      bound to the per-test ``db_engine``;
    - ``TRANSCRIPT_STORAGE_PATH`` points at ``tmp_path`` and the cached
      settings are cleared so the new value is picked up.

    On teardown the dependency overrides are cleared, the environment
    variable is restored to whatever it was before the fixture ran (or
    removed if it was unset), and the settings cache is cleared again.
    Cleanup runs in a ``finally`` block so it still happens if closing the
    client raises.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )

    async def _override_get_session():
        async with session_factory() as session:
            yield session

    env_key = "TRANSCRIPT_STORAGE_PATH"
    # Remember any pre-existing value so teardown does not clobber it.
    previous_storage_path = os.environ.get(env_key)

    # Override DB session dependency
    app.dependency_overrides[get_session] = _override_get_session
    # Override transcript storage path via environment variable
    os.environ[env_key] = str(tmp_path)

    # Clear the settings cache so the new env var is read on next access
    from config import get_settings
    get_settings.cache_clear()

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(
            transport=transport, base_url="http://testserver"
        ) as ac:
            yield ac
    finally:
        # Teardown: clean overrides and restore environment + settings cache
        app.dependency_overrides.clear()
        if previous_storage_path is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous_storage_path
        get_settings.cache_clear()
@pytest.fixture()
def sample_transcript_path() -> pathlib.Path:
    """Return the path of ``fixtures/sample_transcript.json`` next to this file.

    The file is documented as a 5-segment sample transcript; this fixture
    only builds the path and does not read or validate it.
    """
    fixtures_dir = pathlib.Path(__file__).parent / "fixtures"
    return fixtures_dir / "sample_transcript.json"
@pytest.fixture()
def tmp_transcript_dir(tmp_path) -> pathlib.Path:
    """Expose pytest's per-test ``tmp_path`` as the transcript storage directory."""
    transcript_dir = tmp_path
    return transcript_dir
# ── Sync engine/session for pipeline stages ──────────────────────────────────
@pytest.fixture()
def sync_engine(db_engine):
    """Yield a synchronous engine (``NullPool``) for the test database.

    ``db_engine`` is requested only so that its table create/drop cycle
    wraps this fixture; the sync engine itself is built from
    ``TEST_DATABASE_URL_SYNC`` and disposed after the test.
    """
    blocking_engine = create_engine(
        TEST_DATABASE_URL_SYNC,
        poolclass=NullPool,
        echo=False,
    )
    yield blocking_engine
    blocking_engine.dispose()
@pytest.fixture()
def sync_session(sync_engine) -> Session:
    """Yield a sync ORM session bound to ``sync_engine``; closed after the test."""
    make_session = sessionmaker(bind=sync_engine)
    # Leaving the ``with`` block closes the session, matching an explicit close().
    with make_session() as db:
        yield db
# ── Pre-ingest fixture for pipeline tests ────────────────────────────────────
@pytest.fixture()
def pre_ingested_video(sync_engine):
    """Insert a creator, one video and its transcript segments via the sync ORM.

    The segments come from ``fixtures/sample_transcript.json`` next to this
    file. The file is read as UTF-8 before any rows are written, so a
    missing or malformed fixture fails before the database is touched.

    Returns:
        dict with ``video_id`` and ``creator_id`` (both as strings) and
        ``segment_count`` (number of segments inserted).
    """
    sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"
    # JSON is UTF-8 by specification; do not depend on the locale default.
    segments = json.loads(sample.read_text(encoding="utf-8"))["segments"]

    factory = sessionmaker(bind=sync_engine)
    # The context manager closes the session on success and on error.
    with factory() as session:
        # Create creator
        creator = Creator(
            name="Skope",
            slug="skope",
            folder_name="Skope",
        )
        session.add(creator)
        session.flush()  # populate creator.id for the foreign key below

        # Create video
        video = SourceVideo(
            creator_id=creator.id,
            filename="mixing-basics-ep1.mp4",
            file_path="Skope/mixing-basics-ep1.mp4",
            duration_seconds=1234,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.transcribed,
        )
        session.add(video)
        session.flush()  # populate video.id for the segments below

        # Create transcript segments, indexed in file order
        session.add_all([
            TranscriptSegment(
                source_video_id=video.id,
                start_time=float(seg["start"]),
                end_time=float(seg["end"]),
                text=str(seg["text"]),
                segment_index=idx,
            )
            for idx, seg in enumerate(segments)
        ])
        session.commit()

        return {
            "video_id": str(video.id),
            "creator_id": str(creator.id),
            "segment_count": len(segments),
        }
# ── Auth fixtures ────────────────────────────────────────────────────────────
# Invite code inserted by the ``invite_code`` fixture.
_TEST_INVITE_CODE = "TEST-INVITE-2026"
# Credentials used by ``registered_user`` (register) and ``auth_headers`` (login).
_TEST_EMAIL = "testuser@chrysopedia.com"
_TEST_PASSWORD = "securepass123"
@pytest_asyncio.fixture()
async def invite_code(db_engine):
    """Persist an ``InviteCode`` row with 10 uses and return its code string."""
    make_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with make_session() as db:
        db.add(InviteCode(code=_TEST_INVITE_CODE, uses_remaining=10))
        await db.commit()
    return _TEST_INVITE_CODE
@pytest_asyncio.fixture()
async def registered_user(client, invite_code):
    """Register the default test user through the API.

    Posts to ``/api/v1/auth/register`` with the code from ``invite_code``,
    asserts a 201 response and returns the decoded JSON body.
    """
    payload = {
        "email": _TEST_EMAIL,
        "password": _TEST_PASSWORD,
        "display_name": "Test User",
        "invite_code": invite_code,
    }
    response = await client.post("/api/v1/auth/register", json=payload)
    assert response.status_code == 201
    return response.json()
@pytest_asyncio.fixture()
async def auth_headers(client, registered_user):
    """Log the default test user in and return a bearer ``Authorization`` header.

    ``registered_user`` is requested so the account exists before the login
    call; its return value is not used here.
    """
    credentials = {
        "email": _TEST_EMAIL,
        "password": _TEST_PASSWORD,
    }
    response = await client.post("/api/v1/auth/login", json=credentials)
    assert response.status_code == 200
    access_token = response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
# ── Consent test fixtures ────────────────────────────────────────────────────
# Invite code inserted and consumed by the ``creator_user_auth`` fixture.
_CONSENT_INVITE = "CONSENT-INV-2026"
# Credentials of the creator-linked user registered by ``creator_user_auth``.
_CREATOR_EMAIL = "creator@chrysopedia.com"
_CREATOR_PASSWORD = "creatorpass123"
# Credentials of the user that ``admin_auth`` registers and promotes to admin.
_ADMIN_EMAIL = "admin@chrysopedia.com"
_ADMIN_PASSWORD = "adminpass123"
@pytest_asyncio.fixture()
async def creator_with_videos(db_engine):
    """Insert one ``Creator`` owning two ``SourceVideo`` rows.

    Returns:
        dict with ``creator_id`` and ``video_ids`` (a two-element list in
        insertion order), holding the ID values as assigned by the database.
    """
    # (filename, file_path, duration_seconds) for each video to create.
    video_specs = (
        ("video-one.mp4", "TestCreator/video-one.mp4", 600),
        ("video-two.mp4", "TestCreator/video-two.mp4", 900),
    )
    make_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with make_session() as db:
        owner = Creator(
            name="TestCreator",
            slug="testcreator",
            folder_name="TestCreator",
        )
        db.add(owner)
        await db.flush()  # assigns owner.id for the foreign keys below

        videos = [
            SourceVideo(
                creator_id=owner.id,
                filename=filename,
                file_path=file_path,
                duration_seconds=duration,
                content_type=ContentType.tutorial,
                processing_status=ProcessingStatus.not_started,
            )
            for filename, file_path, duration in video_specs
        ]
        db.add_all(videos)
        await db.flush()  # assigns the video IDs

        # Capture the IDs before committing.
        ids = {
            "creator_id": owner.id,
            "video_ids": [video.id for video in videos],
        }
        await db.commit()
    return ids
@pytest_asyncio.fixture()
async def creator_user_auth(client, db_engine, creator_with_videos):
    """Register a user, link it to the test creator, and log it in.

    Steps: insert an invite code, register through the API, set the user's
    ``creator_id`` directly in the database, then log in.

    Returns:
        dict with ``headers`` (bearer ``Authorization`` header), ``user_id``
        (as returned by the register endpoint), ``creator_id`` and
        ``video_ids`` (both taken from ``creator_with_videos``).
    """
    from sqlalchemy import update

    make_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    creator_id = creator_with_videos["creator_id"]

    # Registration needs a valid invite code to exist first.
    async with make_session() as db:
        db.add(InviteCode(code=_CONSENT_INVITE, uses_remaining=10))
        await db.commit()

    register_payload = {
        "email": _CREATOR_EMAIL,
        "password": _CREATOR_PASSWORD,
        "display_name": "Creator User",
        "invite_code": _CONSENT_INVITE,
    }
    register_resp = await client.post("/api/v1/auth/register", json=register_payload)
    assert register_resp.status_code == 201
    user_data = register_resp.json()

    # Attach the new user to the creator with a direct UPDATE.
    link_stmt = (
        update(User)
        .where(User.id == uuid.UUID(user_data["id"]))
        .values(creator_id=creator_id)
    )
    async with make_session() as db:
        await db.execute(link_stmt)
        await db.commit()

    # Log in only after the link has been committed.
    login_payload = {
        "email": _CREATOR_EMAIL,
        "password": _CREATOR_PASSWORD,
    }
    login_resp = await client.post("/api/v1/auth/login", json=login_payload)
    assert login_resp.status_code == 200
    access_token = login_resp.json()["access_token"]

    return {
        "headers": {"Authorization": f"Bearer {access_token}"},
        "user_id": user_data["id"],
        "creator_id": creator_id,
        "video_ids": creator_with_videos["video_ids"],
    }
@pytest_asyncio.fixture()
async def admin_auth(client, db_engine):
    """Register a user, promote it to ``UserRole.admin``, and log it in.

    Steps: insert an invite code, register through the API, set the user's
    role directly in the database, then log in.

    Returns:
        dict with ``headers`` (bearer ``Authorization`` header) and
        ``user_id`` (as returned by the register endpoint).
    """
    from sqlalchemy import update

    make_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )

    # Registration needs a valid invite code to exist first.
    async with make_session() as db:
        db.add(InviteCode(code="ADMIN-INV-2026", uses_remaining=10))
        await db.commit()

    register_payload = {
        "email": _ADMIN_EMAIL,
        "password": _ADMIN_PASSWORD,
        "display_name": "Admin User",
        "invite_code": "ADMIN-INV-2026",
    }
    register_resp = await client.post("/api/v1/auth/register", json=register_payload)
    assert register_resp.status_code == 201
    user_data = register_resp.json()

    # Promote the account with a direct UPDATE.
    promote_stmt = (
        update(User)
        .where(User.id == uuid.UUID(user_data["id"]))
        .values(role=UserRole.admin)
    )
    async with make_session() as db:
        await db.execute(promote_stmt)
        await db.commit()

    # Log in after the promotion has been committed.
    # NOTE(review): the previous comment here said the token "reflects old
    # role, but DB has admin"; presumably authorization reads the role from
    # the DB rather than the token -- confirm against the auth dependency.
    login_payload = {
        "email": _ADMIN_EMAIL,
        "password": _ADMIN_PASSWORD,
    }
    login_resp = await client.post("/api/v1/auth/login", json=login_payload)
    assert login_resp.status_code == 200
    access_token = login_resp.json()["access_token"]

    return {
        "headers": {"Authorization": f"Bearer {access_token}"},
        "user_id": user_data["id"],
    }