chrysopedia/backend/auth.py
jlightner a06ea946b1 chore: Added User/InviteCode models, Alembic migration 016, auth utilit…
- "backend/models.py"
- "backend/auth.py"
- "backend/schemas.py"
- "backend/requirements.txt"
- "alembic/versions/016_add_users_and_invite_codes.py"

GSD-Task: S02/T01
2026-04-03 21:47:01 +00:00

116 lines
3.9 KiB
Python

"""Authentication utilities — password hashing, JWT, FastAPI dependencies."""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from typing import Annotated
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings
from database import get_session
from models import User, UserRole
# ── Password hashing ─────────────────────────────────────────────────────────
_pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(plain: str) -> str:
"""Hash a plaintext password with bcrypt."""
return _pwd_ctx.hash(plain)
def verify_password(plain: str, hashed: str) -> bool:
"""Verify a plaintext password against a bcrypt hash."""
return _pwd_ctx.verify(plain, hashed)
# ── JWT ──────────────────────────────────────────────────────────────────────
_ALGORITHM = "HS256"
_ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def create_access_token(
user_id: uuid.UUID | str,
role: str,
*,
expires_minutes: int = _ACCESS_TOKEN_EXPIRE_MINUTES,
) -> str:
"""Create a signed JWT with user_id and role claims."""
settings = get_settings()
now = datetime.now(timezone.utc)
payload = {
"sub": str(user_id),
"role": role,
"iat": now,
"exp": now + timedelta(minutes=expires_minutes),
}
return jwt.encode(payload, settings.app_secret_key, algorithm=_ALGORITHM)
def decode_access_token(token: str) -> dict:
"""Decode and validate a JWT. Raises on expiry or malformed tokens."""
settings = get_settings()
try:
payload = jwt.decode(
token,
settings.app_secret_key,
algorithms=[_ALGORITHM],
options={"require": ["sub", "role", "exp"]},
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
)
except jwt.InvalidTokenError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {exc}",
)
return payload
# ── FastAPI dependencies ─────────────────────────────────────────────────────
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Annotated[AsyncSession, Depends(get_session)],
) -> User:
"""Decode JWT, load User from DB, raise 401 if missing or inactive."""
payload = decode_access_token(token)
user_id = payload.get("sub")
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
)
return user
def require_role(required_role: UserRole):
"""Return a dependency that checks the current user has the given role."""
async def _check(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if current_user.role != required_role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires {required_role.value} role",
)
return current_user
return _check