chrysopedia/backend/routers/admin.py
jlightner ab9dd2aa1b feat: Added write_mode support to impersonation tokens with conditional…
- "backend/auth.py"
- "backend/models.py"
- "backend/routers/admin.py"
- "backend/tests/test_impersonation.py"

GSD-Task: S07/T01
2026-04-04 06:24:04 +00:00

238 lines
7 KiB
Python

"""Admin router — user management and impersonation."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from auth import (
create_impersonation_token,
decode_access_token,
get_current_user,
require_role,
)
from database import get_session
from models import ImpersonationLog, User, UserRole
# Module logger; impersonation start/stop events are reported through it.
logger = logging.getLogger("chrysopedia.admin")
# Every route declared in this module is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin"])
# Dependency built by require_role for the admin role; endpoints inject it via
# Depends(...). NOTE(review): presumably rejects non-admin callers — confirm
# against auth.require_role.
_require_admin = require_role(UserRole.admin)
# ── Schemas ──────────────────────────────────────────────────────────────────
class UserListItem(BaseModel):
    """Summary of a user account as returned by the admin endpoints.

    ``id`` and ``creator_id`` are plain strings; the endpoints in this module
    stringify the ORM values before building an item.
    """

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2 and emits a deprecation warning; ``model_config`` is its
    # replacement, and a plain dict is accepted wherever ``ConfigDict`` is.
    model_config = {"from_attributes": True}

    id: str
    email: str
    display_name: str
    role: str
    # None when the user has no linked creator.
    creator_id: str | None
    is_active: bool
class ImpersonateResponse(BaseModel):
    """Response body returned when an impersonation session is started."""

    # Token produced by create_impersonation_token for the target user.
    access_token: str
    # Token scheme advertised to the client; defaults to "bearer".
    token_type: str = "bearer"
    # Summary of the user being impersonated.
    target_user: UserListItem
class StopImpersonateResponse(BaseModel):
    """Response body returned when an impersonation session is stopped."""

    # Human-readable confirmation text.
    message: str
class StartImpersonationRequest(BaseModel):
    """Optional request body for starting an impersonation session."""

    # Forwarded to the impersonation token and recorded in the audit log.
    # Off by default. NOTE(review): presumably gates write access while
    # impersonating — confirm against auth.create_impersonation_token.
    write_mode: bool = False
class ImpersonationLogItem(BaseModel):
    """One entry of the impersonation audit log as exposed to admins."""

    id: str  # log entry id, stringified by the endpoint
    admin_name: str  # display name of the impersonating admin
    target_name: str  # display name of the impersonated user
    action: str  # this module records "start" and "stop"
    write_mode: bool
    ip_address: str | None  # best-effort client IP; None when unavailable
    created_at: datetime
# ── Helpers ──────────────────────────────────────────────────────────────────
def _client_ip(request: Request) -> str | None:
    """Return a best-effort client IP for the audit log.

    The first non-empty entry of ``X-Forwarded-For`` wins; when the header is
    absent or contains only blank entries, the address of the direct peer is
    used instead.

    Args:
        request: The incoming request.

    Returns:
        The IP as a string, or ``None`` when neither source yields one.

    Note:
        ``X-Forwarded-For`` is client-controlled unless a trusted proxy
        overwrites it, so the value is informational only.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # Skip blank entries (e.g. ", 10.0.0.1" or a whitespace-only header)
        # so an empty string is never recorded as the address.
        for entry in forwarded.split(","):
            candidate = entry.strip()
            if candidate:
                return candidate
    if request.client:
        return request.client.host
    return None
