"""Admin router — user management and impersonation.""" from __future__ import annotations import logging from datetime import datetime from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased from auth import ( create_impersonation_token, decode_access_token, get_current_user, require_role, ) from database import get_session from models import ImpersonationLog, User, UserRole logger = logging.getLogger("chrysopedia.admin") router = APIRouter(prefix="/admin", tags=["admin"]) _require_admin = require_role(UserRole.admin) # ── Schemas ────────────────────────────────────────────────────────────────── class UserListItem(BaseModel): id: str email: str display_name: str role: str creator_id: str | None is_active: bool class Config: from_attributes = True class ImpersonateResponse(BaseModel): access_token: str token_type: str = "bearer" target_user: UserListItem class StopImpersonateResponse(BaseModel): message: str class StartImpersonationRequest(BaseModel): write_mode: bool = False class ImpersonationLogItem(BaseModel): id: str admin_name: str target_name: str action: str write_mode: bool ip_address: str | None created_at: datetime # ── Helpers ────────────────────────────────────────────────────────────────── def _client_ip(request: Request) -> str | None: """Best-effort client IP from X-Forwarded-For or direct connection.""" forwarded = request.headers.get("x-forwarded-for") if forwarded: return forwarded.split(",")[0].strip() if request.client: return request.client.host return None # ── Endpoints ──────────────────────────────────────────────────────────────── @router.get("/users", response_model=list[UserListItem]) async def list_users( _admin: Annotated[User, Depends(_require_admin)], session: Annotated[AsyncSession, Depends(get_session)], ): """List all users. Admin only.""" result = await session.execute( select(User).order_by(User.display_name) ) users = result.scalars().all() return [ UserListItem( id=str(u.id), email=u.email, display_name=u.display_name, role=u.role.value, creator_id=str(u.creator_id) if u.creator_id else None, is_active=u.is_active, ) for u in users ] @router.post("/impersonate/{user_id}", response_model=ImpersonateResponse) async def start_impersonation( user_id: UUID, request: Request, admin: Annotated[User, Depends(_require_admin)], session: Annotated[AsyncSession, Depends(get_session)], body: StartImpersonationRequest | None = None, ): """Start impersonating a user. Admin only. Returns a scoped JWT.""" if body is None: body = StartImpersonationRequest() # Cannot impersonate yourself if admin.id == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot impersonate yourself", ) # Load target user result = await session.execute(select(User).where(User.id == user_id)) target = result.scalar_one_or_none() if target is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Target user not found", ) # Create impersonation token token = create_impersonation_token( admin_user_id=admin.id, target_user_id=target.id, target_role=target.role.value, write_mode=body.write_mode, ) # Audit log session.add(ImpersonationLog( admin_user_id=admin.id, target_user_id=target.id, action="start", write_mode=body.write_mode, ip_address=_client_ip(request), )) await session.commit() logger.info( "Impersonation started: admin=%s target=%s write_mode=%s", admin.id, target.id, body.write_mode, ) return ImpersonateResponse( access_token=token, target_user=UserListItem( id=str(target.id), email=target.email, display_name=target.display_name, role=target.role.value, creator_id=str(target.creator_id) if target.creator_id else None, is_active=target.is_active, ), ) @router.post("/impersonate/stop", response_model=StopImpersonateResponse) async def stop_impersonation( request: Request, current_user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """Stop impersonation. Requires a valid impersonation token.""" admin_id = getattr(current_user, "_impersonating_admin_id", None) if admin_id is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Not currently impersonating", ) # Audit log session.add(ImpersonationLog( admin_user_id=admin_id, target_user_id=current_user.id, action="stop", ip_address=_client_ip(request), )) await session.commit() logger.info( "Impersonation stopped: admin=%s target=%s", admin_id, current_user.id, ) return StopImpersonateResponse(message="Impersonation ended") @router.get("/impersonation-log", response_model=list[ImpersonationLogItem]) async def get_impersonation_log( _admin: Annotated[User, Depends(_require_admin)], session: Annotated[AsyncSession, Depends(get_session)], page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), ): """Paginated impersonation audit log. Admin only.""" AdminUser = aliased(User, name="admin_user") TargetUser = aliased(User, name="target_user") stmt = ( select(ImpersonationLog, AdminUser.display_name, TargetUser.display_name) .join(AdminUser, ImpersonationLog.admin_user_id == AdminUser.id) .join(TargetUser, ImpersonationLog.target_user_id == TargetUser.id) .order_by(ImpersonationLog.created_at.desc()) .offset((page - 1) * page_size) .limit(page_size) ) result = await session.execute(stmt) rows = result.all() return [ ImpersonationLogItem( id=str(log.id), admin_name=admin_name, target_name=target_name, action=log.action, write_mode=log.write_mode, ip_address=log.ip_address, created_at=log.created_at, ) for log, admin_name, target_name in rows ] @router.post("/creators/{slug}/extract-profile") async def extract_creator_profile( slug: str, _admin: Annotated[User, Depends(_require_admin)], session: Annotated[AsyncSession, Depends(get_session)], ): """Queue personality profile extraction for a creator. Admin only.""" from models import Creator result = await session.execute( select(Creator).where(Creator.slug == slug) ) creator = result.scalar_one_or_none() if creator is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Creator not found: {slug}", ) from pipeline.stages import extract_personality_profile extract_personality_profile.delay(str(creator.id)) logger.info("Queued personality extraction for creator=%s (%s)", slug, creator.id) return {"status": "queued", "creator_id": str(creator.id)}