379 lines
12 KiB
Python
379 lines
12 KiB
Python
"""Shared fixtures for Chrysopedia integration tests.

Provides:
- Async SQLAlchemy engine/session against a real PostgreSQL test database
- Sync SQLAlchemy engine/session for pipeline stage tests (Celery stages are sync)
- httpx.AsyncClient wired to the FastAPI app with dependency overrides
- Pre-ingest fixture for pipeline tests
- Sample transcript fixture path and temporary storage directory
- Auth fixtures (invite code, registered user, login headers)
- Consent fixtures (creator with videos, creator-linked user, admin user)

Key design choice: a function-scoped engine with NullPool avoids asyncpg
"another operation in progress" errors caused by session-scoped connection
reuse between the ASGI test client and verification queries.
"""
|
|
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import uuid
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
from sqlalchemy.pool import NullPool
|
|
|
|
# Ensure backend/ is on sys.path so "from models import ..." works
|
|
import sys
|
|
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
|
|
|
|
from database import Base, get_session # noqa: E402
|
|
from main import app # noqa: E402
|
|
from models import ( # noqa: E402
|
|
ContentType,
|
|
Creator,
|
|
InviteCode,
|
|
ProcessingStatus,
|
|
SourceVideo,
|
|
TranscriptSegment,
|
|
User,
|
|
UserRole,
|
|
VideoConsent,
|
|
)
|
|
|
|
# Async (asyncpg) URL of the test database. The TEST_DATABASE_URL environment
# variable takes precedence; the literal below is the fallback.
TEST_DATABASE_URL = os.environ.get(
    "TEST_DATABASE_URL",
    "postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
)

# Same URL with the driver prefix swapped to psycopg2 for the sync fixtures.
# A URL that does not start with the asyncpg prefix is left unchanged.
_ASYNC_PREFIX, _SYNC_PREFIX = "postgresql+asyncpg://", "postgresql+psycopg2://"
TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace(_ASYNC_PREFIX, _SYNC_PREFIX)
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def db_engine():
    """Yield a per-test async engine (NullPool) over a freshly created schema.

    Setup drops and re-creates every table registered on ``Base.metadata``;
    teardown drops them again. The engine is disposed in a ``finally`` block so
    it is released even when schema setup or the teardown drop raises.

    Yields:
        The async engine bound to ``TEST_DATABASE_URL``.
    """
    engine = create_async_engine(TEST_DATABASE_URL, echo=False, poolclass=NullPool)
    try:
        # Drop first so leftovers from an aborted earlier run cannot leak in.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        yield engine

        # Leave the database empty for whatever runs next.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
    finally:
        await engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def client(db_engine, tmp_path):
    """Async HTTP test client wired to FastAPI with dependency overrides.

    While the fixture is active:
    - ``get_session`` is overridden so requests use sessions bound to
      ``db_engine``;
    - ``TRANSCRIPT_STORAGE_PATH`` points at ``tmp_path`` and the cached
      settings are cleared so the new value is picked up.

    Teardown runs in a ``finally`` block and restores the previous value of
    ``TRANSCRIPT_STORAGE_PATH`` (or removes it if it was unset) instead of
    unconditionally deleting it.

    Yields:
        An ``httpx.AsyncClient`` that talks to the app in-process.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )

    async def _override_get_session():
        async with session_factory() as session:
            yield session

    # Remember any pre-existing value so teardown can put it back.
    previous_storage_path = os.environ.get("TRANSCRIPT_STORAGE_PATH")

    # Override DB session dependency
    app.dependency_overrides[get_session] = _override_get_session

    # Override transcript_storage_path via environment variable
    os.environ["TRANSCRIPT_STORAGE_PATH"] = str(tmp_path)
    # Clear the lru_cache so Settings picks up the new env var
    from config import get_settings

    get_settings.cache_clear()

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
            yield ac
    finally:
        # Teardown: clean overrides, restore the environment and settings cache
        app.dependency_overrides.clear()
        if previous_storage_path is None:
            os.environ.pop("TRANSCRIPT_STORAGE_PATH", None)
        else:
            os.environ["TRANSCRIPT_STORAGE_PATH"] = previous_storage_path
        get_settings.cache_clear()
|
|
|
|
|
|
@pytest.fixture()
def sample_transcript_path() -> pathlib.Path:
    """Return the location of ``fixtures/sample_transcript.json`` next to this file."""
    tests_dir = pathlib.Path(__file__).parent
    return tests_dir.joinpath("fixtures", "sample_transcript.json")
|
|
|
|
|
|
@pytest.fixture()
def tmp_transcript_dir(tmp_path) -> pathlib.Path:
    """Expose pytest's per-test ``tmp_path`` as the transcript storage directory."""
    storage_dir = tmp_path
    return storage_dir