# ── Endpoints ────────────────────────────────────────────────────────────────
@router.get("/users", response_model=list[UserListItem])
async def list_users(
    _admin: Annotated[User, Depends(_require_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return every user account, ordered by display name. Admin only."""
    query = select(User).order_by(User.display_name)
    accounts = (await session.execute(query)).scalars().all()

    items = []
    for account in accounts:
        # The schema exposes ids as strings; a missing creator link stays None.
        creator = str(account.creator_id) if account.creator_id else None
        items.append(
            UserListItem(
                id=str(account.id),
                email=account.email,
                display_name=account.display_name,
                role=account.role.value,
                creator_id=creator,
                is_active=account.is_active,
            )
        )
    return items
# NOTE: the literal "/impersonate/stop" route must be registered BEFORE the
# parameterised "/impersonate/{user_id}" route. Routes are matched in
# declaration order, so with the opposite order POST /impersonate/stop is
# captured by the parameterised route (with "stop" as user_id) and rejected
# there, which makes the stop endpoint unreachable.
@router.post("/impersonate/stop", response_model=StopImpersonateResponse)
async def stop_impersonation(
    request: Request,
    current_user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Stop impersonation. Requires a valid impersonation token.

    Writes a "stop" entry to the impersonation audit log.

    Raises:
        HTTPException: 400 when the current user carries no
            ``_impersonating_admin_id`` attribute, i.e. the request was not
            made with an impersonation token.
    """
    # NOTE(review): the attribute is presumably attached by get_current_user
    # when it decodes an impersonation token — confirm in auth.py.
    admin_id = getattr(current_user, "_impersonating_admin_id", None)
    if admin_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Not currently impersonating",
        )

    # Audit log
    session.add(ImpersonationLog(
        admin_user_id=admin_id,
        target_user_id=current_user.id,
        action="stop",
        ip_address=_client_ip(request),
    ))
    await session.commit()

    logger.info(
        "Impersonation stopped: admin=%s target=%s",
        admin_id, current_user.id,
    )
    return StopImpersonateResponse(message="Impersonation ended")


@router.post("/impersonate/{user_id}", response_model=ImpersonateResponse)
async def start_impersonation(
    user_id: UUID,
    request: Request,
    admin: Annotated[User, Depends(_require_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
    body: StartImpersonationRequest | None = None,
):
    """Start impersonating a user. Admin only. Returns a scoped JWT.

    The request body is optional; when omitted, the defaults of
    ``StartImpersonationRequest`` apply (``write_mode=False``). A "start"
    entry is written to the impersonation audit log.

    Raises:
        HTTPException: 400 when the admin targets their own account,
            404 when no user with ``user_id`` exists.
    """
    if body is None:
        body = StartImpersonationRequest()

    # Cannot impersonate yourself
    if admin.id == user_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot impersonate yourself",
        )

    # Load target user
    result = await session.execute(select(User).where(User.id == user_id))
    target = result.scalar_one_or_none()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Target user not found",
        )

    # Create impersonation token
    token = create_impersonation_token(
        admin_user_id=admin.id,
        target_user_id=target.id,
        target_role=target.role.value,
        write_mode=body.write_mode,
    )

    # Audit log
    session.add(ImpersonationLog(
        admin_user_id=admin.id,
        target_user_id=target.id,
        action="start",
        write_mode=body.write_mode,
        ip_address=_client_ip(request),
    ))
    await session.commit()

    logger.info(
        "Impersonation started: admin=%s target=%s write_mode=%s",
        admin.id, target.id, body.write_mode,
    )
    return ImpersonateResponse(
        access_token=token,
        target_user=UserListItem(
            id=str(target.id),
            email=target.email,
            display_name=target.display_name,
            role=target.role.value,
            creator_id=str(target.creator_id) if target.creator_id else None,
            is_active=target.is_active,
        ),
    )
@router.get("/impersonation-log", response_model=list[ImpersonationLogItem])
async def get_impersonation_log(
    _admin: Annotated[User, Depends(_require_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
):
    """Paginated impersonation audit log, newest first. Admin only.

    Args:
        page: 1-based page number.
        page_size: Entries per page (1-200).

    Returns:
        One ``ImpersonationLogItem`` per log row on the requested page, with
        the admin's and target's display names resolved through joins on the
        user table. Both joins are inner joins, so a log row whose admin or
        target user row is missing is not returned.
    """
    # The user table is joined twice, so each role needs its own alias.
    AdminUser = aliased(User, name="admin_user")
    TargetUser = aliased(User, name="target_user")
    stmt = (
        select(ImpersonationLog, AdminUser.display_name, TargetUser.display_name)
        .join(AdminUser, ImpersonationLog.admin_user_id == AdminUser.id)
        .join(TargetUser, ImpersonationLog.target_user_id == TargetUser.id)
        # The id tiebreaker makes the order total: with created_at alone, rows
        # sharing a timestamp may be returned in any order, so OFFSET/LIMIT
        # pages could repeat or skip entries.
        .order_by(ImpersonationLog.created_at.desc(), ImpersonationLog.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await session.execute(stmt)
    rows = result.all()
    return [
        ImpersonationLogItem(
            id=str(log.id),
            admin_name=admin_name,
            target_name=target_name,
            action=log.action,
            write_mode=log.write_mode,
            ip_address=log.ip_address,
            created_at=log.created_at,
        )
        for log, admin_name, target_name in rows
    ]