MAESTRO: Create backend/auth.py with JWT, API key auth, and first-boot setup flow

This commit is contained in:
John Lightner 2026-04-07 01:59:24 -05:00
parent 15ca2c922a
commit 848fb06407
3 changed files with 394 additions and 1 deletions

View file

@ -32,7 +32,8 @@ Set up the PromptLooper repository, Docker infrastructure, and basic project ske
- [x] Create backend/main.py with the FastAPI application. Set up CORS middleware, mount all routers (even if they're stubs), configure the WebSocket endpoint, add the /health endpoint that checks DB and Redis connectivity, and add startup/shutdown lifecycle hooks.
> Created backend/main.py with: CORS middleware (allow all origins), /health endpoint checking DB (SELECT 1) and Redis (ping) connectivity, /ws WebSocket endpoint with ConnectionManager for real-time broadcasts, async lifespan hooks for DB engine + Redis init/teardown, get_db dependency yielding sessions, dynamic router mounting (silently skips missing routers). 10 tests in tests/test_main.py covering health, CORS, WebSocket connect/disconnect/echo, OpenAPI schema, 404s, broadcast, get_db, and get_redis. All 74 backend tests pass.
- [ ] Create backend/auth.py implementing JWT token generation/verification, API key validation, and the first-boot setup flow. The setup endpoint should check if any users exist — if not, accept username + password to create the admin account. Include a dependency function for route-level auth that supports both JWT and API key.
- [x] Create backend/auth.py implementing JWT token generation/verification, API key validation, and the first-boot setup flow. The setup endpoint should check if any users exist — if not, accept username + password to create the admin account. Include a dependency function for route-level auth that supports both JWT and API key.
> Created backend/auth.py with: bcrypt password hashing via passlib, JWT token creation/verification (HS256, 24h expiry) using python-jose, first-boot `needs_setup()` + `create_admin()` flow (409 if admin exists), `authenticate_user()` for login, and `get_current_user` FastAPI dependency supporting both JWT Bearer tokens and X-Api-Key header (API key grants first admin user). UUID string-to-UUID conversion for SQLite compatibility. 21 tests in tests/test_auth.py covering hashing, JWT lifecycle, setup flow, login, and all auth dependency paths. All 95 backend tests pass.
- [ ] Scaffold all router files in backend/routers/ as stubs: auth.py, projects.py, experiments.py, runs.py, endpoints.py, export.py, webhooks.py, admin.py. Each should have the correct APIRouter prefix and tags, with placeholder endpoints that return 501 Not Implemented.

154
backend/auth.py Normal file
View file

@ -0,0 +1,154 @@
"""PromptLooper authentication — JWT tokens, API keys, first-boot setup."""
import uuid as _uuid
from datetime import datetime, timedelta, timezone
from typing import Generator
from fastapi import Depends, HTTPException, Header, status
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from config import settings
from models import User
# ---------------------------------------------------------------------------
# Password hashing
# ---------------------------------------------------------------------------
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# ---------------------------------------------------------------------------
# JWT
# ---------------------------------------------------------------------------
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours
def create_access_token(user_id: str, *, expires_delta: timedelta | None = None) -> str:
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
payload = {"sub": user_id, "exp": expire}
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
def decode_access_token(token: str) -> str:
"""Return the user_id (sub) from a valid JWT, or raise."""
try:
payload = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
user_id: str | None = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
return user_id
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
# ---------------------------------------------------------------------------
# First-boot setup
# ---------------------------------------------------------------------------
def needs_setup(db: Session) -> bool:
"""Return True if no users exist yet (first-boot state)."""
return db.query(User).count() == 0
def create_admin(db: Session, username: str, password: str) -> User:
"""Create the first admin user. Raises if users already exist."""
if not needs_setup(db):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Admin account already exists",
)
user = User(
username=username,
password_hash=hash_password(password),
is_admin=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
# ---------------------------------------------------------------------------
# Authenticate (login)
# ---------------------------------------------------------------------------
def authenticate_user(db: Session, username: str, password: str) -> User:
"""Verify credentials and return the User, or raise 401."""
user = db.query(User).filter(User.username == username).first()
if user is None or not verify_password(password, user.password_hash):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
return user
# ---------------------------------------------------------------------------
# Database session dependency (local to avoid circular import with main.py)
# ---------------------------------------------------------------------------
def _get_db() -> Generator[Session, None, None]:
"""Yield a DB session. Imported lazily from main to avoid circular import."""
from main import get_db
yield from get_db()
# ---------------------------------------------------------------------------
# Dependency: get current user (JWT or API key)
# ---------------------------------------------------------------------------
def get_current_user(
authorization: str | None = Header(None),
x_api_key: str | None = Header(None),
db: Session = Depends(_get_db),
) -> User:
"""FastAPI dependency — resolve the current user from JWT Bearer token or API key.
Priority:
1. X-Api-Key header matched against settings.api_key (grants first admin).
2. Authorization: Bearer <jwt> decoded to get user_id.
"""
# --- API key path ---
if x_api_key is not None:
if settings.api_key is None or x_api_key != settings.api_key:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
# API key grants the first admin user
admin = db.query(User).filter(User.is_admin.is_(True)).first()
if admin is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="No admin user exists")
return admin
# --- JWT path ---
if authorization is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authentication",
headers={"WWW-Authenticate": "Bearer"},
)
scheme, _, token = authorization.partition(" ")
if scheme.lower() != "bearer" or not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authorization header",
headers={"WWW-Authenticate": "Bearer"},
)
user_id_str = decode_access_token(token)
try:
user_id = _uuid.UUID(user_id_str)
except ValueError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user

