From 848fb06407dbaf1abd922e215d52f080e4be6788 Mon Sep 17 00:00:00 2001 From: John Lightner Date: Tue, 7 Apr 2026 01:59:24 -0500 Subject: [PATCH] MAESTRO: Create backend/auth.py with JWT, API key auth, and first-boot setup flow --- Auto Run Docs/01-scaffold.md | 3 +- backend/auth.py | 154 +++++++++++++++++++++++ backend/tests/test_auth.py | 238 +++++++++++++++++++++++++++++++++++ 3 files changed, 394 insertions(+), 1 deletion(-) create mode 100644 backend/auth.py create mode 100644 backend/tests/test_auth.py diff --git a/Auto Run Docs/01-scaffold.md b/Auto Run Docs/01-scaffold.md index 8107b5d..02e2b42 100644 --- a/Auto Run Docs/01-scaffold.md +++ b/Auto Run Docs/01-scaffold.md @@ -32,7 +32,8 @@ Set up the PromptLooper repository, Docker infrastructure, and basic project ske - [x] Create backend/main.py with the FastAPI application. Set up CORS middleware, mount all routers (even if they're stubs), configure the WebSocket endpoint, add the /health endpoint that checks DB and Redis connectivity, and add startup/shutdown lifecycle hooks. > Created backend/main.py with: CORS middleware (allow all origins), /health endpoint checking DB (SELECT 1) and Redis (ping) connectivity, /ws WebSocket endpoint with ConnectionManager for real-time broadcasts, async lifespan hooks for DB engine + Redis init/teardown, get_db dependency yielding sessions, dynamic router mounting (silently skips missing routers). 10 tests in tests/test_main.py covering health, CORS, WebSocket connect/disconnect/echo, OpenAPI schema, 404s, broadcast, get_db, and get_redis. All 74 backend tests pass. -- [ ] Create backend/auth.py implementing JWT token generation/verification, API key validation, and the first-boot setup flow. The setup endpoint should check if any users exist — if not, accept username + password to create the admin account. Include a dependency function for route-level auth that supports both JWT and API key. +- [x] Create backend/auth.py implementing JWT token generation/verification, API key validation, and the first-boot setup flow. The setup endpoint should check if any users exist — if not, accept username + password to create the admin account. Include a dependency function for route-level auth that supports both JWT and API key. + > Created backend/auth.py with: bcrypt password hashing via passlib, JWT token creation/verification (HS256, 24h expiry) using python-jose, first-boot `needs_setup()` + `create_admin()` flow (409 if admin exists), `authenticate_user()` for login, and `get_current_user` FastAPI dependency supporting both JWT Bearer tokens and X-Api-Key header (API key grants first admin user). UUID string-to-UUID conversion for SQLite compatibility. 21 tests in tests/test_auth.py covering hashing, JWT lifecycle, setup flow, login, and all auth dependency paths. All 95 backend tests pass. - [ ] Scaffold all router files in backend/routers/ as stubs: auth.py, projects.py, experiments.py, runs.py, endpoints.py, export.py, webhooks.py, admin.py. Each should have the correct APIRouter prefix and tags, with placeholder endpoints that return 501 Not Implemented. diff --git a/backend/auth.py b/backend/auth.py new file mode 100644 index 0000000..75de1c6 --- /dev/null +++ b/backend/auth.py @@ -0,0 +1,154 @@ +"""PromptLooper authentication — JWT tokens, API keys, first-boot setup.""" + +import uuid as _uuid +from datetime import datetime, timedelta, timezone +from typing import Generator + +from fastapi import Depends, HTTPException, Header, status +from jose import JWTError, jwt +from passlib.context import CryptContext +from sqlalchemy.orm import Session + +from config import settings +from models import User + +# --------------------------------------------------------------------------- +# Password hashing +# --------------------------------------------------------------------------- + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +def hash_password(password: str) -> str: + return pwd_context.hash(password) + + +def verify_password(plain: str, hashed: str) -> bool: + return pwd_context.verify(plain, hashed) + + +# --------------------------------------------------------------------------- +# JWT +# --------------------------------------------------------------------------- + +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours + + +def create_access_token(user_id: str, *, expires_delta: timedelta | None = None) -> str: + expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) + payload = {"sub": user_id, "exp": expire} + return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM) + + +def decode_access_token(token: str) -> str: + """Return the user_id (sub) from a valid JWT, or raise.""" + try: + payload = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM]) + user_id: str | None = payload.get("sub") + if user_id is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + return user_id + except JWTError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + + +# --------------------------------------------------------------------------- +# First-boot setup +# --------------------------------------------------------------------------- + +def needs_setup(db: Session) -> bool: + """Return True if no users exist yet (first-boot state).""" + return db.query(User).count() == 0 + + +def create_admin(db: Session, username: str, password: str) -> User: + """Create the first admin user. Raises if users already exist.""" + if not needs_setup(db): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Admin account already exists", + ) + user = User( + username=username, + password_hash=hash_password(password), + is_admin=True, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +# --------------------------------------------------------------------------- +# Authenticate (login) +# --------------------------------------------------------------------------- + +def authenticate_user(db: Session, username: str, password: str) -> User: + """Verify credentials and return the User, or raise 401.""" + user = db.query(User).filter(User.username == username).first() + if user is None or not verify_password(password, user.password_hash): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") + return user + + +# --------------------------------------------------------------------------- +# Database session dependency (local to avoid circular import with main.py) +# --------------------------------------------------------------------------- + +def _get_db() -> Generator[Session, None, None]: + """Yield a DB session. Imported lazily from main to avoid circular import.""" + from main import get_db + yield from get_db() + + +# --------------------------------------------------------------------------- +# Dependency: get current user (JWT or API key) +# --------------------------------------------------------------------------- + +def get_current_user( + authorization: str | None = Header(None), + x_api_key: str | None = Header(None), + db: Session = Depends(_get_db), +) -> User: + """FastAPI dependency — resolve the current user from JWT Bearer token or API key. + + Priority: + 1. X-Api-Key header — matched against settings.api_key (grants first admin). + 2. Authorization: Bearer — decoded to get user_id. + """ + # --- API key path --- + if x_api_key is not None: + if settings.api_key is None or x_api_key != settings.api_key: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key") + # API key grants the first admin user + admin = db.query(User).filter(User.is_admin.is_(True)).first() + if admin is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="No admin user exists") + return admin + + # --- JWT path --- + if authorization is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing authentication", + headers={"WWW-Authenticate": "Bearer"}, + ) + + scheme, _, token = authorization.partition(" ") + if scheme.lower() != "bearer" or not token: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authorization header", + headers={"WWW-Authenticate": "Bearer"}, + ) + + user_id_str = decode_access_token(token) + try: + user_id = _uuid.UUID(user_id_str) + except ValueError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + user = db.query(User).filter(User.id == user_id).first() + if user is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") + return user diff --git a/backend/tests/test_auth.py b/backend/tests/test_auth.py new file mode 100644 index 0000000..92218cc --- /dev/null +++ b/backend/tests/test_auth.py @@ -0,0 +1,238 @@ +"""Tests for backend/auth.py — JWT, API key, setup flow, and auth dependency.""" + +import os +from datetime import timedelta +from unittest.mock import patch + +import pytest +from fastapi import FastAPI, Depends +from fastapi.testclient import TestClient + + +@pytest.fixture(autouse=True) +def _isolate_settings(tmp_path): + """Ensure tests use a temp SQLite DB and no Redis.""" + env = { + "DATABASE_URL": f"sqlite:///{tmp_path / 'test.db'}", + "REDIS_URL": "", + "DATA_DIR": str(tmp_path), + "JWT_SECRET": "test-secret-key-for-jwt-signing", + "API_KEY": "test-api-key-12345", + } + with patch.dict(os.environ, env, clear=False): + import config + new_settings = config.Settings(_env_file=None) + config.settings = new_settings + + import main + main.settings = new_settings + main._init_db() + main._init_redis() + + from models import Base + Base.metadata.create_all(bind=main.engine) + + # Also patch auth module's settings reference + import auth + auth.settings = new_settings + + yield + + +@pytest.fixture +def db_session(): + from main import get_db + gen = get_db() + session = next(gen) + yield session + try: + next(gen) + except StopIteration: + pass + + +# --------------------------------------------------------------------------- +# Password hashing +# --------------------------------------------------------------------------- + +class TestPasswordHashing: + def test_hash_and_verify(self): + from auth import hash_password, verify_password + hashed = hash_password("my-secret-password") + assert hashed != "my-secret-password" + assert verify_password("my-secret-password", hashed) + + def test_wrong_password_fails(self): + from auth import hash_password, verify_password + hashed = hash_password("correct-password") + assert not verify_password("wrong-password", hashed) + + +# --------------------------------------------------------------------------- +# JWT +# --------------------------------------------------------------------------- + +class TestJWT: + def test_create_and_decode_token(self): + from auth import create_access_token, decode_access_token + token = create_access_token("user-123") + assert decode_access_token(token) == "user-123" + + def test_expired_token_raises(self): + from auth import create_access_token, decode_access_token + token = create_access_token("user-123", expires_delta=timedelta(seconds=-1)) + with pytest.raises(Exception) as exc_info: + decode_access_token(token) + assert exc_info.value.status_code == 401 + + def test_invalid_token_raises(self): + from auth import decode_access_token + with pytest.raises(Exception) as exc_info: + decode_access_token("not-a-valid-token") + assert exc_info.value.status_code == 401 + + def test_token_without_sub_raises(self): + from jose import jwt + import config + token = jwt.encode({"foo": "bar"}, config.settings.jwt_secret, algorithm="HS256") + from auth import decode_access_token + with pytest.raises(Exception) as exc_info: + decode_access_token(token) + assert exc_info.value.status_code == 401 + + +# --------------------------------------------------------------------------- +# First-boot setup +# --------------------------------------------------------------------------- + +class TestSetup: + def test_needs_setup_true_when_no_users(self, db_session): + from auth import needs_setup + assert needs_setup(db_session) is True + + def test_create_admin_succeeds(self, db_session): + from auth import create_admin, needs_setup + user = create_admin(db_session, "admin", "password123") + assert user.username == "admin" + assert user.is_admin is True + assert needs_setup(db_session) is False + + def test_create_admin_twice_raises_409(self, db_session): + from auth import create_admin + create_admin(db_session, "admin", "password123") + with pytest.raises(Exception) as exc_info: + create_admin(db_session, "admin2", "password456") + assert exc_info.value.status_code == 409 + + def test_admin_password_is_hashed(self, db_session): + from auth import create_admin + user = create_admin(db_session, "admin", "password123") + assert user.password_hash != "password123" + assert user.password_hash.startswith("$2b$") + + +# --------------------------------------------------------------------------- +# Authenticate user (login) +# --------------------------------------------------------------------------- + +class TestAuthenticateUser: + def test_valid_credentials(self, db_session): + from auth import create_admin, authenticate_user + create_admin(db_session, "admin", "password123") + user = authenticate_user(db_session, "admin", "password123") + assert user.username == "admin" + + def test_wrong_password_raises_401(self, db_session): + from auth import create_admin, authenticate_user + create_admin(db_session, "admin", "password123") + with pytest.raises(Exception) as exc_info: + authenticate_user(db_session, "admin", "wrong") + assert exc_info.value.status_code == 401 + + def test_unknown_user_raises_401(self, db_session): + from auth import authenticate_user + with pytest.raises(Exception) as exc_info: + authenticate_user(db_session, "nonexistent", "password") + assert exc_info.value.status_code == 401 + + +# --------------------------------------------------------------------------- +# get_current_user dependency (integration via test app) +# --------------------------------------------------------------------------- + +@pytest.fixture +def auth_app(): + """Create a minimal FastAPI app with a protected endpoint for testing auth.""" + from auth import get_current_user + from schemas import UserResponse + + test_app = FastAPI() + + @test_app.get("/protected") + def protected(user=Depends(get_current_user)): + return {"user_id": str(user.id), "username": user.username} + + return test_app + + +@pytest.fixture +def auth_client(auth_app): + return TestClient(auth_app) + + +class TestGetCurrentUser: + def test_no_auth_returns_401(self, auth_client): + resp = auth_client.get("/protected") + assert resp.status_code == 401 + assert "Missing authentication" in resp.json()["detail"] + + def test_invalid_bearer_format_returns_401(self, auth_client): + resp = auth_client.get("/protected", headers={"Authorization": "NotBearer token"}) + assert resp.status_code == 401 + + def test_jwt_auth_succeeds(self, auth_client, db_session): + from auth import create_admin, create_access_token + user = create_admin(db_session, "admin", "password123") + token = create_access_token(str(user.id)) + resp = auth_client.get("/protected", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + assert resp.json()["username"] == "admin" + + def test_jwt_for_deleted_user_returns_401(self, auth_client, db_session): + from auth import create_access_token + import uuid + token = create_access_token(str(uuid.uuid4())) + resp = auth_client.get("/protected", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 401 + + def test_api_key_auth_succeeds(self, auth_client, db_session): + from auth import create_admin + create_admin(db_session, "admin", "password123") + resp = auth_client.get("/protected", headers={"X-Api-Key": "test-api-key-12345"}) + assert resp.status_code == 200 + assert resp.json()["username"] == "admin" + + def test_wrong_api_key_returns_401(self, auth_client): + resp = auth_client.get("/protected", headers={"X-Api-Key": "wrong-key"}) + assert resp.status_code == 401 + + def test_api_key_without_admin_returns_401(self, auth_client): + # No admin user created yet + resp = auth_client.get("/protected", headers={"X-Api-Key": "test-api-key-12345"}) + assert resp.status_code == 401 + + def test_api_key_disabled_when_not_configured(self, auth_client, db_session): + """When API_KEY is not set in config, API key auth should fail.""" + from auth import create_admin + import config, auth + create_admin(db_session, "admin", "password123") + + old_key = config.settings.api_key + config.settings.api_key = None + auth.settings = config.settings + try: + resp = auth_client.get("/protected", headers={"X-Api-Key": "test-api-key-12345"}) + assert resp.status_code == 401 + finally: + config.settings.api_key = old_key + auth.settings = config.settings