chrysopedia/backend/routers/chat.py
jlightner a5d3af55ca feat: Built Redis sliding-window rate limiter, ChatUsageLog model with…
- "backend/rate_limiter.py"
- "backend/models.py"
- "backend/routers/chat.py"
- "backend/chat_service.py"
- "backend/config.py"
- "alembic/versions/031_add_chat_usage_log.py"

GSD-Task: S04/T01
2026-04-04 13:36:29 +00:00

145 lines
5.2 KiB
Python

"""Chat endpoint — POST /api/v1/chat with SSE streaming response.
Accepts a query and optional creator filter, returns a Server-Sent Events
stream with sources, token, done, and error events.
Rate limiting: per-user (authenticated), per-IP (anonymous), and per-creator.
"""
from __future__ import annotations

import ipaddress
import logging

from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession

from auth import get_optional_user
from chat_service import ChatService
from config import Settings, get_settings
from database import get_session
from models import User
from rate_limiter import RateLimiter
from redis_client import get_redis
# Routes in this module are registered under ``/chat``; the ``/api/v1`` part of
# the public path (see module docstring) is presumably added where this router
# is included — confirm in the app factory.
router = APIRouter(prefix="/chat", tags=["chat"])

# Explicit, stable logger name rather than ``__name__``.
logger = logging.getLogger("chrysopedia.chat.router")
class ChatRequest(BaseModel):
    """JSON payload accepted by the chat endpoint.

    Only ``query`` is required; the other fields are optional and are passed
    through to ``ChatService.stream_response`` by the endpoint.
    """

    # The user's question: required, 1–1000 characters.
    query: str = Field(default=..., min_length=1, max_length=1000)
    # Optional creator filter. When set, the endpoint also applies the
    # per-creator rate limit keyed on this value.
    creator: str | None = Field(default=None)
    # Optional conversation identifier; presumably continues an existing
    # conversation — confirm against ChatService.
    conversation_id: str | None = Field(default=None)
    # Blend factor constrained to [0.0, 1.0]; its exact meaning is defined by
    # ChatService (NOTE(review): confirm semantics there).
    personality_weight: float = Field(default=0.0, ge=0.0, le=1.0)
def _get_client_ip(request: Request) -> str:
"""Extract client IP, preferring X-Forwarded-For behind a reverse proxy."""
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
# Length of the sliding window shared by every chat rate limit, in seconds.
_RATE_LIMIT_WINDOW_SECONDS = 3600


def _rate_limit_response(error: str, retry_after: int) -> JSONResponse:
    """Build the HTTP 429 response returned when a chat rate limit is exceeded.

    Args:
        error: Message placed in the JSON body's ``error`` field.
        retry_after: Value reported by the limiter. It is sent both in the
            JSON body and as the ``Retry-After`` header.
    """
    return JSONResponse(
        status_code=429,
        content={
            "error": error,
            "retry_after": retry_after,
        },
        headers={"Retry-After": str(retry_after)},
    )


@router.post("", response_model=None)
async def chat(
    body: ChatRequest,
    request: Request,
    db: AsyncSession = Depends(get_session),
    settings: Settings = Depends(get_settings),
    user: User | None = Depends(get_optional_user),
):
    """Stream a chat response as Server-Sent Events.

    Rate limits are checked before processing:
    - Authenticated users: ``rate_limit_user_per_hour`` requests/hour
    - Anonymous (IP-based): ``rate_limit_ip_per_hour`` requests/hour
    - Per-creator (if creator filter set): ``rate_limit_creator_per_hour`` requests/hour

    A request that exceeds a limit receives a JSON 429 response, with a
    ``Retry-After`` header, instead of a stream.

    SSE protocol:
    - ``event: sources`` — citation metadata array (sent first)
    - ``event: token`` — streamed text chunk (repeated)
    - ``event: done`` — completion metadata with cascade_tier, conversation_id
    - ``event: error`` — error message (on failure)
    """
    client_ip = _get_client_ip(request)
    user_id = user.id if user is not None else None
    logger.info(
        "chat_request query=%r creator=%r cid=%r weight=%.2f user=%s ip=%s",
        body.query, body.creator, body.conversation_id,
        body.personality_weight, user_id, client_ip,
    )

    redis = await get_redis()

    # ── Rate limiting ───────────────────────────────────────────────────
    limiter = RateLimiter(redis)

    # Authenticated callers are limited per user id; anonymous ones per client IP.
    if user_id is not None:
        scope, subject = "user", str(user_id)
        identity_limit = settings.rate_limit_user_per_hour
    else:
        scope, subject = "ip", client_ip
        identity_limit = settings.rate_limit_ip_per_hour
    identity_key = RateLimiter.key(scope, subject)

    result = await limiter.check_rate_limit(
        identity_key, identity_limit, window_seconds=_RATE_LIMIT_WINDOW_SECONDS,
    )
    if not result.allowed:
        logger.warning(
            "rate_limit_exceeded scope=%s key=%s remaining=%d retry_after=%d",
            scope, identity_key, result.remaining, result.retry_after,
        )
        return _rate_limit_response("Rate limit exceeded", result.retry_after)

    # Per-creator limit (only when a creator filter is provided).
    # NOTE(review): if check_rate_limit also records the hit, a request
    # rejected here has already consumed a slot in the identity window above —
    # confirm in rate_limiter.py whether that is intended.
    if body.creator:
        creator_key = RateLimiter.key("creator", body.creator)
        creator_result = await limiter.check_rate_limit(
            creator_key,
            settings.rate_limit_creator_per_hour,
            window_seconds=_RATE_LIMIT_WINDOW_SECONDS,
        )
        if not creator_result.allowed:
            logger.warning(
                "rate_limit_exceeded scope=creator key=%s retry_after=%d",
                creator_key, creator_result.retry_after,
            )
            return _rate_limit_response(
                "Creator rate limit exceeded", creator_result.retry_after,
            )

    # ── Stream response ─────────────────────────────────────────────────
    # NOTE(review): ``db`` comes from a request dependency but is consumed
    # lazily while the response streams. If get_session is a ``yield``
    # dependency, some FastAPI versions run its cleanup before streaming
    # finishes — confirm the session is still open inside stream_response.
    service = ChatService(settings, redis=redis)
    return StreamingResponse(
        service.stream_response(
            query=body.query,
            db=db,
            creator=body.creator,
            conversation_id=body.conversation_id,
            personality_weight=body.personality_weight,
            user_id=user_id,
            client_ip=client_ip,
        ),
        media_type="text/event-stream",
        headers={
            # Disable client/proxy caching and nginx response buffering so
            # SSE events are delivered as they are produced.
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )