Completed slices: - S01: Desire Embedding & Clustering - S02: Fulfillment Flow & Frontend Branch: milestone/M001
190 lines
7.1 KiB
Python
190 lines
7.1 KiB
Python
"""Pytest configuration and shared fixtures for fractafrag-api tests.
|
|
|
|
Integration test infrastructure:
|
|
- Async SQLite in-memory database (via aiosqlite)
|
|
- FastAPI test client with dependency overrides
|
|
- Auth dependency overrides (mock pro-tier user)
|
|
- Celery worker mock (process_desire.delay → no-op)
|
|
|
|
Environment variables are set BEFORE any app.* imports to ensure
|
|
get_settings() picks up test values (database.py calls get_settings()
|
|
at module scope with @lru_cache).
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
# ── 1. sys.path setup ─────────────────────────────────────
# Put the directory two levels above this file at the front of sys.path
# (only if it is not already listed) so the `app.*` imports below resolve.
_this_dir = Path(__file__).resolve().parent
_api_root = str(_this_dir.parent)
if _api_root not in sys.path:
    sys.path.insert(0, _api_root)
|
|
|
|
# ── 2. Set env vars BEFORE any app.* imports ──────────────
# DATABASE_URL is deliberately NOT overridden: the module-level engine in
# database.py uses pool_size/max_overflow, which are PostgreSQL-specific.
# The default PostgreSQL URL creates an engine that never actually connects
# (no queries hit it), because the integration tests override get_db with a
# test SQLite session. Only env vars whose absence causes validation
# failures get dummy values here; values already present in the environment
# are left untouched (setdefault).
_TEST_ENV_DEFAULTS = {
    "JWT_SECRET": "test-secret",
    "REDIS_URL": "redis://localhost:6379/0",
    "BYOK_MASTER_KEY": "test-master-key-0123456789abcdef",
}
for _env_name, _env_value in _TEST_ENV_DEFAULTS.items():
    os.environ.setdefault(_env_name, _env_value)
|
|
|
|
# ── 3. Now safe to import app modules ─────────────────────
|
|
import pytest # noqa: E402
|
|
import pytest_asyncio # noqa: E402
|
|
from httpx import ASGITransport, AsyncClient # noqa: E402
|
|
from sqlalchemy import event, text # noqa: E402
|
|
from sqlalchemy.ext.asyncio import ( # noqa: E402
|
|
AsyncSession,
|
|
async_sessionmaker,
|
|
create_async_engine,
|
|
)
|
|
from sqlalchemy.ext.compiler import compiles # noqa: E402
|
|
from pgvector.sqlalchemy import Vector # noqa: E402
|
|
from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB, ARRAY # noqa: E402
|
|
|
|
from app.database import Base, get_db # noqa: E402
|
|
from app.main import app # noqa: E402
|
|
from app.middleware.auth import get_current_user, require_tier # noqa: E402
|
|
from app.models.models import User # noqa: E402
|
|
|
|
|
|
# ── 4. SQLite type compilation overrides ──────────────────
|
|
# pgvector Vector, PostgreSQL UUID, JSONB, and ARRAY don't exist in SQLite.
|
|
# Register custom compilation rules so create_all() works.
|
|
|
|
@compiles(Vector, "sqlite")
def _compile_vector_sqlite(type_, compiler, **kw):
    """Emit ``TEXT`` as the SQLite column type for pgvector ``Vector``.

    The arguments are supplied by the SQLAlchemy compiler hook and are not
    used; the rendered type is always the same.
    """
    sqlite_column_type = "TEXT"
    return sqlite_column_type
|
|
|
|
|
|
# Override PostgreSQL UUID to TEXT for SQLite
|
|
@compiles(PG_UUID, "sqlite")
def _compile_pg_uuid_sqlite(type_, compiler, **kw):
    """Emit ``TEXT`` as the SQLite column type for the PostgreSQL ``UUID``.

    Only the PostgreSQL-dialect UUID needs this override (the standard UUID
    type is fine). The hook arguments are unused.
    """
    sqlite_column_type = "TEXT"
    return sqlite_column_type
|
|
|
|
|
|
# Override JSONB to TEXT for SQLite
|
|
@compiles(JSONB, "sqlite")
def _compile_jsonb_sqlite(type_, compiler, **kw):
    """Emit ``TEXT`` as the SQLite column type for PostgreSQL ``JSONB``.

    The hook arguments are unused; the rendered type is always the same.
    """
    sqlite_column_type = "TEXT"
    return sqlite_column_type
|
|
|
|
|
|
# Override ARRAY to TEXT for SQLite
|
|
@compiles(ARRAY, "sqlite")
def _compile_array_sqlite(type_, compiler, **kw):
    """Emit ``TEXT`` as the SQLite column type for PostgreSQL ``ARRAY``.

    The hook arguments are unused; the rendered type is always the same.
    """
    sqlite_column_type = "TEXT"
    return sqlite_column_type
|
|
|
|
|
|
# sqlite3 parameter adapters for Python types it cannot bind natively:
#   * uuid.UUID → its 32-character hex form, so raw text() queries can bind
#     UUID parameters without "type 'UUID' is not supported" errors.
#     IMPORTANT: .hex (no hyphens) is used to match SQLAlchemy's UUID
#     storage format in SQLite.
#   * list → its JSON encoding, so ARRAY columns (compiled as TEXT in SQLite)
#     can bind Python lists without "type 'list' is not supported" errors.
import json as _json  # noqa: E402
import sqlite3  # noqa: E402

sqlite3.register_adapter(uuid.UUID, lambda value: value.hex)
sqlite3.register_adapter(list, _json.dumps)
|
|
|
|
|
|
# ── 5. Test database engine and session fixtures ──────────
|
|
|
|
# Fixed user ID shared by every integration test, so the same value is used
# consistently across the suite.
TEST_USER_ID: uuid.UUID = uuid.UUID(hex="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="session")
async def db_engine():
    """Session-scoped async SQLite engine with the full schema created.

    Setup creates every table registered on ``Base.metadata``; teardown
    drops them again and disposes of the engine.
    """
    # No pool_size/max_overflow here: SQLite doesn't support them.
    test_engine = create_async_engine("sqlite+aiosqlite://", echo=False)

    async with test_engine.begin() as setup_conn:
        await setup_conn.run_sync(Base.metadata.create_all)

    yield test_engine

    async with test_engine.begin() as teardown_conn:
        await teardown_conn.run_sync(Base.metadata.drop_all)
    await test_engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture
async def db_session(db_engine):
    """Yield a fresh AsyncSession per test and roll it back afterwards.

    Args:
        db_engine: The shared test engine fixture the session is bound to.

    Yields:
        An ``AsyncSession`` with an open transaction. Whatever is still
        pending in that transaction when the test finishes is rolled back,
        so one test's writes are not visible to the next.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        # Open the transaction explicitly instead of yielding inside
        # ``async with session.begin():`` — leaving that context manager
        # without an exception COMMITS, which would make the rollback below
        # a no-op and let rows leak into later tests sharing the engine.
        await session.begin()
        try:
            yield session
        finally:
            # Rollback ensures test isolation — no state from this test's
            # open transaction is persisted.
            await session.rollback()
|
|
|
|
|
|
# ── 6. Mock user fixture ─────────────────────────────────
|
|
|
|
@pytest.fixture
def test_user():
    """Build the mock ``User`` handed to the auth dependency overrides.

    The mock is spec'd against ``User`` and carries the shared
    ``TEST_USER_ID`` together with a regular, non-system, pro-tier profile.
    """
    mock_user = MagicMock(spec=User)
    mock_user.configure_mock(
        id=TEST_USER_ID,
        username="testuser",
        email="testuser@test.com",
        role="user",
        subscription_tier="pro",
        is_system=False,
        trust_tier="standard",
    )
    return mock_user
|
|
|
|
|
|
# ── 7. FastAPI test client fixture ────────────────────────
|
|
|
|
@pytest_asyncio.fixture
async def client(db_session, test_user):
    """Async HTTP client wired to the FastAPI app with dependency overrides.

    Overrides:
      - get_db → yields the test db_session
      - get_current_user → returns test_user (pro tier)
      - app.worker.process_desire → patched so ``.delay()`` is a no-op
        (prevents Celery/Redis connection)

    ``require_tier`` is intentionally NOT overridden. It is a factory whose
    inner functions depend on ``get_current_user``; because that dependency
    returns a pro-tier user here, the tier check is expected to pass on its
    own.

    Args:
        db_session: Per-test database session served to the app via get_db.
        test_user: Mock user returned by the get_current_user override.

    Yields:
        An ``httpx.AsyncClient`` that talks to the app in-process through
        ``ASGITransport`` with base URL ``http://test``.
    """

    async def _override_get_db():
        # Hand the app the same session the test itself is using.
        yield db_session

    async def _override_get_current_user():
        return test_user

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[get_current_user] = _override_get_current_user

    # try/finally: the overrides live on the global app object, so they must
    # be removed even if patching or client construction fails before the
    # fixture ever reaches its yield.
    try:
        with patch("app.worker.process_desire") as mock_task:
            # process_desire.delay() should be a no-op
            mock_task.delay = MagicMock(return_value=None)

            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as ac:
                yield ac
    finally:
        # Clean up dependency overrides after test
        app.dependency_overrides.clear()
|