"""Async search service for the public search endpoint. Orchestrates semantic search (embedding + Qdrant) with keyword search. Both run in parallel — results are merged and deduplicated. Keyword matches get a match_context explaining WHY they matched. Semantic-only results get a "Semantic match" context and are filtered by a minimum score threshold. """ from __future__ import annotations import asyncio import logging import time from typing import Any import openai from qdrant_client import AsyncQdrantClient from qdrant_client.http import exceptions as qdrant_exceptions from qdrant_client.models import FieldCondition, Filter, MatchValue from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from config import Settings from models import Creator, KeyMoment, SourceVideo, TechniquePage logger = logging.getLogger("chrysopedia.search") # Timeout for external calls (embedding API, Qdrant) in seconds _EXTERNAL_TIMEOUT = 2.0 # 2s — Ollama local embedding needs more than 300ms # Minimum cosine similarity score for semantic results to be included _SEMANTIC_MIN_SCORE = 0.45 class SearchService: """Async search service with parallel semantic + keyword search. Parameters ---------- settings: Application settings containing embedding and Qdrant config. """ def __init__(self, settings: Settings) -> None: self.settings = settings self._openai = openai.AsyncOpenAI( base_url=settings.embedding_api_url, api_key=settings.llm_api_key, ) self._qdrant = AsyncQdrantClient(url=settings.qdrant_url) self._collection = settings.qdrant_collection # ── Embedding ──────────────────────────────────────────────────────── async def embed_query(self, text: str) -> list[float] | None: """Embed a query string into a vector. Returns None on any failure (timeout, connection, malformed response) so the caller can fall back to keyword search. """ try: response = await asyncio.wait_for( self._openai.embeddings.create( model=self.settings.embedding_model, input=text, ), timeout=_EXTERNAL_TIMEOUT, ) except asyncio.TimeoutError: logger.warning("Embedding API timeout (%.0fms limit) for query: %.50s…", _EXTERNAL_TIMEOUT * 1000, text) return None except (openai.APIConnectionError, openai.APITimeoutError) as exc: logger.warning("Embedding API connection error (%s: %s)", type(exc).__name__, exc) return None except openai.APIError as exc: logger.warning("Embedding API error (%s: %s)", type(exc).__name__, exc) return None if not response.data: logger.warning("Embedding API returned empty data for query: %.50s…", text) return None vector = response.data[0].embedding if len(vector) != self.settings.embedding_dimensions: logger.warning( "Embedding dimension mismatch: expected %d, got %d", self.settings.embedding_dimensions, len(vector), ) return None return vector # ── Qdrant vector search ───────────────────────────────────────────── async def search_qdrant( self, vector: list[float], limit: int = 20, type_filter: str | None = None, ) -> list[dict[str, Any]]: """Search Qdrant for nearest neighbours. Returns a list of dicts with 'score' and 'payload' keys. Returns empty list on failure. """ query_filter = None if type_filter: query_filter = Filter( must=[FieldCondition(key="type", match=MatchValue(value=type_filter))] ) try: results = await asyncio.wait_for( self._qdrant.query_points( collection_name=self._collection, query=vector, query_filter=query_filter, limit=limit, with_payload=True, ), timeout=_EXTERNAL_TIMEOUT, ) except asyncio.TimeoutError: logger.warning("Qdrant search timeout (%.0fms limit)", _EXTERNAL_TIMEOUT * 1000) return [] except qdrant_exceptions.UnexpectedResponse as exc: logger.warning("Qdrant search error: %s", exc) return [] except Exception as exc: logger.warning("Qdrant connection error (%s: %s)", type(exc).__name__, exc) return [] return [ {"score": point.score, "payload": point.payload} for point in results.points ] # ── Token helpers ─────────────────────────────────────────────────── @staticmethod def _tokenize(query: str) -> list[str]: """Split query into non-empty lowercase tokens.""" return [t.lower() for t in query.split() if t] @staticmethod def _tp_token_condition(token: str): """Build an OR condition for a single token across TechniquePage + Creator fields.""" pat = f"%{token}%" return or_( TechniquePage.title.ilike(pat), TechniquePage.summary.ilike(pat), TechniquePage.topic_category.ilike(pat), func.array_to_string(TechniquePage.topic_tags, " ").ilike(pat), Creator.name.ilike(pat), ) @staticmethod def _km_token_condition(token: str): """Build an OR condition for a single token across KeyMoment + Creator fields.""" pat = f"%{token}%" return or_( KeyMoment.title.ilike(pat), KeyMoment.summary.ilike(pat), Creator.name.ilike(pat), ) @staticmethod def _cr_token_condition(token: str): """Build an OR condition for a single token across Creator fields.""" pat = f"%{token}%" return or_( Creator.name.ilike(pat), func.array_to_string(Creator.genres, " ").ilike(pat), ) # ── Match context generation ──────────────────────────────────────── @staticmethod def _build_match_context(item: dict[str, Any], tokens: list[str]) -> str: """Generate a human-readable string explaining which fields matched. Checks each token against each field and returns a compact summary like "Creator: Keota · Tag: snare" or "Title: ...snare drum...". """ if not tokens: return "" matches: list[str] = [] seen: set[str] = set() for token in tokens: t = token.lower() # Check creator name creator = item.get("creator_name", "") if creator and t in creator.lower() and "creator" not in seen: matches.append(f"Creator: {creator}") seen.add("creator") continue # Check title title = item.get("title", "") if title and t in title.lower() and "title" not in seen: matches.append(f"Title match") seen.add("title") continue # Check topic_category cat = item.get("topic_category", "") if cat and t in cat.lower() and "category" not in seen: matches.append(f"Category: {cat}") seen.add("category") continue # Check topic_tags tags = item.get("topic_tags", []) matched_tag = next((tag for tag in tags if t in tag.lower()), None) if matched_tag and f"tag:{matched_tag.lower()}" not in seen: matches.append(f"Tag: {matched_tag}") seen.add(f"tag:{matched_tag.lower()}") continue # Check summary summary = item.get("summary", "") if summary and t in summary.lower() and "summary" not in seen: # Extract a small context window around the match idx = summary.lower().find(t) start = max(0, idx - 20) end = min(len(summary), idx + len(t) + 20) snippet = summary[start:end].strip() if start > 0: snippet = "…" + snippet if end < len(summary): snippet = snippet + "…" matches.append(f"Content: {snippet}") seen.add("summary") continue return " · ".join(matches) if matches else "" # ── Keyword search (multi-token AND) ───────────────────────────────── async def keyword_search( self, query: str, scope: str, limit: int, db: AsyncSession, sort: str = "relevance", ) -> dict[str, list[dict[str, Any]]]: """Multi-token AND keyword search across technique pages, key moments, and creators. Tokenizes the query by whitespace. Each token must match at least one indexed field (title, summary, topic_category, topic_tags, creator name, genres). All tokens must match for a row to be included. If AND matching returns zero results but individual tokens would match, returns up to 5 partial_matches scored by the number of tokens matched. Returns ``{"items": [...], "partial_matches": [...]}``. """ tokens = self._tokenize(query) if not tokens: return {"items": [], "partial_matches": []} items = await self._keyword_search_and(tokens, scope, limit, db) # Add match_context to each item for item in items: item["match_context"] = self._build_match_context(item, tokens) partial: list[dict[str, Any]] = [] if not items and len(tokens) > 1: partial = await self._keyword_partial_matches(tokens, scope, db) for p in partial: p["match_context"] = self._build_match_context(p, tokens) return {"items": items, "partial_matches": partial} async def _keyword_search_and( self, tokens: list[str], scope: str, limit: int, db: AsyncSession, ) -> list[dict[str, Any]]: """Run AND-logic keyword search — every token must match at least one field.""" results: list[dict[str, Any]] = [] if scope in ("all", "topics"): tp_stmt = ( select(TechniquePage, Creator) .join(Creator, TechniquePage.creator_id == Creator.id) .where(and_(*(self._tp_token_condition(t) for t in tokens))) .limit(limit) ) tp_rows = await db.execute(tp_stmt) for tp, cr in tp_rows.all(): results.append({ "type": "technique_page", "title": tp.title, "slug": tp.slug, "technique_page_slug": tp.slug, "summary": tp.summary or "", "topic_category": tp.topic_category, "topic_tags": tp.topic_tags or [], "creator_id": str(tp.creator_id), "creator_name": cr.name, "creator_slug": cr.slug, "created_at": tp.created_at.isoformat() if tp.created_at else "", "score": 0.0, }) if scope in ("all",): km_stmt = ( select(KeyMoment, SourceVideo, Creator, TechniquePage) .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id) .join(Creator, SourceVideo.creator_id == Creator.id) .outerjoin(TechniquePage, KeyMoment.technique_page_id == TechniquePage.id) .where(and_(*(self._km_token_condition(t) for t in tokens))) .limit(limit) ) km_rows = await db.execute(km_stmt) for km, sv, cr, tp in km_rows.all(): results.append({ "type": "key_moment", "title": km.title, "slug": "", "technique_page_slug": tp.slug if tp else "", "summary": km.summary or "", "topic_category": "", "topic_tags": [], "creator_id": str(cr.id), "creator_name": cr.name, "creator_slug": cr.slug, "created_at": km.created_at.isoformat() if hasattr(km, "created_at") and km.created_at else "", "score": 0.0, }) if scope in ("all", "creators"): cr_stmt = ( select(Creator) .where(and_(*(self._cr_token_condition(t) for t in tokens))) .limit(limit) ) cr_rows = await db.execute(cr_stmt) for cr in cr_rows.scalars().all(): results.append({ "type": "creator", "title": cr.name, "slug": cr.slug, "technique_page_slug": "", "summary": "", "topic_category": "", "topic_tags": cr.genres or [], "creator_id": str(cr.id), "created_at": cr.created_at.isoformat() if hasattr(cr, "created_at") and cr.created_at else "", "score": 0.0, }) return results[:limit] async def _keyword_partial_matches( self, tokens: list[str], scope: str, db: AsyncSession, ) -> list[dict[str, Any]]: """When AND produces zero results, score rows by how many tokens match. Returns the top 5 results ordered by match count descending. """ seen: dict[tuple[str, str], dict[str, Any]] = {} match_counts: dict[tuple[str, str], int] = {} for token in tokens: single_results = await self._keyword_search_and([token], scope, 20, db) for r in single_results: key = (r["type"], r.get("slug") or r.get("title", "")) if key not in seen: seen[key] = r match_counts[key] = 0 match_counts[key] += 1 ranked = sorted(match_counts.keys(), key=lambda k: match_counts[k], reverse=True) partial: list[dict[str, Any]] = [] for key in ranked[:5]: item = seen[key] item["score"] = match_counts[key] / len(tokens) partial.append(item) return partial # ── Orchestrator ───────────────────────────────────────────────────── async def search( self, query: str, scope: str, limit: int, db: AsyncSession, sort: str = "relevance", ) -> dict[str, Any]: """Run semantic and keyword search in parallel, merge and deduplicate. Both engines run concurrently. Keyword results are always included (with match_context). Semantic results above the score threshold are merged in, deduplicated by (type, slug/title). Keyword matches rank higher when they exist. """ start = time.monotonic() if not query or not query.strip(): return {"items": [], "partial_matches": [], "total": 0, "query": query, "fallback_used": False} query = query.strip()[:500] if scope not in ("all", "topics", "creators"): scope = "all" # Map scope to Qdrant type filter type_filter_map = { "all": None, "topics": "technique_page", "creators": None, } qdrant_type_filter = type_filter_map.get(scope) # Run both searches in parallel async def _semantic(): vector = await self.embed_query(query) if vector is None: return [] results = await self.search_qdrant(vector, limit=limit, type_filter=qdrant_type_filter) enriched = await self._enrich_qdrant_results(results, db) # Filter by minimum score and add match_context filtered = [] for item in enriched: if item.get("score", 0) >= _SEMANTIC_MIN_SCORE: if not item.get("match_context"): item["match_context"] = "Semantic match" filtered.append(item) return filtered async def _keyword(): return await self.keyword_search(query, scope, limit, db, sort=sort) semantic_results, kw_result = await asyncio.gather( _semantic(), _keyword(), return_exceptions=True, ) # Handle exceptions gracefully if isinstance(semantic_results, Exception): logger.warning("Semantic search failed: %s", semantic_results) semantic_results = [] if isinstance(kw_result, Exception): logger.warning("Keyword search failed: %s", kw_result) kw_result = {"items": [], "partial_matches": []} kw_items = kw_result["items"] partial_matches = kw_result.get("partial_matches", []) # Merge: keyword results first (they have explicit match_context), # then semantic results that aren't already present seen_keys: set[str] = set() merged: list[dict[str, Any]] = [] def _dedup_key(item: dict) -> str: t = item.get("type", "") s = item.get("slug") or item.get("technique_page_slug") or "" title = item.get("title", "") return f"{t}:{s}:{title}" for item in kw_items: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) for item in semantic_results: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) # Apply sort merged = self._apply_sort(merged, sort) fallback_used = len(kw_items) > 0 and len(semantic_results) == 0 elapsed_ms = (time.monotonic() - start) * 1000 logger.info( "Search query=%r scope=%s keyword=%d semantic=%d merged=%d partial=%d latency_ms=%.1f", query, scope, len(kw_items), len(semantic_results), len(merged), len(partial_matches), elapsed_ms, ) return { "items": merged[:limit], "partial_matches": partial_matches, "total": len(merged), "query": query, "fallback_used": fallback_used, } # ── Sort helpers ──────────────────────────────────────────────────── @staticmethod def _apply_sort(items: list[dict[str, Any]], sort: str) -> list[dict[str, Any]]: """Sort enriched result dicts by the requested criterion.""" if sort == "relevance" or not items: return items if sort == "newest": return sorted(items, key=lambda r: r.get("created_at", ""), reverse=True) elif sort == "oldest": return sorted(items, key=lambda r: r.get("created_at") or "9999", reverse=False) elif sort == "alpha": return sorted(items, key=lambda r: (r.get("title") or "").lower()) elif sort == "creator": return sorted( items, key=lambda r: ((r.get("creator_name") or "").lower(), (r.get("title") or "").lower()), ) return items # ── Result enrichment ──────────────────────────────────────────────── async def _enrich_qdrant_results( self, qdrant_results: list[dict[str, Any]], db: AsyncSession, ) -> list[dict[str, Any]]: """Enrich Qdrant results with creator names and slugs from DB. First reads creator_name from Qdrant payload; only hits DB for missing ones. """ enriched: list[dict[str, Any]] = [] # Collect creator_ids that need DB lookup needs_db_lookup: set[str] = set() for r in qdrant_results: payload = r.get("payload", {}) if not payload.get("creator_name") and payload.get("creator_id"): needs_db_lookup.add(payload["creator_id"]) # Batch fetch creators from DB creator_map: dict[str, dict[str, str]] = {} if needs_db_lookup: import uuid as uuid_mod valid_ids = [] for cid in needs_db_lookup: try: valid_ids.append(uuid_mod.UUID(cid)) except (ValueError, AttributeError): pass if valid_ids: stmt = select(Creator).where(Creator.id.in_(valid_ids)) result = await db.execute(stmt) for c in result.scalars().all(): creator_map[str(c.id)] = {"name": c.name, "slug": c.slug} for r in qdrant_results: payload = r.get("payload", {}) cid = payload.get("creator_id", "") result_type = payload.get("type", "") # Creator name: prefer payload, fall back to DB creator_name = payload.get("creator_name", "") creator_slug = "" if not creator_name and cid: info = creator_map.get(cid, {"name": "", "slug": ""}) creator_name = info["name"] creator_slug = info["slug"] elif creator_name and cid: # We have the name from payload but need the slug from DB info = creator_map.get(cid, {}) creator_slug = info.get("slug", "") # Determine technique_page_slug based on result type if result_type == "technique_page": tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-")) else: tp_slug = payload.get("technique_page_slug", "") enriched.append({ "type": result_type, "title": payload.get("title", ""), "slug": payload.get("slug", payload.get("title", "").lower().replace(" ", "-")), "technique_page_slug": tp_slug, "summary": payload.get("summary", ""), "topic_category": payload.get("topic_category", ""), "topic_tags": payload.get("topic_tags", []), "creator_id": cid, "creator_name": creator_name, "creator_slug": creator_slug, "created_at": payload.get("created_at", ""), "score": r.get("score", 0.0), "match_context": "", }) return enriched