chrysopedia/backend/routers/chat.py
jlightner a9589bfc93 test: Built ChatService with retrieve-prompt-stream pipeline, POST /api…
- "backend/chat_service.py"
- "backend/routers/chat.py"
- "backend/main.py"
- "backend/tests/test_chat.py"

GSD-Task: S03/T01
2026-04-04 05:19:44 +00:00

60 lines
1.8 KiB
Python

"""Chat endpoint — POST /api/v1/chat with SSE streaming response.
Accepts a query and optional creator filter, returns a Server-Sent Events
stream with sources, token, done, and error events.
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from chat_service import ChatService
from config import Settings, get_settings
from database import get_session
logger = logging.getLogger("chrysopedia.chat.router")
router = APIRouter(prefix="/chat", tags=["chat"])
class ChatRequest(BaseModel):
    """Payload model for ``POST /chat``.

    Carries the user's question and an optional creator value that the
    endpoint forwards to the chat service.
    """

    # Required question text; length is constrained to 1..1000 characters.
    query: str = Field(..., min_length=1, max_length=1000)
    # Optional creator value; defaults to None when the client omits it.
    creator: str | None = None
def _get_chat_service(settings: Settings = Depends(get_settings)) -> ChatService:
    """Dependency provider that constructs a ChatService.

    ``settings`` is resolved through ``get_settings``; a new ChatService is
    instantiated from it on every call.
    """
    chat_service = ChatService(settings)
    return chat_service
@router.post("")
async def chat(
    body: ChatRequest,
    db: AsyncSession = Depends(get_session),
    service: ChatService = Depends(_get_chat_service),
) -> StreamingResponse:
    """Answer a chat query as a Server-Sent Events stream.

    The request is logged, then the service's ``stream_response`` result is
    wrapped in a ``text/event-stream`` response.

    SSE protocol:
    - ``event: sources`` — citation metadata array (sent first)
    - ``event: token`` — streamed text chunk (repeated)
    - ``event: done`` — completion metadata with cascade_tier
    - ``event: error`` — error message (on failure)
    """
    logger.info("chat_request query=%r creator=%r", body.query, body.creator)

    # NOTE(review): the response body is consumed after this handler returns,
    # so ``db`` must still be usable while streaming — confirm against
    # get_session's lifetime and the FastAPI version in use.
    event_stream = service.stream_response(
        query=body.query,
        db=db,
        creator=body.creator,
    )

    # Keep intermediaries from caching the stream; "X-Accel-Buffering: no" is
    # the nginx response header that turns off proxy buffering.
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }

    return StreamingResponse(
        event_stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )