- "backend/rate_limiter.py" - "backend/models.py" - "backend/routers/chat.py" - "backend/chat_service.py" - "backend/config.py" - "alembic/versions/031_add_chat_usage_log.py" GSD-Task: S04/T01
145 lines
5.2 KiB
Python
145 lines
5.2 KiB
Python
"""Chat endpoint — POST /api/v1/chat with SSE streaming response.

Accepts a query and optional creator filter, returns a Server-Sent Events
stream with sources, token, done, and error events.

Rate limiting: per-user (authenticated), per-IP (anonymous), and per-creator.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from fastapi import APIRouter, Depends, Request
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from auth import get_optional_user
|
|
from chat_service import ChatService
|
|
from config import Settings, get_settings
|
|
from database import get_session
|
|
from models import User
|
|
from rate_limiter import RateLimiter
|
|
from redis_client import get_redis
|
|
|
|
logger = logging.getLogger("chrysopedia.chat.router")
|
|
|
|
router = APIRouter(prefix="/chat", tags=["chat"])
|
|
|
|
|
|
class ChatRequest(BaseModel):
    """JSON payload accepted by the chat endpoint.

    Validation is declarative: pydantic enforces the constraints below
    before the route handler is invoked.
    """

    # Required question text, between 1 and 1000 characters long.
    query: str = Field(default=..., min_length=1, max_length=1000)

    # Optional creator filter; when set it is forwarded to the chat service
    # and also selects the per-creator rate-limit bucket in the handler.
    creator: str | None = None

    # Optional identifier forwarded unchanged to the chat service.
    conversation_id: str | None = None

    # Weight in the closed range [0.0, 1.0]; defaults to 0.0.
    personality_weight: float = Field(0.0, ge=0.0, le=1.0)
|
|
|
|
|
|
def _get_client_ip(request: Request) -> str:
    """Return the best-guess client IP address for *request*.

    The left-most entry of ``X-Forwarded-For`` is preferred.  When the header
    is absent, or its first entry is blank (for example ``"   "`` or
    ``", 10.0.0.1"``), the socket peer address is used instead.  ``"unknown"``
    is returned when the request carries no peer information at all.

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    proxy overwrites it.  Confirm the deployment does so, because the value
    returned here is used as the anonymous rate-limit identity.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        first_hop = forwarded.partition(",")[0].strip()
        # A blank first entry must not become the caller's identity: every
        # such request would otherwise collapse into one shared "" bucket.
        if first_hop:
            return first_hop
    return request.client.host if request.client else "unknown"
|
|
|
|
|
|
def _rate_limited_response(message: str, retry_after) -> JSONResponse:
    """Build the HTTP 429 reply used for every rate-limit rejection.

    ``retry_after`` is the value reported by the limiter; it is echoed in the
    JSON body as-is and stringified for the ``Retry-After`` header.
    """
    return JSONResponse(
        status_code=429,
        content={
            "error": message,
            "retry_after": retry_after,
        },
        headers={"Retry-After": str(retry_after)},
    )


@router.post("", response_model=None)
async def chat(
    body: ChatRequest,
    request: Request,
    db: AsyncSession = Depends(get_session),
    settings: Settings = Depends(get_settings),
    user: User | None = Depends(get_optional_user),
):
    """Stream a chat response as Server-Sent Events.

    Rate limits are checked before processing:
    - Authenticated users: ``rate_limit_user_per_hour`` requests/hour
    - Anonymous (IP-based): ``rate_limit_ip_per_hour`` requests/hour
    - Per-creator (if creator filter set): ``rate_limit_creator_per_hour`` requests/hour

    When a limit is exceeded the handler returns a JSON 429 response with a
    ``Retry-After`` header instead of opening a stream.

    SSE protocol:
    - ``event: sources`` — citation metadata array (sent first)
    - ``event: token`` — streamed text chunk (repeated)
    - ``event: done`` — completion metadata with cascade_tier, conversation_id
    - ``event: error`` — error message (on failure)
    """
    client_ip = _get_client_ip(request)
    user_id = user.id if user is not None else None
    # Decide authentication by presence of an id, not by its truthiness, so a
    # falsy-but-valid id (e.g. 0) is not silently treated as anonymous.
    is_authenticated = user_id is not None

    logger.info(
        "chat_request query=%r creator=%r cid=%r weight=%.2f user=%s ip=%s",
        body.query, body.creator, body.conversation_id,
        body.personality_weight, user_id, client_ip,
    )

    # One Redis handle is shared by the rate limiter and the chat service.
    redis = await get_redis()

    # ── Rate limiting ───────────────────────────────────────────────────
    limiter = RateLimiter(redis)

    # User-based limit (authenticated) or IP-based limit (anonymous)
    if is_authenticated:
        scope = "user"
        identity_key = RateLimiter.key("user", str(user_id))
        identity_limit = settings.rate_limit_user_per_hour
    else:
        scope = "ip"
        identity_key = RateLimiter.key("ip", client_ip)
        identity_limit = settings.rate_limit_ip_per_hour

    result = await limiter.check_rate_limit(
        identity_key, identity_limit, window_seconds=3600,
    )
    if not result.allowed:
        logger.warning(
            "rate_limit_exceeded scope=%s key=%s remaining=%d retry_after=%d",
            scope, identity_key, result.remaining, result.retry_after,
        )
        return _rate_limited_response("Rate limit exceeded", result.retry_after)

    # Per-creator limit (if creator filter is provided)
    if body.creator:
        creator_key = RateLimiter.key("creator", body.creator)
        creator_result = await limiter.check_rate_limit(
            creator_key, settings.rate_limit_creator_per_hour, window_seconds=3600,
        )
        if not creator_result.allowed:
            logger.warning(
                "rate_limit_exceeded scope=creator key=%s retry_after=%d",
                creator_key, creator_result.retry_after,
            )
            return _rate_limited_response(
                "Creator rate limit exceeded", creator_result.retry_after,
            )

    # ── Stream response ─────────────────────────────────────────────────
    service = ChatService(settings, redis=redis)

    return StreamingResponse(
        service.stream_response(
            query=body.query,
            db=db,
            creator=body.creator,
            conversation_id=body.conversation_id,
            personality_weight=body.personality_weight,
            user_id=user_id,
            client_ip=client_ip,
        ),
        media_type="text/event-stream",
        headers={
            # Ask intermediaries not to cache or buffer the event stream.
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
|