"""Shared fixtures for Chrysopedia integration tests.

Provides:
- Async SQLAlchemy engine/session against a real PostgreSQL test database
- Sync SQLAlchemy engine/session for pipeline stage tests (Celery stages are sync)
- httpx.AsyncClient wired to the FastAPI app with dependency overrides
- Pre-ingest fixture for pipeline tests
- Sample transcript fixture path and temporary storage directory

Key design choice: function-scoped engine with NullPool avoids asyncpg
"another operation in progress" errors caused by session-scoped connection
reuse between the ASGI test client and verification queries.
"""

import json
import os
import pathlib
import uuid

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool

# Ensure backend/ is on sys.path so "from models import ..." works
import sys
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))

from database import Base, get_session  # noqa: E402
from main import app  # noqa: E402
from models import (  # noqa: E402
    ContentType,
    Creator,
    InviteCode,
    ProcessingStatus,
    SourceVideo,
    TranscriptSegment,
    User,
    UserRole,
)

# Async URL of the test database; overridable through the environment.
TEST_DATABASE_URL = os.getenv(
    "TEST_DATABASE_URL",
    "postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
)

# Same database, addressed through the sync psycopg2 driver.
TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace(
    "postgresql+asyncpg://", "postgresql+psycopg2://"
)


@pytest_asyncio.fixture()
async def db_engine():
    """Yield a per-test async engine (NullPool) with a freshly created schema.

    All tables in ``Base.metadata`` are dropped and re-created before the
    test and dropped again afterwards. The engine is disposed even when
    schema setup or teardown raises.
    """
    engine = create_async_engine(TEST_DATABASE_URL, echo=False, poolclass=NullPool)
    try:
        # Create all tables fresh for each test
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        yield engine

        # Drop all tables after test
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
    finally:
        await engine.dispose()


@pytest_asyncio.fixture()
async def client(db_engine, tmp_path):
    """Yield an async HTTP test client wired to FastAPI with dependency overrides.

    While the fixture is active:
    - ``get_session`` is overridden to hand out sessions bound to ``db_engine``.
    - ``TRANSCRIPT_STORAGE_PATH`` points at ``tmp_path``.

    On teardown all dependency overrides are cleared, the environment variable
    is restored to whatever it was before the test (or removed if it was
    unset), and the settings cache is cleared again.
    """
    # Imported before any state is mutated so a failing import cannot leave
    # overrides or environment changes behind.
    from config import get_settings

    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )

    async def _override_get_session():
        async with session_factory() as session:
            yield session

    env_key = "TRANSCRIPT_STORAGE_PATH"
    # Remember the caller's value so teardown can put it back instead of
    # unconditionally deleting it.
    previous_storage_path = os.environ.get(env_key)

    # Override DB session dependency
    app.dependency_overrides[get_session] = _override_get_session

    # Override transcript_storage_path via environment variable
    os.environ[env_key] = str(tmp_path)

    # Clear the lru_cache so Settings picks up the new env var
    get_settings.cache_clear()

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
            yield ac
    finally:
        # Teardown: clean overrides and restore environment + settings cache
        app.dependency_overrides.clear()
        if previous_storage_path is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous_storage_path
        get_settings.cache_clear()
@pytest.fixture()
def sample_transcript_path() -> pathlib.Path:
    """Path to the sample 5-segment transcript JSON fixture."""
    return pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"


@pytest.fixture()
def tmp_transcript_dir(tmp_path) -> pathlib.Path:
    """Temporary directory for transcript storage during tests."""
    return tmp_path


# ── Sync engine/session for pipeline stages ──────────────────────────────────


@pytest.fixture()
def sync_engine(db_engine):
    """Yield a sync SQLAlchemy engine pointing at the test database.

    Tables are already created/dropped by the async ``db_engine`` fixture,
    so this fixture just wraps a sync engine around the same DB URL. The
    engine is disposed on teardown.
    """
    engine = create_engine(TEST_DATABASE_URL_SYNC, echo=False, poolclass=NullPool)
    try:
        yield engine
    finally:
        engine.dispose()


@pytest.fixture()
def sync_session(sync_engine) -> Session:
    """Yield a sync SQLAlchemy session for pipeline stage tests.

    The session is closed on teardown.
    """
    factory = sessionmaker(bind=sync_engine)
    session = factory()
    yield session
    session.close()


# ── Pre-ingest fixture for pipeline tests ────────────────────────────────────


@pytest.fixture()
def pre_ingested_video(sync_engine):
    """Ingest the sample transcript directly into the test DB via sync ORM.

    Inserts one ``Creator``, one ``SourceVideo`` in the ``transcribed``
    state, and one ``TranscriptSegment`` per entry of the sample JSON's
    ``"segments"`` list, then commits.

    Returns a dict with ``video_id``, ``creator_id``, and ``segment_count``.
    """
    sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    data = json.loads(sample.read_text(encoding="utf-8"))
    segments = data["segments"]

    factory = sessionmaker(bind=sync_engine)
    # The context manager closes the session on exit, including on error.
    with factory() as session:
        # Create creator
        creator = Creator(
            name="Skope",
            slug="skope",
            folder_name="Skope",
        )
        session.add(creator)
        session.flush()  # flush before creator.id is read below

        # Create video
        video = SourceVideo(
            creator_id=creator.id,
            filename="mixing-basics-ep1.mp4",
            file_path="Skope/mixing-basics-ep1.mp4",
            duration_seconds=1234,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.transcribed,
        )
        session.add(video)
        session.flush()  # flush before video.id is read below

        # Create transcript segments
        session.add_all(
            TranscriptSegment(
                source_video_id=video.id,
                start_time=float(seg["start"]),
                end_time=float(seg["end"]),
                text=str(seg["text"]),
                segment_index=idx,
            )
            for idx, seg in enumerate(segments)
        )
        session.commit()

        # Build the result while the session is still open: the ids are
        # read from ORM instances after the commit.
        result = {
            "video_id": str(video.id),
            "creator_id": str(creator.id),
            "segment_count": len(segments),
        }

    return result


# ── Auth fixtures ────────────────────────────────────────────────────────────

_TEST_INVITE_CODE = "TEST-INVITE-2026"
_TEST_EMAIL = "testuser@chrysopedia.com"
_TEST_PASSWORD = "securepass123"


@pytest_asyncio.fixture()
async def invite_code(db_engine):
    """Create a test invite code in the DB and return the code string."""
    factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    async with factory() as session:
        code = InviteCode(code=_TEST_INVITE_CODE, uses_remaining=10)
        session.add(code)
        await session.commit()
    return _TEST_INVITE_CODE


@pytest_asyncio.fixture()
async def registered_user(client, invite_code):
    """Register a user via the API and return the response dict.

    Fails the test setup (with the response body as the message) if the
    register endpoint does not answer 201.
    """
    resp = await client.post("/api/v1/auth/register", json={
        "email": _TEST_EMAIL,
        "password": _TEST_PASSWORD,
        "display_name": "Test User",
        "invite_code": invite_code,
    })
    assert resp.status_code == 201, resp.text
    return resp.json()


@pytest_asyncio.fixture()
async def auth_headers(client, registered_user):
    """Log in and return Authorization headers dict.

    Fails the test setup (with the response body as the message) if the
    login endpoint does not answer 200.
    """
    resp = await client.post("/api/v1/auth/login", json={
        "email": _TEST_EMAIL,
        "password": _TEST_PASSWORD,
    })
    assert resp.status_code == 200, resp.text
    token = resp.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}