"""Authentication utilities — password hashing, JWT, FastAPI dependencies.""" from __future__ import annotations import uuid from datetime import datetime, timedelta, timezone from typing import Annotated import bcrypt import jwt from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from config import get_settings from database import get_session from models import User, UserRole # ── Password hashing ───────────────────────────────────────────────────────── def hash_password(plain: str) -> str: """Hash a plaintext password with bcrypt.""" return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: """Verify a plaintext password against a bcrypt hash.""" return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) # ── JWT ────────────────────────────────────────────────────────────────────── _ALGORITHM = "HS256" _ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") def create_access_token( user_id: uuid.UUID | str, role: str, *, expires_minutes: int = _ACCESS_TOKEN_EXPIRE_MINUTES, ) -> str: """Create a signed JWT with user_id and role claims.""" settings = get_settings() now = datetime.now(timezone.utc) payload = { "sub": str(user_id), "role": role, "iat": now, "exp": now + timedelta(minutes=expires_minutes), } return jwt.encode(payload, settings.app_secret_key, algorithm=_ALGORITHM) def decode_access_token(token: str) -> dict: """Decode and validate a JWT. Raises on expiry or malformed tokens.""" settings = get_settings() try: payload = jwt.decode( token, settings.app_secret_key, algorithms=[_ALGORITHM], options={"require": ["sub", "role", "exp"]}, ) except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", ) except jwt.InvalidTokenError as exc: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {exc}", ) return payload # ── FastAPI dependencies ───────────────────────────────────────────────────── async def get_current_user( token: Annotated[str, Depends(oauth2_scheme)], session: Annotated[AsyncSession, Depends(get_session)], ) -> User: """Decode JWT, load User from DB, raise 401 if missing or inactive.""" payload = decode_access_token(token) user_id = payload.get("sub") result = await session.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user is None or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive", ) return user def require_role(required_role: UserRole): """Return a dependency that checks the current user has the given role.""" async def _check( current_user: Annotated[User, Depends(get_current_user)], ) -> User: if current_user.role != required_role: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Requires {required_role.value} role", ) return current_user return _check