"""Search endpoint for semantic + keyword search with graceful fallback."""

from __future__ import annotations

import asyncio
import json
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from config import get_settings
from database import async_session, get_session
from models import Creator, SearchLog, TechniquePage
from redis_client import get_redis
from schemas import (
    PopularSearchesResponse,
    PopularSearchItem,
    SearchResponse,
    SearchResultItem,
    SuggestionItem,
    SuggestionsResponse,
)
from search_service import SearchService

logger = logging.getLogger("chrysopedia.search.router")

router = APIRouter(prefix="/search", tags=["search"])

POPULAR_CACHE_KEY = "chrysopedia:popular_searches"
POPULAR_CACHE_TTL = 300  # 5 minutes

# Strong references to in-flight fire-and-forget tasks. The event loop only
# keeps weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes. Tasks remove themselves when done.
_background_tasks: set[asyncio.Task[None]] = set()


def _get_search_service() -> SearchService:
    """Build a SearchService from current settings."""
    return SearchService(get_settings())


async def _log_search(query: str, scope: str, result_count: int) -> None:
    """Fire-and-forget: persist a search log row.

    Opens its own session so it doesn't interfere with the request session.
    Catches all exceptions — a logging failure must never break a search
    request.

    Args:
        query: The search text to record.
        scope: The scope value the search was issued with.
        result_count: Number of results reported for the search.
    """
    try:
        async with async_session() as session:
            session.add(
                SearchLog(query=query, scope=scope, result_count=result_count)
            )
            await session.commit()
    except Exception:
        # Deliberate best-effort: log and swallow.
        logger.warning("Failed to log search query %r", query, exc_info=True)


@router.get("", response_model=SearchResponse)
async def search(
    q: Annotated[str, Query(max_length=500)] = "",
    scope: Annotated[str, Query()] = "all",
    sort: Annotated[str, Query()] = "relevance",
    limit: Annotated[int, Query(ge=1, le=100)] = 20,
    creator: Annotated[str, Query(max_length=100)] = "",
    db: AsyncSession = Depends(get_session),
) -> SearchResponse:
    """Semantic search with keyword fallback.

    - **q**: Search query (max 500 chars). Empty → empty results.
    - **scope**: ``all`` | ``topics`` | ``creators``. Invalid → defaults to ``all``.
    - **sort**: Sort mode forwarded to the search service (default ``relevance``).
    - **limit**: Max results (1–100, default 20).
    - **creator**: Creator slug or UUID for cascade search. Empty → normal search.
    """
    svc = _get_search_service()
    result = await svc.search(
        query=q,
        scope=scope,
        sort=sort,
        limit=limit,
        db=db,
        # The service receives None, not "", when no creator filter was given.
        creator=creator or None,
    )

    # Fire-and-forget search logging — only non-empty queries.
    stripped_query = q.strip()
    if stripped_query:
        log_task = asyncio.create_task(
            _log_search(stripped_query, scope, result["total"])
        )
        # Hold a reference until completion so the task cannot be
        # garbage-collected mid-flight; it unregisters itself when done.
        _background_tasks.add(log_task)
        log_task.add_done_callback(_background_tasks.discard)

    return SearchResponse(
        items=[SearchResultItem(**item) for item in result["items"]],
        partial_matches=[
            SearchResultItem(**item) for item in result.get("partial_matches", [])
        ],
        total=result["total"],
        query=result["query"],
        fallback_used=result["fallback_used"],
        cascade_tier=result.get("cascade_tier", ""),
    )