238
backend/tests/test_auth.py Normal file
View file

@ -0,0 +1,238 @@
"""Tests for backend/auth.py — JWT, API key, setup flow, and auth dependency."""
import os
from datetime import timedelta
from unittest.mock import patch
import pytest
from fastapi import FastAPI, Depends
from fastapi.testclient import TestClient
@pytest.fixture(autouse=True)
def _isolate_settings(tmp_path):
"""Ensure tests use a temp SQLite DB and no Redis."""
env = {
"DATABASE_URL": f"sqlite:///{tmp_path / 'test.db'}",
"REDIS_URL": "",
"DATA_DIR": str(tmp_path),
"JWT_SECRET": "test-secret-key-for-jwt-signing",
"API_KEY": "test-api-key-12345",
}
with patch.dict(os.environ, env, clear=False):
import config
new_settings = config.Settings(_env_file=None)
config.settings = new_settings
import main
main.settings = new_settings
main._init_db()
main._init_redis()
from models import Base
Base.metadata.create_all(bind=main.engine)
# Also patch auth module's settings reference
import auth
auth.settings = new_settings
yield
@pytest.fixture
def db_session():
from main import get_db
gen = get_db()
session = next(gen)
yield session
try:
next(gen)
except StopIteration:
pass
# ---------------------------------------------------------------------------
# Password hashing
# ---------------------------------------------------------------------------
class TestPasswordHashing:
def test_hash_and_verify(self):
from auth import hash_password, verify_password
hashed = hash_password("my-secret-password")
assert hashed != "my-secret-password"
assert verify_password("my-secret-password", hashed)
def test_wrong_password_fails(self):
from auth import hash_password, verify_password
hashed = hash_password("correct-password")
assert not verify_password("wrong-password", hashed)
# ---------------------------------------------------------------------------
# JWT
# ---------------------------------------------------------------------------
class TestJWT:
def test_create_and_decode_token(self):
from auth import create_access_token, decode_access_token
token = create_access_token("user-123")
assert decode_access_token(token) == "user-123"
def test_expired_token_raises(self):
from auth import create_access_token, decode_access_token
token = create_access_token("user-123", expires_delta=timedelta(seconds=-1))
with pytest.raises(Exception) as exc_info:
decode_access_token(token)
assert exc_info.value.status_code == 401
def test_invalid_token_raises(self):
from auth import decode_access_token
with pytest.raises(Exception) as exc_info:
decode_access_token("not-a-valid-token")
assert exc_info.value.status_code == 401
def test_token_without_sub_raises(self):
from jose import jwt
import config
token = jwt.encode({"foo": "bar"}, config.settings.jwt_secret, algorithm="HS256")
from auth import decode_access_token
with pytest.raises(Exception) as exc_info:
decode_access_token(token)
assert exc_info.value.status_code == 401
# ---------------------------------------------------------------------------
# First-boot setup
# ---------------------------------------------------------------------------
class TestSetup:
def test_needs_setup_true_when_no_users(self, db_session):
from auth import needs_setup
assert needs_setup(db_session) is True
def test_create_admin_succeeds(self, db_session):
from auth import create_admin, needs_setup
user = create_admin(db_session, "admin", "password123")
assert user.username == "admin"
assert user.is_admin is True
assert needs_setup(db_session) is False
def test_create_admin_twice_raises_409(self, db_session):
from auth import create_admin
create_admin(db_session, "admin", "password123")
with pytest.raises(Exception) as exc_info:
create_admin(db_session, "admin2", "password456")
assert exc_info.value.status_code == 409
def test_admin_password_is_hashed(self, db_session):
from auth import create_admin
user = create_admin(db_session, "admin", "password123")
assert user.password_hash != "password123"
assert user.password_hash.startswith("$2b$")
# ---------------------------------------------------------------------------
# Authenticate user (login)
# ---------------------------------------------------------------------------
class TestAuthenticateUser:
def test_valid_credentials(self, db_session):
from auth import create_admin, authenticate_user
create_admin(db_session, "admin", "password123")
user = authenticate_user(db_session, "admin", "password123")
assert user.username == "admin"
def test_wrong_password_raises_401(self, db_session):
from auth import create_admin, authenticate_user
create_admin(db_session, "admin", "password123")
with pytest.raises(Exception) as exc_info:
authenticate_user(db_session, "admin", "wrong")
assert exc_info.value.status_code == 401
def test_unknown_user_raises_401(self, db_session):
from auth import authenticate_user
with pytest.raises(Exception) as exc_info:
authenticate_user(db_session, "nonexistent", "password")
assert exc_info.value.status_code == 401
# ---------------------------------------------------------------------------
# get_current_user dependency (integration via test app)
# ---------------------------------------------------------------------------
@pytest.fixture
def auth_app():
"""Create a minimal FastAPI app with a protected endpoint for testing auth."""
from auth import get_current_user
from schemas import UserResponse
test_app = FastAPI()
@test_app.get("/protected")
def protected(user=Depends(get_current_user)):
return {"user_id": str(user.id), "username": user.username}
return test_app
@pytest.fixture
def auth_client(auth_app):
return TestClient(auth_app)
class TestGetCurrentUser:
def test_no_auth_returns_401(self, auth_client):
resp = auth_client.get("/protected")
assert resp.status_code == 401
assert "Missing authentication" in resp.json()["detail"]
def test_invalid_bearer_format_returns_401(self, auth_client):
resp = auth_client.get("/protected", headers={"Authorization": "NotBearer token"})
assert resp.status_code == 401
def test_jwt_auth_succeeds(self, auth_client, db_session):
from auth import create_admin, create_access_token
user = create_admin(db_session, "admin", "password123")
token = create_access_token(str(user.id))
resp = auth_client.get("/protected", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 200
assert resp.json()["username"] == "admin"
def test_jwt_for_deleted_user_returns_401(self, auth_client, db_session):
from auth import create_access_token
import uuid
token = create_access_token(str(uuid.uuid4()))
resp = auth_client.get("/protected", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 401
def test_api_key_auth_succeeds(self, auth_client, db_session):
from auth import create_admin
create_admin(db_session, "admin", "password123")
resp = auth_client.get("/protected", headers={"X-Api-Key": "test-api-key-12345"})
assert resp.status_code == 200
assert resp.json()["username"] == "admin"
def test_wrong_api_key_returns_401(self, auth_client):
resp = auth_client.get("/protected", headers={"X-Api-Key": "wrong-key"})
assert resp.status_code == 401
def test_api_key_without_admin_returns_401(self, auth_client):
# No admin user created yet
resp = auth_client.get("/protected", headers={"X-Api-Key": "test-api-key-12345"})
assert resp.status_code == 401
def test_api_key_disabled_when_not_configured(self, auth_client, db_session):
"""When API_KEY is not set in config, API key auth should fail."""
from auth import create_admin
import config, auth
create_admin(db_session, "admin", "password123")
old_key = config.settings.api_key
config.settings.api_key = None
auth.settings = config.settings
try:
resp = auth_client.get("/protected", headers={"X-Api-Key": "test-api-key-12345"})
assert resp.status_code == 401
finally:
config.settings.api_key = old_key
auth.settings = config.settings