chrysopedia/backend/routers/search.py
jlightner a976129179 feat: Added 4-tier creator-scoped cascade (creator → domain → global →…
- "backend/search_service.py"
- "backend/schemas.py"
- "backend/routers/search.py"

GSD-Task: S02/T01
2026-04-04 05:02:30 +00:00

202 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Search endpoint for semantic + keyword search with graceful fallback."""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings
from database import async_session, get_session
from models import Creator, SearchLog, TechniquePage
from redis_client import get_redis
from schemas import (
PopularSearchesResponse,
PopularSearchItem,
SearchResponse,
SearchResultItem,
SuggestionItem,
SuggestionsResponse,
)
from search_service import SearchService
logger = logging.getLogger("chrysopedia.search.router")
router = APIRouter(prefix="/search", tags=["search"])
POPULAR_CACHE_KEY = "chrysopedia:popular_searches"
POPULAR_CACHE_TTL = 300 # 5 minutes
def _get_search_service() -> SearchService:
    """Construct a fresh SearchService bound to the current app settings."""
    settings = get_settings()
    return SearchService(settings)
async def _log_search(query: str, scope: str, result_count: int) -> None:
    """Best-effort persistence of a single search-log row.

    Opens a dedicated session so the request's session is untouched, and
    swallows every exception — a logging failure must never take down a
    search request.
    """
    try:
        async with async_session() as session:
            row = SearchLog(query=query, scope=scope, result_count=result_count)
            session.add(row)
            await session.commit()
    except Exception:
        logger.warning("Failed to log search query %r", query, exc_info=True)
@router.get("", response_model=SearchResponse)
async def search(
q: Annotated[str, Query(max_length=500)] = "",
scope: Annotated[str, Query()] = "all",
sort: Annotated[str, Query()] = "relevance",
limit: Annotated[int, Query(ge=1, le=100)] = 20,
creator: Annotated[str, Query(max_length=100)] = "",
db: AsyncSession = Depends(get_session),
) -> SearchResponse:
"""Semantic search with keyword fallback.
- **q**: Search query (max 500 chars). Empty → empty results.
- **scope**: ``all`` | ``topics`` | ``creators``. Invalid → defaults to ``all``.
- **limit**: Max results (1100, default 20).
- **creator**: Creator slug or UUID for cascade search. Empty → normal search.
"""
svc = _get_search_service()
result = await svc.search(query=q, scope=scope, sort=sort, limit=limit, db=db, creator=creator or None)
# Fire-and-forget search logging — only non-empty queries
if q.strip():
asyncio.create_task(_log_search(q.strip(), scope, result["total"]))
return SearchResponse(
items=[SearchResultItem(**item) for item in result["items"]],
partial_matches=[SearchResultItem(**item) for item in result.get("partial_matches", [])],
total=result["total"],
query=result["query"],
fallback_used=result["fallback_used"],
cascade_tier=result.get("cascade_tier", ""),
)
@router.get("/suggestions", response_model=SuggestionsResponse)
async def suggestions(
db: AsyncSession = Depends(get_session),
) -> SuggestionsResponse:
"""Return popular search suggestions for autocomplete.
Combines top technique pages (by view_count), popular topic tags
(by technique count), and top creators (by view_count).
Returns 812 deduplicated items.
"""
seen: set[str] = set()
items: list[SuggestionItem] = []
def _add(text: str, type_: str) -> None:
key = text.lower()
if key not in seen:
seen.add(key)
items.append(SuggestionItem(text=text, type=type_))
# Top 4 technique pages by view_count
tp_stmt = (
select(TechniquePage.title)
.order_by(TechniquePage.view_count.desc(), TechniquePage.title)
.limit(4)
)
tp_result = await db.execute(tp_stmt)
for (title,) in tp_result.all():
_add(title, "technique")
# Top 4 topic tags by how many technique pages use them
# Unnest the topic_tags ARRAY and count occurrences
tag_unnest = (
select(
func.unnest(TechniquePage.topic_tags).label("tag"),
)
.where(TechniquePage.topic_tags.isnot(None))
.subquery()
)
tag_stmt = (
select(
tag_unnest.c.tag,
func.count().label("cnt"),
)
.group_by(tag_unnest.c.tag)
.order_by(func.count().desc(), tag_unnest.c.tag)
.limit(4)
)
tag_result = await db.execute(tag_stmt)
for tag, _cnt in tag_result.all():
_add(tag, "topic")
# Top 4 creators by view_count
cr_stmt = (
select(Creator.name)
.where(Creator.hidden.is_(False))
.order_by(Creator.view_count.desc(), Creator.name)
.limit(4)
)
cr_result = await db.execute(cr_stmt)
for (name,) in cr_result.all():
_add(name, "creator")
return SuggestionsResponse(suggestions=items)
@router.get("/popular", response_model=PopularSearchesResponse)
async def popular_searches(
db: AsyncSession = Depends(get_session),
) -> PopularSearchesResponse:
"""Return the top 10 search queries from the last 7 days.
Results are cached in Redis for 5 minutes. Falls through to a
direct DB query when Redis is unavailable.
"""
# Try Redis cache first
try:
redis = await get_redis()
cached = await redis.get(POPULAR_CACHE_KEY)
await redis.aclose()
if cached is not None:
items = json.loads(cached)
return PopularSearchesResponse(
items=[PopularSearchItem(**i) for i in items],
cached=True,
)
except Exception:
logger.warning("Redis unavailable for popular searches cache", exc_info=True)
# Cache miss or Redis down — query DB
stmt = (
select(
func.lower(SearchLog.query).label("q"),
func.count().label("cnt"),
)
.where(SearchLog.created_at > func.now() - text("interval '7 days'"))
.group_by(func.lower(SearchLog.query))
.order_by(func.count().desc())
.limit(10)
)
result = await db.execute(stmt)
items = [PopularSearchItem(query=row.q, count=row.cnt) for row in result.all()]
# Write to Redis cache (best-effort)
try:
redis = await get_redis()
await redis.set(
POPULAR_CACHE_KEY,
json.dumps([i.model_dump() for i in items]),
ex=POPULAR_CACHE_TTL,
)
await redis.aclose()
except Exception:
logger.warning("Failed to cache popular searches in Redis", exc_info=True)
return PopularSearchesResponse(items=items, cached=False)