@router.get("/suggestions", response_model=SuggestionsResponse)
async def suggestions(
    db: AsyncSession = Depends(get_session),
) -> SuggestionsResponse:
    """Return popular search suggestions for autocomplete.

    Combines top technique pages (by view_count), popular topic tags
    (by technique count), and top non-hidden creators (by view_count).
    Each source contributes at most 4 entries, so the response holds up to
    12 items after case-insensitive de-duplication (fewer when the tables
    are small or the sources overlap).
    """
    seen: set[str] = set()
    items: list[SuggestionItem] = []

    def _add(label: str | None, type_: str) -> None:
        # Skip NULLs (e.g. a NULL element inside a topic_tags array) instead
        # of failing on .lower().
        if label is None:
            return
        key = label.lower()
        if key not in seen:
            seen.add(key)
            items.append(SuggestionItem(text=label, type=type_))

    # Top 4 technique pages by view_count (title breaks ties).
    tp_stmt = (
        select(TechniquePage.title)
        .order_by(TechniquePage.view_count.desc(), TechniquePage.title)
        .limit(4)
    )
    tp_result = await db.execute(tp_stmt)
    for (title,) in tp_result.all():
        _add(title, "technique")

    # Top 4 topic tags by how many technique pages use them.
    # Unnest the topic_tags ARRAY and count occurrences.
    tag_unnest = (
        select(
            func.unnest(TechniquePage.topic_tags).label("tag"),
        )
        .where(TechniquePage.topic_tags.isnot(None))
        .subquery()
    )
    tag_stmt = (
        select(
            tag_unnest.c.tag,
            func.count().label("cnt"),
        )
        .group_by(tag_unnest.c.tag)
        .order_by(func.count().desc(), tag_unnest.c.tag)
        .limit(4)
    )
    tag_result = await db.execute(tag_stmt)
    for tag, _cnt in tag_result.all():
        _add(tag, "topic")

    # Top 4 visible creators by view_count (name breaks ties).
    cr_stmt = (
        select(Creator.name)
        .where(Creator.hidden.is_(False))
        .order_by(Creator.view_count.desc(), Creator.name)
        .limit(4)
    )
    cr_result = await db.execute(cr_stmt)
    for (name,) in cr_result.all():
        _add(name, "creator")

    return SuggestionsResponse(suggestions=items)


async def _read_popular_cache() -> list[PopularSearchItem] | None:
    """Load cached popular searches from Redis; None on a cache miss.

    The client is closed even when the read fails. Errors propagate to the
    caller, which treats them as "cache unavailable".
    """
    redis = await get_redis()
    try:
        cached = await redis.get(POPULAR_CACHE_KEY)
    finally:
        await redis.aclose()
    if cached is None:
        return None
    return [PopularSearchItem(**entry) for entry in json.loads(cached)]


async def _write_popular_cache(items: list[PopularSearchItem]) -> None:
    """Store popular searches in Redis with the configured TTL.

    The client is closed even when the write fails. Errors propagate to the
    caller, which treats caching as best-effort.
    """
    payload = json.dumps([item.model_dump() for item in items])
    redis = await get_redis()
    try:
        await redis.set(POPULAR_CACHE_KEY, payload, ex=POPULAR_CACHE_TTL)
    finally:
        await redis.aclose()


@router.get("/popular", response_model=PopularSearchesResponse)
async def popular_searches(
    db: AsyncSession = Depends(get_session),
) -> PopularSearchesResponse:
    """Return the top 10 search queries from the last 7 days.

    Queries are grouped case-insensitively. Results are cached in Redis for
    5 minutes. Falls through to a direct DB query when Redis is unavailable.
    """
    # Try Redis cache first; any failure here degrades to the DB path.
    try:
        cached_items = await _read_popular_cache()
        if cached_items is not None:
            return PopularSearchesResponse(items=cached_items, cached=True)
    except Exception:
        logger.warning("Redis unavailable for popular searches cache", exc_info=True)

    # Cache miss or Redis down — query DB.
    normalized_query = func.lower(SearchLog.query)
    stmt = (
        select(
            normalized_query.label("q"),
            func.count().label("cnt"),
        )
        .where(SearchLog.created_at > func.now() - text("interval '7 days'"))
        .group_by(normalized_query)
        # Secondary sort keeps the ranking stable when counts tie.
        .order_by(func.count().desc(), normalized_query)
        .limit(10)
    )
    result = await db.execute(stmt)
    items = [PopularSearchItem(query=row.q, count=row.cnt) for row in result.all()]

    # Write to Redis cache (best-effort).
    try:
        await _write_popular_cache(items)
    except Exception:
        logger.warning("Failed to cache popular searches in Redis", exc_info=True)

    return PopularSearchesResponse(items=items, cached=False)