chrysopedia/backend/auth.py
jlightner cc60852ac9 feat: Built post CRUD and file upload/download API routers with auth, o…
- "backend/routers/posts.py"
- "backend/routers/files.py"
- "backend/minio_client.py"
- "backend/auth.py"
- "backend/main.py"

GSD-Task: S01/T02
2026-04-04 09:07:35 +00:00

193 lines
6.5 KiB
Python

"""Authentication utilities — password hashing, JWT, FastAPI dependencies."""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from typing import Annotated
import bcrypt
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings
from database import get_session
from models import User, UserRole
# ── Password hashing ─────────────────────────────────────────────────────────
def hash_password(plain: str) -> str:
    """Return a bcrypt hash of ``plain`` as a text string.

    A new salt is generated on every call via ``bcrypt.gensalt()``. The
    password is UTF-8 encoded before hashing and the resulting hash bytes
    are decoded back to ``str``.
    """
    password_bytes = plain.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password_bytes, salt)
    return digest.decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: The candidate password.
        hashed: The stored bcrypt hash in text form.

    Returns:
        True when the password matches the hash. False when it does not
        match, or when bcrypt rejects the inputs (for example an empty or
        corrupted stored hash).
    """
    try:
        return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        # bcrypt raises ValueError (e.g. "Invalid salt") when the stored
        # value is not a well-formed bcrypt hash. A value that cannot be
        # verified is a failed verification, not a server error.
        return False
# ── JWT ──────────────────────────────────────────────────────────────────────
# Signing algorithm passed to every jwt.encode / jwt.decode call in this module.
_ALGORITHM = "HS256"
# Default lifetime, in minutes, of tokens issued by create_access_token.
_ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours
# Bearer-token dependency consumed by get_current_user; tokenUrl names the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def create_access_token(
    user_id: uuid.UUID | str,
    role: str,
    *,
    expires_minutes: int = _ACCESS_TOKEN_EXPIRE_MINUTES,
) -> str:
    """Issue a signed JWT identifying a user and their role.

    The token carries ``sub`` (the stringified user id), ``role``, ``iat``
    (current UTC time) and ``exp`` (``expires_minutes`` after ``iat``). It is
    signed with the application's secret key.
    """
    signing_key = get_settings().app_secret_key
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=expires_minutes)
    claims = {
        "sub": str(user_id),
        "role": role,
        "iat": issued_at,
        "exp": expires_at,
    }
    return jwt.encode(claims, signing_key, algorithm=_ALGORITHM)
# Lifetime, in minutes, of every impersonation token.
_IMPERSONATION_EXPIRE_MINUTES = 60  # 1 hour


def create_impersonation_token(
    admin_user_id: uuid.UUID | str,
    target_user_id: uuid.UUID | str,
    target_role: str,
    *,
    write_mode: bool = False,
) -> str:
    """Issue a short-lived JWT that lets an admin act as another user.

    ``sub`` is the target user's id, so get_current_user loads the target
    account. ``original_user_id`` records the admin, and ``type`` is set to
    ``"impersonation"``. A ``write_mode`` claim is added only when
    ``write_mode`` is True; otherwise the claim is absent.
    """
    signing_key = get_settings().app_secret_key
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=_IMPERSONATION_EXPIRE_MINUTES)
    claims = {
        "sub": str(target_user_id),
        "role": target_role,
        "original_user_id": str(admin_user_id),
        "type": "impersonation",
        "iat": issued_at,
        "exp": expires_at,
    }
    if write_mode:
        # Only present when granted, so readers can default it to False.
        claims["write_mode"] = True
    return jwt.encode(claims, signing_key, algorithm=_ALGORITHM)
def decode_access_token(token: str) -> dict:
    """Decode and validate a JWT signed with the application's secret key.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims. ``sub``, ``role`` and ``exp`` are required, so
        they are always present in the returned mapping.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` challenge when
            the token has expired or fails validation for any other reason.
    """
    settings = get_settings()
    # A 401 must tell the client which authentication scheme to use.
    challenge = {"WWW-Authenticate": "Bearer"}
    try:
        return jwt.decode(
            token,
            settings.app_secret_key,
            algorithms=[_ALGORITHM],
            options={"require": ["sub", "role", "exp"]},
        )
    except jwt.ExpiredSignatureError as exc:
        # Handled before InvalidTokenError so expiry gets its own message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers=challenge,
        ) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {exc}",
            headers=challenge,
        ) from exc
# ── FastAPI dependencies ─────────────────────────────────────────────────────
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> User:
    """Resolve the bearer token to an active ``User``.

    The token is decoded, the user whose id equals the ``sub`` claim is
    loaded, and two runtime (non-column) attributes are attached:

    * ``_impersonating_admin_id`` — the token's ``original_user_id`` claim,
      or None when the claim is absent.
    * ``_impersonation_write_mode`` — the token's ``write_mode`` claim,
      defaulting to False.

    Raises:
        HTTPException: 401 when the token is rejected by decode_access_token,
            or when no matching user exists or the user is inactive.
    """
    payload = decode_access_token(token)
    user_id = payload.get("sub")
    result = await session.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
            # A 401 must carry a challenge naming the expected auth scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Attach impersonation metadata (non-column runtime attributes).
    user._impersonating_admin_id = payload.get("original_user_id")  # type: ignore[attr-defined]
    user._impersonation_write_mode = payload.get("write_mode", False)  # type: ignore[attr-defined]
    return user
# Same bearer scheme as oauth2_scheme, but auto_error=False lets the
# dependency receive None for a missing token instead of failing the request.
_optional_oauth2 = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login", auto_error=False)


async def get_optional_user(
    token: Annotated[str | None, Depends(_optional_oauth2)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> User | None:
    """Resolve the bearer token to an active ``User``, or None.

    Unlike get_current_user this never raises for authentication problems:
    a missing token, a token rejected by decode_access_token, an unknown
    user and an inactive user all yield None.

    When a user is returned it carries the same runtime attributes that
    get_current_user sets (``_impersonating_admin_id`` and
    ``_impersonation_write_mode``), so impersonation is visible on
    optional-auth routes too.
    """
    if token is None:
        return None
    try:
        payload = decode_access_token(token)
    except HTTPException:
        # An invalid or expired token is treated the same as no token.
        return None
    user_id = payload.get("sub")
    result = await session.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None or not user.is_active:
        return None
    # Keep parity with get_current_user: expose impersonation metadata as
    # non-column runtime attributes.
    user._impersonating_admin_id = payload.get("original_user_id")  # type: ignore[attr-defined]
    user._impersonation_write_mode = payload.get("write_mode", False)  # type: ignore[attr-defined]
    return user
def require_role(required_role: UserRole):
    """Build a dependency that admits only users whose role is ``required_role``.

    The returned coroutine resolves the current user through
    get_current_user, returns that user when the role matches, and raises a
    403 ``HTTPException`` otherwise.
    """

    async def _check(
        current_user: Annotated[User, Depends(get_current_user)],
    ) -> User:
        if current_user.role == required_role:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Requires {required_role.value} role",
        )

    return _check
async def reject_impersonation(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Dependency that refuses read-only impersonation sessions.

    The user passes through unchanged when it is not being impersonated
    (``_impersonating_admin_id`` is missing or None), or when the
    impersonation carries a truthy ``_impersonation_write_mode``. Any other
    impersonated request raises a 403 ``HTTPException``.
    """
    impersonator_id = getattr(current_user, "_impersonating_admin_id", None)
    if impersonator_id is None:
        # Ordinary session: nothing to restrict.
        return current_user
    if getattr(current_user, "_impersonation_write_mode", False):
        # Impersonation with write access explicitly granted.
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Write operations are not allowed during impersonation",
    )