|
|
|
|
|
|
# ── Sync engine/session for pipeline stages ──────────────────────────────────
|
|
|
|
|
|
@pytest.fixture()
def sync_engine(db_engine):
    """Yield a sync SQLAlchemy engine (NullPool) for the test database.

    Depending on ``db_engine`` means the schema has been created by the time
    this engine is handed out; this fixture only opens a second, synchronous
    engine on ``TEST_DATABASE_URL_SYNC`` and disposes it afterwards.
    """
    blocking_engine = create_engine(
        TEST_DATABASE_URL_SYNC,
        poolclass=NullPool,
        echo=False,
    )
    yield blocking_engine
    blocking_engine.dispose()
|
|
|
|
|
|
@pytest.fixture()
def sync_session(sync_engine) -> Session:
    """Yield a sync ORM session bound to ``sync_engine``; close it on teardown."""
    db = sessionmaker(bind=sync_engine)()
    yield db
    db.close()
|
|
|
|
|
|
# ── Pre-ingest fixture for pipeline tests ────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture()
def pre_ingested_video(sync_engine):
    """Ingest the sample transcript directly into the test DB via sync ORM.

    Inserts one ``Creator``, one ``SourceVideo`` in the ``transcribed`` state,
    and one ``TranscriptSegment`` per entry of the fixture file's
    ``"segments"`` list, then commits.

    Returns:
        A dict with ``video_id`` and ``creator_id`` (both as strings) and
        ``segment_count``.
    """
    # Decode explicitly as UTF-8: the platform default encoding would otherwise
    # decide how non-ASCII transcript text is read.
    sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"
    data = json.loads(sample.read_text(encoding="utf-8"))
    segments = data["segments"]

    factory = sessionmaker(bind=sync_engine)
    # The context manager closes the session on success and on error.
    with factory() as session:
        # Create creator
        creator = Creator(
            name="Skope",
            slug="skope",
            folder_name="Skope",
        )
        session.add(creator)
        session.flush()  # assigns creator.id for the foreign key below

        # Create video
        video = SourceVideo(
            creator_id=creator.id,
            filename="mixing-basics-ep1.mp4",
            file_path="Skope/mixing-basics-ep1.mp4",
            duration_seconds=1234,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.transcribed,
        )
        session.add(video)
        session.flush()  # assigns video.id for the segments

        # Create transcript segments, indexed in file order
        session.add_all(
            [
                TranscriptSegment(
                    source_video_id=video.id,
                    start_time=float(seg["start"]),
                    end_time=float(seg["end"]),
                    text=str(seg["text"]),
                    segment_index=idx,
                )
                for idx, seg in enumerate(segments)
            ]
        )

        session.commit()

        return {
            "video_id": str(video.id),
            "creator_id": str(creator.id),
            "segment_count": len(segments),
        }
