chrysopedia/backend/routers/admin.py
jlightner 17b43d9778 feat: Added LightRAG /query/data as primary search engine with file_sou…
- "backend/config.py"
- "backend/search_service.py"

GSD-Task: S01/T01
2026-04-04 04:44:24 +00:00

180 lines
5.3 KiB
Python

"""Admin router — user management and impersonation."""
from __future__ import annotations
import logging
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from auth import (
create_impersonation_token,
decode_access_token,
get_current_user,
require_role,
)
from database import get_session
from models import ImpersonationLog, User, UserRole
# Module-level logger registered under the "chrysopedia.admin" name.
logger = logging.getLogger("chrysopedia.admin")
# Every route declared on this router is served under the "/admin" prefix
# and tagged "admin".
router = APIRouter(prefix="/admin", tags=["admin"])
# Dependency produced once by require_role(UserRole.admin) and shared by the
# admin-only endpoints in this module via Depends(_require_admin).
# NOTE(review): presumably rejects callers whose role is not admin — confirm
# against auth.require_role.
_require_admin = require_role(UserRole.admin)
# ── Schemas ──────────────────────────────────────────────────────────────────
class UserListItem(BaseModel):
    """Summary of a user account as returned by the admin endpoints.

    Identifiers are carried as strings; the endpoints in this module build
    instances explicitly and stringify ``id`` / ``creator_id`` themselves.
    """

    # Pydantic v2 configuration. ``from_attributes`` is a v2 setting, and the
    # nested ``class Config`` form is deprecated in v2 (it emits a deprecation
    # warning), so the setting is declared through ``model_config`` instead.
    # A plain dict is accepted here, which avoids a new import.
    model_config = {"from_attributes": True}

    id: str
    email: str
    display_name: str
    role: str
    # None when the user has no creator_id set.
    creator_id: str | None
    is_active: bool
class ImpersonateResponse(BaseModel):
    """Response body returned when an impersonation session is started."""

    # Token produced by create_impersonation_token for the target user.
    access_token: str
    # Defaults to "bearer" when not supplied.
    token_type: str = "bearer"
    # Summary of the account being impersonated.
    target_user: UserListItem
class StopImpersonateResponse(BaseModel):
    """Response body returned when an impersonation session is ended."""

    # Human-readable confirmation text.
    message: str
# ── Helpers ──────────────────────────────────────────────────────────────────
def _client_ip(request: Request) -> str | None:
    """Best-effort client IP from X-Forwarded-For or direct connection.

    The left-most entry of the ``X-Forwarded-For`` header is preferred. When
    the header is absent, or its first entry is empty after stripping
    whitespace (a malformed header such as ``" , 10.0.0.2"``), the address of
    the direct connection is used instead.

    Args:
        request: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The client address as a string, or ``None`` when neither source
        yields one.
    """
    # NOTE: X-Forwarded-For is supplied by the caller unless a trusted proxy
    # overwrites it, so the result is informational only.
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        # An empty first entry must not be reported as the address; fall
        # through to the direct connection instead of returning "".
        if first_hop:
            return first_hop
    if request.client:
        return request.client.host
    return None
# ── Endpoints ────────────────────────────────────────────────────────────────
@router.get("/users", response_model=list[UserListItem])
async def list_users(
    _admin: Annotated[User, Depends(_require_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """List all users. Admin only.

    Users are returned ordered by display name.
    """
    query = select(User).order_by(User.display_name)
    rows = (await session.execute(query)).scalars().all()

    items: list[UserListItem] = []
    for user in rows:
        # Identifiers are exposed as strings; a missing creator stays None.
        creator = str(user.creator_id) if user.creator_id else None
        items.append(
            UserListItem(
                id=str(user.id),
                email=user.email,
                display_name=user.display_name,
                role=user.role.value,
                creator_id=creator,
                is_active=user.is_active,
            )
        )
    return items
# NOTE: the fixed path "/impersonate/stop" must be registered BEFORE the
# parameterized "/impersonate/{user_id}" route. Routes are matched in
# declaration order, so with the opposite order "stop" is captured as
# ``user_id``, fails UUID validation, and this endpoint is never reached.
@router.post("/impersonate/stop", response_model=StopImpersonateResponse)
async def stop_impersonation(
    request: Request,
    current_user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Stop impersonation. Requires a valid impersonation token.

    Writes a "stop" entry to the impersonation audit log and returns a
    confirmation message. Responds with 400 when the current user is not
    being impersonated.
    """
    # The attribute is absent on ordinary sessions.
    # NOTE(review): presumably set by get_current_user when the request
    # carries an impersonation token — confirm in auth.
    admin_id = getattr(current_user, "_impersonating_admin_id", None)
    if admin_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Not currently impersonating",
        )

    # Audit log
    session.add(
        ImpersonationLog(
            admin_user_id=admin_id,
            target_user_id=current_user.id,
            action="stop",
            ip_address=_client_ip(request),
        )
    )
    await session.commit()

    logger.info(
        "Impersonation stopped: admin=%s target=%s",
        admin_id,
        current_user.id,
    )
    return StopImpersonateResponse(message="Impersonation ended")


@router.post("/impersonate/{user_id}", response_model=ImpersonateResponse)
async def start_impersonation(
    user_id: UUID,
    request: Request,
    admin: Annotated[User, Depends(_require_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Start impersonating a user. Admin only. Returns a scoped JWT.

    Writes a "start" entry to the impersonation audit log before returning.
    Responds with 400 when the admin targets their own account and 404 when
    the target user does not exist.
    """
    # Cannot impersonate yourself
    if admin.id == user_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot impersonate yourself",
        )

    # Load target user
    result = await session.execute(select(User).where(User.id == user_id))
    target = result.scalar_one_or_none()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Target user not found",
        )

    # Create impersonation token
    token = create_impersonation_token(
        admin_user_id=admin.id,
        target_user_id=target.id,
        target_role=target.role.value,
    )

    # Audit log. The commit happens before the token is returned, so a
    # failed commit raises and the token never reaches the caller.
    session.add(
        ImpersonationLog(
            admin_user_id=admin.id,
            target_user_id=target.id,
            action="start",
            ip_address=_client_ip(request),
        )
    )
    await session.commit()

    logger.info(
        "Impersonation started: admin=%s target=%s",
        admin.id,
        target.id,
    )
    return ImpersonateResponse(
        access_token=token,
        target_user=UserListItem(
            id=str(target.id),
            email=target.email,
            display_name=target.display_name,
            role=target.role.value,
            creator_id=str(target.creator_id) if target.creator_id else None,
            is_active=target.is_active,
        ),
    )