"""Pytest configuration and shared fixtures for fractafrag-api tests. Integration test infrastructure: - Async SQLite in-memory database (via aiosqlite) - FastAPI test client with dependency overrides - Auth dependency overrides (mock pro-tier user) - Celery worker mock (process_desire.delay → no-op) Environment variables are set BEFORE any app.* imports to ensure get_settings() picks up test values (database.py calls get_settings() at module scope with @lru_cache). """ import os import sys import uuid from pathlib import Path from unittest.mock import MagicMock, patch # ── 1. sys.path setup ───────────────────────────────────── _api_root = str(Path(__file__).resolve().parent.parent) if _api_root not in sys.path: sys.path.insert(0, _api_root) # ── 2. Set env vars BEFORE any app.* imports ────────────── # We do NOT override DATABASE_URL — the module-level engine in database.py # uses pool_size/max_overflow which are PostgreSQL-specific. The default # PostgreSQL URL creates an engine that never actually connects (no queries # hit it). Our integration tests override get_db with a test SQLite session. # We only set dummy values for env vars that cause validation failures. os.environ.setdefault("JWT_SECRET", "test-secret") os.environ.setdefault("REDIS_URL", "redis://localhost:6379/0") os.environ.setdefault("BYOK_MASTER_KEY", "test-master-key-0123456789abcdef") # ── 3. Now safe to import app modules ───────────────────── import pytest # noqa: E402 import pytest_asyncio # noqa: E402 from httpx import ASGITransport, AsyncClient # noqa: E402 from sqlalchemy import event, text # noqa: E402 from sqlalchemy.ext.asyncio import ( # noqa: E402 AsyncSession, async_sessionmaker, create_async_engine, ) from sqlalchemy.ext.compiler import compiles # noqa: E402 from pgvector.sqlalchemy import Vector # noqa: E402 from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB, ARRAY # noqa: E402 from app.database import Base, get_db # noqa: E402 from app.main import app # noqa: E402 from app.middleware.auth import get_current_user, require_tier # noqa: E402 from app.models.models import User # noqa: E402 # ── 4. SQLite type compilation overrides ────────────────── # pgvector Vector, PostgreSQL UUID, JSONB, and ARRAY don't exist in SQLite. # Register custom compilation rules so create_all() works. @compiles(Vector, "sqlite") def _compile_vector_sqlite(type_, compiler, **kw): """Render pgvector Vector as TEXT in SQLite.""" return "TEXT" # Override PostgreSQL UUID to TEXT for SQLite @compiles(PG_UUID, "sqlite") def _compile_pg_uuid_sqlite(type_, compiler, **kw): """Render PostgreSQL UUID as TEXT in SQLite (standard UUID is fine, dialect-specific isn't).""" return "TEXT" # Override JSONB to TEXT for SQLite @compiles(JSONB, "sqlite") def _compile_jsonb_sqlite(type_, compiler, **kw): """Render JSONB as TEXT in SQLite.""" return "TEXT" # Override ARRAY to TEXT for SQLite @compiles(ARRAY, "sqlite") def _compile_array_sqlite(type_, compiler, **kw): """Render PostgreSQL ARRAY as TEXT in SQLite.""" return "TEXT" # Register Python uuid.UUID as a SQLite adapter so raw text() queries # can bind UUID parameters without "type 'UUID' is not supported" errors. # IMPORTANT: Use .hex (no hyphens) to match SQLAlchemy's UUID storage format in SQLite. # Also register list adapter so ARRAY columns (compiled as TEXT in SQLite) # can bind Python lists without "type 'list' is not supported" errors. import json as _json # noqa: E402 import sqlite3 # noqa: E402 sqlite3.register_adapter(uuid.UUID, lambda u: u.hex) sqlite3.register_adapter(list, lambda lst: _json.dumps(lst)) # ── 5. Test database engine and session fixtures ────────── # Shared test user ID — consistent across all integration tests TEST_USER_ID = uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") @pytest_asyncio.fixture(scope="session") async def db_engine(): """Create an async SQLite engine and all tables. Session-scoped.""" engine = create_async_engine( "sqlite+aiosqlite://", echo=False, # SQLite doesn't support pool_size/max_overflow ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) yield engine async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) await engine.dispose() @pytest_asyncio.fixture async def db_session(db_engine): """Yield a fresh AsyncSession per test. Rolls back after each test for isolation.""" session_factory = async_sessionmaker( db_engine, class_=AsyncSession, expire_on_commit=False ) async with session_factory() as session: # Start a nested transaction so we can roll back after the test async with session.begin(): yield session # Rollback ensures test isolation — no committed state leaks between tests await session.rollback() # ── 6. Mock user fixture ───────────────────────────────── @pytest.fixture def test_user(): """Return a mock User object for auth dependency overrides.""" user = MagicMock(spec=User) user.id = TEST_USER_ID user.username = "testuser" user.email = "testuser@test.com" user.role = "user" user.subscription_tier = "pro" user.is_system = False user.trust_tier = "standard" return user # ── 7. FastAPI test client fixture ──────────────────────── @pytest_asyncio.fixture async def client(db_session, test_user): """Async HTTP client wired to the FastAPI app with dependency overrides. Overrides: - get_db → yields the test db_session - get_current_user → returns test_user (pro tier) - require_tier → returns test_user unconditionally (bypasses tier check) - process_desire.delay → no-op (prevents Celery/Redis connection) """ # Override get_db to yield test session async def _override_get_db(): yield db_session # Override get_current_user to return mock user async def _override_get_current_user(): return test_user app.dependency_overrides[get_db] = _override_get_db app.dependency_overrides[get_current_user] = _override_get_current_user # require_tier is a factory that returns inner functions depending on # get_current_user. Since we override get_current_user to return a pro-tier # user, the tier check inside require_tier will pass naturally. # We still need to mock process_desire to prevent Celery/Redis connection. with patch("app.worker.process_desire") as mock_task: # process_desire.delay() should be a no-op mock_task.delay = MagicMock(return_value=None) transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac # Clean up dependency overrides after test app.dependency_overrides.clear()