|
|
|
|
|
|
# ── Auth fixtures ────────────────────────────────────────────────────────────
|
|
|
|
# Invite code inserted by the ``invite_code`` fixture.
_TEST_INVITE_CODE = "TEST-INVITE-2026"
# Credentials used by ``registered_user`` (register) and ``auth_headers`` (login).
_TEST_EMAIL = "testuser@chrysopedia.com"
_TEST_PASSWORD = "securepass123"
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def invite_code(db_engine):
    """Insert ``_TEST_INVITE_CODE`` (10 uses remaining) and return the code string."""
    open_session = async_sessionmaker(
        db_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with open_session() as db:
        db.add(InviteCode(code=_TEST_INVITE_CODE, uses_remaining=10))
        await db.commit()
    return _TEST_INVITE_CODE
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def registered_user(client, invite_code):
    """Register the default test user through the API.

    Posts to ``/api/v1/auth/register`` with the module-level test credentials
    and the supplied invite code, asserts a 201 response, and returns the
    decoded JSON body.
    """
    signup_payload = {
        "email": _TEST_EMAIL,
        "password": _TEST_PASSWORD,
        "display_name": "Test User",
        "invite_code": invite_code,
    }
    signup = await client.post("/api/v1/auth/register", json=signup_payload)
    assert signup.status_code == 201
    return signup.json()
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def auth_headers(client, registered_user):
    """Log the default test user in and return a Bearer ``Authorization`` header dict.

    ``registered_user`` is requested only so the account exists before login.
    """
    credentials = {"email": _TEST_EMAIL, "password": _TEST_PASSWORD}
    login = await client.post("/api/v1/auth/login", json=credentials)
    assert login.status_code == 200
    bearer = login.json()["access_token"]
    return {"Authorization": f"Bearer {bearer}"}
|
|
|
|
|
|
# ── Consent test fixtures ────────────────────────────────────────────────────
|
|
|
|
# Invite code and credentials used by the ``creator_user_auth`` fixture.
_CONSENT_INVITE = "CONSENT-INV-2026"
_CREATOR_EMAIL = "creator@chrysopedia.com"
_CREATOR_PASSWORD = "creatorpass123"
# Credentials used by the ``admin_auth`` fixture.
_ADMIN_EMAIL = "admin@chrysopedia.com"
_ADMIN_PASSWORD = "adminpass123"
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def creator_with_videos(db_engine):
    """Persist one Creator owning two SourceVideos.

    Returns:
        A dict with ``creator_id`` and ``video_ids`` (list of the two video
        primary keys, in insertion order).
    """
    open_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    # (filename, file_path, duration_seconds) for each video to create.
    video_specs = (
        ("video-one.mp4", "TestCreator/video-one.mp4", 600),
        ("video-two.mp4", "TestCreator/video-two.mp4", 900),
    )

    async with open_session() as db:
        owner = Creator(name="TestCreator", slug="testcreator", folder_name="TestCreator")
        db.add(owner)
        await db.flush()  # populate owner.id before it is used as a foreign key

        videos = [
            SourceVideo(
                creator_id=owner.id,
                filename=filename,
                file_path=file_path,
                duration_seconds=duration,
                content_type=ContentType.tutorial,
                processing_status=ProcessingStatus.not_started,
            )
            for filename, file_path, duration in video_specs
        ]
        db.add_all(videos)
        await db.flush()  # populate the video primary keys

        # Capture the IDs before committing, as the original did.
        ids = {
            "creator_id": owner.id,
            "video_ids": [video.id for video in videos],
        }
        await db.commit()

    return ids
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def creator_user_auth(client, db_engine, creator_with_videos):
    """Register a user, attach it to the test creator, and log it in.

    Steps: insert the ``_CONSENT_INVITE`` code, register via the API, set the
    user's ``creator_id`` with a direct UPDATE, then log in.

    Returns:
        A dict with ``headers`` (Bearer auth), ``user_id``, ``creator_id`` and
        ``video_ids``.
    """
    from sqlalchemy import update

    open_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    creator_id = creator_with_videos["creator_id"]

    # Seed the invite code the registration call will use.
    async with open_session() as db:
        db.add(InviteCode(code=_CONSENT_INVITE, uses_remaining=10))
        await db.commit()

    # Register through the public API.
    signup = await client.post(
        "/api/v1/auth/register",
        json={
            "email": _CREATOR_EMAIL,
            "password": _CREATOR_PASSWORD,
            "display_name": "Creator User",
            "invite_code": _CONSENT_INVITE,
        },
    )
    assert signup.status_code == 201
    account = signup.json()

    # Link the new user to the creator directly in the database.
    link_user = (
        update(User)
        .where(User.id == uuid.UUID(account["id"]))
        .values(creator_id=creator_id)
    )
    async with open_session() as db:
        await db.execute(link_user)
        await db.commit()

    # Log in to obtain an access token.
    login = await client.post(
        "/api/v1/auth/login",
        json={"email": _CREATOR_EMAIL, "password": _CREATOR_PASSWORD},
    )
    assert login.status_code == 200
    bearer = login.json()["access_token"]

    return {
        "headers": {"Authorization": f"Bearer {bearer}"},
        "user_id": account["id"],
        "creator_id": creator_id,
        "video_ids": creator_with_videos["video_ids"],
    }
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def admin_auth(client, db_engine):
    """Register a user, promote it to the admin role, and log it in.

    Returns:
        A dict with ``headers`` (Bearer auth) and ``user_id``.
    """
    from sqlalchemy import update

    admin_invite = "ADMIN-INV-2026"
    open_session = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )

    # Seed the invite code the registration call will use.
    async with open_session() as db:
        db.add(InviteCode(code=admin_invite, uses_remaining=10))
        await db.commit()

    # Register through the public API.
    signup = await client.post(
        "/api/v1/auth/register",
        json={
            "email": _ADMIN_EMAIL,
            "password": _ADMIN_PASSWORD,
            "display_name": "Admin User",
            "invite_code": admin_invite,
        },
    )
    assert signup.status_code == 201
    account = signup.json()

    # Promote the user to admin directly in the database.
    promote = (
        update(User)
        .where(User.id == uuid.UUID(account["id"]))
        .values(role=UserRole.admin)
    )
    async with open_session() as db:
        await db.execute(promote)
        await db.commit()

    # Log in to obtain an access token.
    # NOTE(review): the previous comment said the token "reflects old role, but
    # DB has admin"; login happens after the promotion, so confirm whether the
    # token carries the role at all.
    login = await client.post(
        "/api/v1/auth/login",
        json={"email": _ADMIN_EMAIL, "password": _ADMIN_PASSWORD},
    )
    assert login.status_code == 200
    bearer = login.json()["access_token"]

    return {
        "headers": {"Authorization": f"Bearer {bearer}"},
        "user_id": account["id"],
    }
|
|
|