"""Async search service for the public search endpoint. Orchestrates semantic search (embedding + Qdrant) with keyword search. Both run in parallel — results are merged and deduplicated. Keyword matches get a match_context explaining WHY they matched. Semantic-only results get a "Semantic match" context and are filtered by a minimum score threshold. """ from __future__ import annotations import asyncio import logging import re import time import uuid as uuid_mod from typing import Any import httpx import openai from qdrant_client import AsyncQdrantClient from qdrant_client.http import exceptions as qdrant_exceptions from qdrant_client.models import FieldCondition, Filter, MatchValue from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from config import Settings from models import Creator, KeyMoment, SourceVideo, TechniquePage logger = logging.getLogger("chrysopedia.search") # Timeout for external calls (embedding API, Qdrant) in seconds _EXTERNAL_TIMEOUT = 2.0 # 2s — Ollama local embedding needs more than 300ms # Minimum cosine similarity score for semantic results to be included _SEMANTIC_MIN_SCORE = 0.45 class SearchService: """Async search service with parallel semantic + keyword search. Parameters ---------- settings: Application settings containing embedding and Qdrant config. """ def __init__(self, settings: Settings) -> None: self.settings = settings self._openai = openai.AsyncOpenAI( base_url=settings.embedding_api_url, api_key=settings.llm_api_key, ) self._qdrant = AsyncQdrantClient(url=settings.qdrant_url) self._collection = settings.qdrant_collection # LightRAG client self._httpx = httpx.AsyncClient( timeout=httpx.Timeout(settings.lightrag_search_timeout), ) self._lightrag_url = settings.lightrag_url self._lightrag_min_query_length = settings.lightrag_min_query_length # ── Embedding ──────────────────────────────────────────────────────── async def embed_query(self, text: str) -> list[float] | None: """Embed a query string into a vector. Returns None on any failure (timeout, connection, malformed response) so the caller can fall back to keyword search. """ try: response = await asyncio.wait_for( self._openai.embeddings.create( model=self.settings.embedding_model, input=text, ), timeout=_EXTERNAL_TIMEOUT, ) except asyncio.TimeoutError: logger.warning("Embedding API timeout (%.0fms limit) for query: %.50s…", _EXTERNAL_TIMEOUT * 1000, text) return None except (openai.APIConnectionError, openai.APITimeoutError) as exc: logger.warning("Embedding API connection error (%s: %s)", type(exc).__name__, exc) return None except openai.APIError as exc: logger.warning("Embedding API error (%s: %s)", type(exc).__name__, exc) return None if not response.data: logger.warning("Embedding API returned empty data for query: %.50s…", text) return None vector = response.data[0].embedding if len(vector) != self.settings.embedding_dimensions: logger.warning( "Embedding dimension mismatch: expected %d, got %d", self.settings.embedding_dimensions, len(vector), ) return None return vector # ── Qdrant vector search ───────────────────────────────────────────── async def search_qdrant( self, vector: list[float], limit: int = 20, type_filter: str | None = None, ) -> list[dict[str, Any]]: """Search Qdrant for nearest neighbours. Returns a list of dicts with 'score' and 'payload' keys. Returns empty list on failure. 
""" query_filter = None if type_filter: query_filter = Filter( must=[FieldCondition(key="type", match=MatchValue(value=type_filter))] ) try: results = await asyncio.wait_for( self._qdrant.query_points( collection_name=self._collection, query=vector, query_filter=query_filter, limit=limit, with_payload=True, ), timeout=_EXTERNAL_TIMEOUT, ) except asyncio.TimeoutError: logger.warning("Qdrant search timeout (%.0fms limit)", _EXTERNAL_TIMEOUT * 1000) return [] except qdrant_exceptions.UnexpectedResponse as exc: logger.warning("Qdrant search error: %s", exc) return [] except Exception as exc: logger.warning("Qdrant connection error (%s: %s)", type(exc).__name__, exc) return [] return [ {"score": point.score, "payload": point.payload} for point in results.points ] # ── Token helpers ─────────────────────────────────────────────────── @staticmethod def _tokenize(query: str) -> list[str]: """Split query into non-empty lowercase tokens.""" return [t.lower() for t in query.split() if t] @staticmethod def _tp_token_condition(token: str): """Build an OR condition for a single token across TechniquePage + Creator fields.""" pat = f"%{token}%" return or_( TechniquePage.title.ilike(pat), TechniquePage.summary.ilike(pat), TechniquePage.topic_category.ilike(pat), func.array_to_string(TechniquePage.topic_tags, " ").ilike(pat), Creator.name.ilike(pat), ) @staticmethod def _km_token_condition(token: str): """Build an OR condition for a single token across KeyMoment + Creator fields.""" pat = f"%{token}%" return or_( KeyMoment.title.ilike(pat), KeyMoment.summary.ilike(pat), Creator.name.ilike(pat), ) @staticmethod def _cr_token_condition(token: str): """Build an OR condition for a single token across Creator fields.""" pat = f"%{token}%" return or_( Creator.name.ilike(pat), func.array_to_string(Creator.genres, " ").ilike(pat), ) # ── Match context generation ──────────────────────────────────────── @staticmethod def _build_match_context(item: dict[str, Any], tokens: list[str]) -> str: """Generate a human-readable string explaining which fields matched. Checks each token against each field and returns a compact summary like "Creator: Keota · Tag: snare" or "Title: ...snare drum...". 
""" if not tokens: return "" matches: list[str] = [] seen: set[str] = set() for token in tokens: t = token.lower() # Check creator name creator = item.get("creator_name", "") if creator and t in creator.lower() and "creator" not in seen: matches.append(f"Creator: {creator}") seen.add("creator") continue # Check title title = item.get("title", "") if title and t in title.lower() and "title" not in seen: matches.append(f"Title match") seen.add("title") continue # Check topic_category cat = item.get("topic_category", "") if cat and t in cat.lower() and "category" not in seen: matches.append(f"Category: {cat}") seen.add("category") continue # Check topic_tags tags = item.get("topic_tags", []) matched_tag = next((tag for tag in tags if t in tag.lower()), None) if matched_tag and f"tag:{matched_tag.lower()}" not in seen: matches.append(f"Tag: {matched_tag}") seen.add(f"tag:{matched_tag.lower()}") continue # Check summary summary = item.get("summary", "") if summary and t in summary.lower() and "summary" not in seen: # Extract a small context window around the match idx = summary.lower().find(t) start = max(0, idx - 20) end = min(len(summary), idx + len(t) + 20) snippet = summary[start:end].strip() if start > 0: snippet = "…" + snippet if end < len(summary): snippet = snippet + "…" matches.append(f"Content: {snippet}") seen.add("summary") continue return " · ".join(matches) if matches else "" # ── Keyword search (multi-token AND) ───────────────────────────────── async def keyword_search( self, query: str, scope: str, limit: int, db: AsyncSession, sort: str = "relevance", ) -> dict[str, list[dict[str, Any]]]: """Multi-token AND keyword search across technique pages, key moments, and creators. Tokenizes the query by whitespace. Each token must match at least one indexed field (title, summary, topic_category, topic_tags, creator name, genres). All tokens must match for a row to be included. If AND matching returns zero results but individual tokens would match, returns up to 5 partial_matches scored by the number of tokens matched. Returns ``{"items": [...], "partial_matches": [...]}``. 
""" tokens = self._tokenize(query) if not tokens: return {"items": [], "partial_matches": []} items = await self._keyword_search_and(tokens, scope, limit, db) # Add match_context to each item for item in items: item["match_context"] = self._build_match_context(item, tokens) partial: list[dict[str, Any]] = [] if not items and len(tokens) > 1: partial = await self._keyword_partial_matches(tokens, scope, db) for p in partial: p["match_context"] = self._build_match_context(p, tokens) return {"items": items, "partial_matches": partial} async def _keyword_search_and( self, tokens: list[str], scope: str, limit: int, db: AsyncSession, ) -> list[dict[str, Any]]: """Run AND-logic keyword search — every token must match at least one field.""" results: list[dict[str, Any]] = [] if scope in ("all", "topics"): tp_stmt = ( select(TechniquePage, Creator) .join(Creator, TechniquePage.creator_id == Creator.id) .where(and_(*(self._tp_token_condition(t) for t in tokens))) .limit(limit) ) tp_rows = await db.execute(tp_stmt) for tp, cr in tp_rows.all(): results.append({ "type": "technique_page", "title": tp.title, "slug": tp.slug, "technique_page_slug": tp.slug, "summary": tp.summary or "", "topic_category": tp.topic_category, "topic_tags": tp.topic_tags or [], "creator_id": str(tp.creator_id), "creator_name": cr.name, "creator_slug": cr.slug, "created_at": tp.created_at.isoformat() if tp.created_at else "", "score": 0.0, }) if scope in ("all",): km_stmt = ( select(KeyMoment, SourceVideo, Creator, TechniquePage) .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id) .join(Creator, SourceVideo.creator_id == Creator.id) .outerjoin(TechniquePage, KeyMoment.technique_page_id == TechniquePage.id) .where(and_(*(self._km_token_condition(t) for t in tokens))) .limit(limit) ) km_rows = await db.execute(km_stmt) for km, sv, cr, tp in km_rows.all(): results.append({ "type": "key_moment", "title": km.title, "slug": "", "technique_page_slug": tp.slug if tp else "", "summary": km.summary or "", "topic_category": "", "topic_tags": [], "creator_id": str(cr.id), "creator_name": cr.name, "creator_slug": cr.slug, "created_at": km.created_at.isoformat() if hasattr(km, "created_at") and km.created_at else "", "score": 0.0, "source_video_id": str(km.source_video_id) if km.source_video_id else "", "start_time": km.start_time, "end_time": km.end_time, "video_filename": (sv.filename or "") if sv else "", }) if scope in ("all", "creators"): cr_stmt = ( select(Creator) .where(and_(*(self._cr_token_condition(t) for t in tokens))) .limit(limit) ) cr_rows = await db.execute(cr_stmt) for cr in cr_rows.scalars().all(): results.append({ "type": "creator", "title": cr.name, "slug": cr.slug, "technique_page_slug": "", "summary": "", "topic_category": "", "topic_tags": cr.genres or [], "creator_id": str(cr.id), "created_at": cr.created_at.isoformat() if hasattr(cr, "created_at") and cr.created_at else "", "score": 0.0, }) return results[:limit] async def _keyword_partial_matches( self, tokens: list[str], scope: str, db: AsyncSession, ) -> list[dict[str, Any]]: """When AND produces zero results, score rows by how many tokens match. Returns the top 5 results ordered by match count descending. 
""" seen: dict[tuple[str, str], dict[str, Any]] = {} match_counts: dict[tuple[str, str], int] = {} for token in tokens: single_results = await self._keyword_search_and([token], scope, 20, db) for r in single_results: key = (r["type"], r.get("slug") or r.get("title", "")) if key not in seen: seen[key] = r match_counts[key] = 0 match_counts[key] += 1 ranked = sorted(match_counts.keys(), key=lambda k: match_counts[k], reverse=True) partial: list[dict[str, Any]] = [] for key in ranked[:5]: item = seen[key] item["score"] = match_counts[key] / len(tokens) partial.append(item) return partial # ── LightRAG search ─────────────────────────────────────────────────── # Regex to parse file_source format: technique:{slug}:creator:{creator_id} _FILE_SOURCE_RE = re.compile(r"^technique:(?P[^:]+):creator:(?P.+)$") async def _lightrag_search( self, query: str, limit: int, db: AsyncSession, ) -> list[dict[str, Any]]: """Query LightRAG /query/data for entities, relationships, and chunks. Maps results back to SearchResultItem dicts using file_source parsing and DB batch lookup. Returns empty list on any failure (timeout, connection, parse error) with a WARNING log — caller falls back. """ start = time.monotonic() try: resp = await self._httpx.post( f"{self._lightrag_url}/query/data", json={"query": query, "mode": "hybrid", "top_k": limit}, ) resp.raise_for_status() body = resp.json() except httpx.TimeoutException: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "lightrag_search_fallback reason=timeout query=%r latency_ms=%.1f", query, elapsed_ms, ) return [] except httpx.HTTPError as exc: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "lightrag_search_fallback reason=http_error query=%r error=%s latency_ms=%.1f", query, exc, elapsed_ms, ) return [] except Exception as exc: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "lightrag_search_fallback reason=unexpected query=%r error=%s latency_ms=%.1f", query, exc, elapsed_ms, ) return [] # Parse response try: data = body.get("data", {}) if not data: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "lightrag_search_fallback reason=empty_data query=%r latency_ms=%.1f", query, elapsed_ms, ) return [] chunks = data.get("chunks", []) entities = data.get("entities", []) # relationships = data.get("relationships", []) # available for future use # Extract technique slugs from chunk file_path/file_source fields slug_set: set[str] = set() slug_order: list[str] = [] # preserve retrieval rank for chunk in chunks: file_path = chunk.get("file_path", "") m = self._FILE_SOURCE_RE.match(file_path) if m and m.group("slug") not in slug_set: slug = m.group("slug") slug_set.add(slug) slug_order.append(slug) # Also try to extract slugs from entity names by matching DB later entity_names: list[str] = [] for ent in entities: name = ent.get("entity_name", "") if name: entity_names.append(name) if not slug_set and not entity_names: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "lightrag_search_fallback reason=no_parseable_results query=%r " "chunks=%d entities=%d latency_ms=%.1f", query, len(chunks), len(entities), elapsed_ms, ) return [] # Batch-lookup technique pages by slug tp_map: dict[str, tuple] = {} # slug → (TechniquePage, Creator) if slug_set: tp_stmt = ( select(TechniquePage, Creator) .join(Creator, TechniquePage.creator_id == Creator.id) .where(TechniquePage.slug.in_(list(slug_set))) ) tp_rows = await db.execute(tp_stmt) for tp, cr in tp_rows.all(): tp_map[tp.slug] = (tp, cr) # If we have entity names 

    async def _lightrag_search(
        self,
        query: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Query LightRAG /query/data for entities, relationships, and chunks.

        Maps results back to SearchResultItem dicts using file_source parsing
        and DB batch lookup. Returns empty list on any failure (timeout,
        connection, parse error) with a WARNING log — caller falls back.
        """
        start = time.monotonic()
        try:
            resp = await self._httpx.post(
                f"{self._lightrag_url}/query/data",
                json={"query": query, "mode": "hybrid", "top_k": limit},
            )
            resp.raise_for_status()
            body = resp.json()
        except httpx.TimeoutException:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "lightrag_search_fallback reason=timeout query=%r latency_ms=%.1f",
                query, elapsed_ms,
            )
            return []
        except httpx.HTTPError as exc:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "lightrag_search_fallback reason=http_error query=%r error=%s latency_ms=%.1f",
                query, exc, elapsed_ms,
            )
            return []
        except Exception as exc:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "lightrag_search_fallback reason=unexpected query=%r error=%s latency_ms=%.1f",
                query, exc, elapsed_ms,
            )
            return []

        # Parse response
        try:
            data = body.get("data", {})
            if not data:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "lightrag_search_fallback reason=empty_data query=%r latency_ms=%.1f",
                    query, elapsed_ms,
                )
                return []

            chunks = data.get("chunks", [])
            entities = data.get("entities", [])
            # relationships = data.get("relationships", [])  # available for future use

            # Extract technique slugs from chunk file_path/file_source fields
            slug_set: set[str] = set()
            slug_order: list[str] = []  # preserve retrieval rank
            for chunk in chunks:
                file_path = chunk.get("file_path", "")
                m = self._FILE_SOURCE_RE.match(file_path)
                if m and m.group("slug") not in slug_set:
                    slug = m.group("slug")
                    slug_set.add(slug)
                    slug_order.append(slug)

            # Also try to extract slugs from entity names by matching DB later
            entity_names: list[str] = []
            for ent in entities:
                name = ent.get("entity_name", "")
                if name:
                    entity_names.append(name)

            if not slug_set and not entity_names:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "lightrag_search_fallback reason=no_parseable_results query=%r "
                    "chunks=%d entities=%d latency_ms=%.1f",
                    query, len(chunks), len(entities), elapsed_ms,
                )
                return []

            # Batch-lookup technique pages by slug
            tp_map: dict[str, tuple] = {}  # slug → (TechniquePage, Creator)
            if slug_set:
                tp_stmt = (
                    select(TechniquePage, Creator)
                    .join(Creator, TechniquePage.creator_id == Creator.id)
                    .where(TechniquePage.slug.in_(list(slug_set)))
                )
                tp_rows = await db.execute(tp_stmt)
                for tp, cr in tp_rows.all():
                    tp_map[tp.slug] = (tp, cr)

            # If we have entity names but no chunk matches, try matching
            # entity names against technique page titles or creator names
            if entity_names and not tp_map:
                entity_name_pats = [f"%{name}%" for name in entity_names[:20]]
                tp_stmt2 = (
                    select(TechniquePage, Creator)
                    .join(Creator, TechniquePage.creator_id == Creator.id)
                    .where(
                        or_(
                            TechniquePage.title.ilike(func.any_(entity_name_pats)),
                            Creator.name.ilike(func.any_(entity_name_pats)),
                        )
                    )
                    .limit(limit)
                )
                tp_rows2 = await db.execute(tp_stmt2)
                for tp, cr in tp_rows2.all():
                    if tp.slug not in tp_map:
                        tp_map[tp.slug] = (tp, cr)
                        if tp.slug not in slug_set:
                            slug_order.append(tp.slug)
                            slug_set.add(tp.slug)

            # Build result items in retrieval-rank order
            results: list[dict[str, Any]] = []
            seen_slugs: set[str] = set()
            for idx, slug in enumerate(slug_order):
                if slug in seen_slugs:
                    continue
                seen_slugs.add(slug)
                pair = tp_map.get(slug)
                if not pair:
                    continue
                tp, cr = pair
                # Score: higher rank → higher score (1.0 down to ~0.5)
                score = max(1.0 - (idx * 0.05), 0.5)
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    "score": score,
                    "match_context": "LightRAG graph match",
                })
                if len(results) >= limit:
                    break

            elapsed_ms = (time.monotonic() - start) * 1000
            logger.info(
                "lightrag_search query=%r latency_ms=%.1f result_count=%d chunks=%d entities=%d",
                query, elapsed_ms, len(results), len(chunks), len(entities),
            )
            return results
        except (KeyError, ValueError, TypeError) as exc:
            elapsed_ms = (time.monotonic() - start) * 1000
            body_snippet = str(body)[:200] if body else ""
            logger.warning(
                "lightrag_search_fallback reason=parse_error query=%r error=%s body=%.200s latency_ms=%.1f",
                query, exc, body_snippet, elapsed_ms,
            )
            return []

    # ── Creator-scoped cascade helpers ───────────────────────────────────

    async def _resolve_creator(
        self,
        creator_ref: str,
        db: AsyncSession,
    ) -> tuple[str | None, str | None]:
        """Resolve a creator slug or UUID to (creator_id, creator_name).

        Returns (None, None) if the creator is not found.
        """
        try:
            creator_uuid = uuid_mod.UUID(creator_ref)
            stmt = select(Creator).where(Creator.id == creator_uuid)
        except (ValueError, AttributeError):
            stmt = select(Creator).where(Creator.slug == creator_ref)

        result = await db.execute(stmt)
        cr = result.scalars().first()
        if cr is None:
            return None, None
        return str(cr.id), cr.name
""" try: creator_uuid = uuid_mod.UUID(creator_ref) stmt = select(Creator).where(Creator.id == creator_uuid) except (ValueError, AttributeError): stmt = select(Creator).where(Creator.slug == creator_ref) result = await db.execute(stmt) cr = result.scalars().first() if cr is None: return None, None return str(cr.id), cr.name async def _get_creator_domain( self, creator_id: str, db: AsyncSession, ) -> str | None: """Return the dominant topic_category for a creator, or None if <2 technique pages.""" stmt = ( select( TechniquePage.topic_category, func.count().label("cnt"), ) .where(TechniquePage.creator_id == uuid_mod.UUID(creator_id)) .group_by(TechniquePage.topic_category) .order_by(func.count().desc()) .limit(1) ) result = await db.execute(stmt) row = result.first() if row is None: return None # Require at least 2 technique pages to declare a domain if row.cnt < 2: return None return row.topic_category async def _creator_scoped_search( self, query: str, creator_id: str, creator_name: str, limit: int, db: AsyncSession, ) -> list[dict[str, Any]]: """Search LightRAG with creator name as keyword, post-filter by creator_id.""" start = time.monotonic() try: resp = await self._httpx.post( f"{self._lightrag_url}/query/data", json={ "query": query, "mode": "hybrid", "top_k": limit * 3, "ll_keywords": [creator_name], }, ) resp.raise_for_status() body = resp.json() except Exception as exc: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "creator_scoped_search reason=%s query=%r creator=%s latency_ms=%.1f", type(exc).__name__, query, creator_id, elapsed_ms, ) return [] try: data = body.get("data", {}) chunks = data.get("chunks", []) if data else [] slug_set: set[str] = set() slug_order: list[str] = [] for chunk in chunks: file_path = chunk.get("file_path", "") m = self._FILE_SOURCE_RE.match(file_path) if m and m.group("slug") not in slug_set: slug = m.group("slug") slug_set.add(slug) slug_order.append(slug) if not slug_set: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "creator_scoped_search reason=no_parseable_results query=%r creator=%s latency_ms=%.1f", query, creator_id, elapsed_ms, ) return [] # Batch lookup and post-filter by creator_id tp_stmt = ( select(TechniquePage, Creator) .join(Creator, TechniquePage.creator_id == Creator.id) .where(TechniquePage.slug.in_(list(slug_set))) ) tp_rows = await db.execute(tp_stmt) tp_map: dict[str, tuple] = {} for tp, cr in tp_rows.all(): if str(tp.creator_id) == creator_id: tp_map[tp.slug] = (tp, cr) results: list[dict[str, Any]] = [] for idx, slug in enumerate(slug_order): pair = tp_map.get(slug) if not pair: continue tp, cr = pair score = max(1.0 - (idx * 0.05), 0.5) results.append({ "type": "technique_page", "title": tp.title, "slug": tp.slug, "technique_page_slug": tp.slug, "summary": tp.summary or "", "topic_category": tp.topic_category, "topic_tags": tp.topic_tags or [], "creator_id": str(tp.creator_id), "creator_name": cr.name, "creator_slug": cr.slug, "created_at": tp.created_at.isoformat() if tp.created_at else "", "score": score, "match_context": "Creator-scoped LightRAG match", }) if len(results) >= limit: break elapsed_ms = (time.monotonic() - start) * 1000 logger.info( "creator_scoped_search query=%r creator=%s latency_ms=%.1f result_count=%d", query, creator_id, elapsed_ms, len(results), ) return results except (KeyError, ValueError, TypeError) as exc: elapsed_ms = (time.monotonic() - start) * 1000 logger.warning( "creator_scoped_search reason=parse_error query=%r creator=%s error=%s latency_ms=%.1f", query, 

    async def _domain_scoped_search(
        self,
        query: str,
        domain: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Search LightRAG with domain keyword — no post-filtering."""
        start = time.monotonic()
        try:
            resp = await self._httpx.post(
                f"{self._lightrag_url}/query/data",
                json={
                    "query": query,
                    "mode": "hybrid",
                    "top_k": limit,
                    "ll_keywords": [domain],
                },
            )
            resp.raise_for_status()
            body = resp.json()
        except Exception as exc:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "domain_scoped_search reason=%s query=%r domain=%s latency_ms=%.1f",
                type(exc).__name__, query, domain, elapsed_ms,
            )
            return []

        try:
            data = body.get("data", {})
            chunks = data.get("chunks", []) if data else []

            slug_set: set[str] = set()
            slug_order: list[str] = []
            for chunk in chunks:
                file_path = chunk.get("file_path", "")
                m = self._FILE_SOURCE_RE.match(file_path)
                if m and m.group("slug") not in slug_set:
                    slug = m.group("slug")
                    slug_set.add(slug)
                    slug_order.append(slug)

            if not slug_set:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "domain_scoped_search reason=no_parseable_results query=%r domain=%s latency_ms=%.1f",
                    query, domain, elapsed_ms,
                )
                return []

            tp_stmt = (
                select(TechniquePage, Creator)
                .join(Creator, TechniquePage.creator_id == Creator.id)
                .where(TechniquePage.slug.in_(list(slug_set)))
            )
            tp_rows = await db.execute(tp_stmt)
            tp_map: dict[str, tuple] = {}
            for tp, cr in tp_rows.all():
                tp_map[tp.slug] = (tp, cr)

            results: list[dict[str, Any]] = []
            for idx, slug in enumerate(slug_order):
                pair = tp_map.get(slug)
                if not pair:
                    continue
                tp, cr = pair
                score = max(1.0 - (idx * 0.05), 0.5)
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    "score": score,
                    "match_context": "Domain-scoped LightRAG match",
                })
                if len(results) >= limit:
                    break

            elapsed_ms = (time.monotonic() - start) * 1000
            logger.info(
                "domain_scoped_search query=%r domain=%s latency_ms=%.1f result_count=%d",
                query, domain, elapsed_ms, len(results),
            )
            return results
        except (KeyError, ValueError, TypeError) as exc:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "domain_scoped_search reason=parse_error query=%r domain=%s error=%s latency_ms=%.1f",
                query, domain, exc, elapsed_ms,
            )
            return []

    # ── Orchestrator ─────────────────────────────────────────────────────

    async def search(
        self,
        query: str,
        scope: str,
        limit: int,
        db: AsyncSession,
        sort: str = "relevance",
        creator: str | None = None,
    ) -> dict[str, Any]:
        """Run semantic and keyword search in parallel, merge and deduplicate.

        When ``creator`` is provided, executes a 4-tier cascade:
        creator → domain → global → none, returning results from the first
        tier that produces hits. ``cascade_tier`` indicates which tier served.

        Both engines run concurrently. Keyword results are always included
        (with match_context). Semantic results above the score threshold are
        merged in, deduplicated by (type, slug/title). Keyword matches rank
        higher when they exist.
        """
""" start = time.monotonic() if not query or not query.strip(): return {"items": [], "partial_matches": [], "total": 0, "query": query, "fallback_used": False, "cascade_tier": ""} query = query.strip()[:500] if scope not in ("all", "topics", "creators"): scope = "all" cascade_tier = "" # ── Creator-scoped cascade ────────────────────────────────────── use_lightrag = len(query) >= self._lightrag_min_query_length if creator and use_lightrag: creator_id, creator_name = await self._resolve_creator(creator, db) if creator_id and creator_name: # Tier 1: creator-scoped tier1 = await self._creator_scoped_search(query, creator_id, creator_name, limit, db) if tier1: cascade_tier = "creator" lightrag_results = tier1 fallback_used = False else: # Tier 2: domain-scoped domain = await self._get_creator_domain(creator_id, db) tier2: list[dict[str, Any]] = [] if domain: tier2 = await self._domain_scoped_search(query, domain, limit, db) if tier2: cascade_tier = "domain" lightrag_results = tier2 fallback_used = False else: # Tier 3: global LightRAG tier3 = await self._lightrag_search(query, limit, db) if tier3: cascade_tier = "global" lightrag_results = tier3 fallback_used = False else: # Tier 4: no LightRAG results at all cascade_tier = "none" lightrag_results = [] fallback_used = True elapsed_cascade = (time.monotonic() - start) * 1000 logger.info( "cascade_search query=%r creator=%s tier=%s latency_ms=%.1f result_count=%d", query, creator, cascade_tier, elapsed_cascade, len(lightrag_results), ) # Skip to merge phase (keyword still runs for supplementary) # Jump past the non-cascade LightRAG block kw_result = await self.keyword_search(query, scope, limit, db, sort=sort) if fallback_used: # Qdrant semantic fallback vector = await self.embed_query(query) semantic_results: list[dict[str, Any]] = [] if vector: raw = await self.search_qdrant(vector, limit=limit) enriched = await self._enrich_qdrant_results(raw, db) semantic_results = [ item for item in enriched if item.get("score", 0) >= _SEMANTIC_MIN_SCORE ] for item in semantic_results: if not item.get("match_context"): item["match_context"] = "Semantic match" else: semantic_results = [] # Handle exceptions kw_items = kw_result["items"] if not isinstance(kw_result, Exception) else [] partial_matches = kw_result.get("partial_matches", []) if not isinstance(kw_result, Exception) else [] # Merge: cascade results first, then keyword, then semantic seen_keys: set[str] = set() merged: list[dict[str, Any]] = [] def _dedup_key(item: dict) -> str: t = item.get("type", "") s = item.get("slug") or item.get("technique_page_slug") or "" title = item.get("title", "") return f"{t}:{s}:{title}" for item in lightrag_results: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) for item in kw_items: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) for item in semantic_results: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) merged = self._apply_sort(merged, sort) elapsed_ms = (time.monotonic() - start) * 1000 logger.info( "Search query=%r scope=%s cascade_tier=%s lightrag=%d keyword=%d semantic=%d merged=%d fallback=%s latency_ms=%.1f", query, scope, cascade_tier, len(lightrag_results), len(kw_items), len(semantic_results), len(merged), fallback_used, elapsed_ms, ) return { "items": merged[:limit], "partial_matches": partial_matches, "total": len(merged), "query": query, "fallback_used": fallback_used, "cascade_tier": cascade_tier, } else: 
logger.warning("cascade_search reason=creator_not_found creator_ref=%r", creator) # Fall through to normal search path # ── Primary: try LightRAG for queries ≥ min length ───────────── lightrag_results: list[dict[str, Any]] = [] fallback_used = True # assume fallback until LightRAG succeeds use_lightrag = len(query) >= self._lightrag_min_query_length if use_lightrag: lightrag_results = await self._lightrag_search(query, limit, db) if lightrag_results: fallback_used = False # ── Keyword search always runs (for merge/dedup) ───────────── async def _keyword(): return await self.keyword_search(query, scope, limit, db, sort=sort) # ── Fallback: Qdrant semantic (only when LightRAG didn't deliver) ── qdrant_type_filter = None # no type filter — all result types welcome async def _semantic(): vector = await self.embed_query(query) if vector is None: return [] results = await self.search_qdrant(vector, limit=limit, type_filter=qdrant_type_filter) enriched = await self._enrich_qdrant_results(results, db) # Filter by minimum score and add match_context filtered = [] for item in enriched: if item.get("score", 0) >= _SEMANTIC_MIN_SCORE: if not item.get("match_context"): item["match_context"] = "Semantic match" filtered.append(item) return filtered if fallback_used: # LightRAG returned nothing — run Qdrant semantic + keyword in parallel semantic_results, kw_result = await asyncio.gather( _semantic(), _keyword(), return_exceptions=True, ) else: # LightRAG succeeded — only need keyword for supplementary merge semantic_results = [] kw_result = await _keyword() # Handle exceptions gracefully if isinstance(semantic_results, Exception): logger.warning("Semantic search failed: %s", semantic_results) semantic_results = [] if isinstance(kw_result, Exception): logger.warning("Keyword search failed: %s", kw_result) kw_result = {"items": [], "partial_matches": []} kw_items = kw_result["items"] partial_matches = kw_result.get("partial_matches", []) # Merge: LightRAG results first (primary), then keyword, then Qdrant semantic seen_keys: set[str] = set() merged: list[dict[str, Any]] = [] def _dedup_key(item: dict) -> str: t = item.get("type", "") s = item.get("slug") or item.get("technique_page_slug") or "" title = item.get("title", "") return f"{t}:{s}:{title}" for item in lightrag_results: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) for item in kw_items: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) for item in semantic_results: key = _dedup_key(item) if key not in seen_keys: seen_keys.add(key) merged.append(item) # Apply sort merged = self._apply_sort(merged, sort) # fallback_used is already set above elapsed_ms = (time.monotonic() - start) * 1000 logger.info( "Search query=%r scope=%s lightrag=%d keyword=%d semantic=%d merged=%d partial=%d fallback=%s latency_ms=%.1f", query, scope, len(lightrag_results), len(kw_items), len(semantic_results), len(merged), len(partial_matches), fallback_used, elapsed_ms, ) return { "items": merged[:limit], "partial_matches": partial_matches, "total": len(merged), "query": query, "fallback_used": fallback_used, "cascade_tier": cascade_tier, } # ── Sort helpers ──────────────────────────────────────────────────── @staticmethod def _apply_sort(items: list[dict[str, Any]], sort: str) -> list[dict[str, Any]]: """Sort enriched result dicts by the requested criterion.""" if sort == "relevance" or not items: return items if sort == "newest": return sorted(items, key=lambda r: r.get("created_at", ""), 
reverse=True) elif sort == "oldest": return sorted(items, key=lambda r: r.get("created_at") or "9999", reverse=False) elif sort == "alpha": return sorted(items, key=lambda r: (r.get("title") or "").lower()) elif sort == "creator": return sorted( items, key=lambda r: ((r.get("creator_name") or "").lower(), (r.get("title") or "").lower()), ) return items # ── Result enrichment ──────────────────────────────────────────────── async def _enrich_qdrant_results( self, qdrant_results: list[dict[str, Any]], db: AsyncSession, ) -> list[dict[str, Any]]: """Enrich Qdrant results with creator names and slugs from DB. First reads creator_name from Qdrant payload; only hits DB for missing ones. """ enriched: list[dict[str, Any]] = [] # Collect creator_ids that need DB lookup needs_db_lookup: set[str] = set() for r in qdrant_results: payload = r.get("payload", {}) if not payload.get("creator_name") and payload.get("creator_id"): needs_db_lookup.add(payload["creator_id"]) # Collect source_video_ids for key_moment results to batch-fetch filenames video_ids_needed: set[str] = set() for r in qdrant_results: payload = r.get("payload", {}) if payload.get("type") == "key_moment" and payload.get("source_video_id"): video_ids_needed.add(payload["source_video_id"]) # Batch fetch creators from DB creator_map: dict[str, dict[str, str]] = {} if needs_db_lookup: valid_ids = [] for cid in needs_db_lookup: try: valid_ids.append(uuid_mod.UUID(cid)) except (ValueError, AttributeError): pass if valid_ids: stmt = select(Creator).where(Creator.id.in_(valid_ids)) result = await db.execute(stmt) for c in result.scalars().all(): creator_map[str(c.id)] = {"name": c.name, "slug": c.slug} # Batch fetch video filenames for key_moment results video_map: dict[str, str] = {} if video_ids_needed: valid_vids = [] for vid in video_ids_needed: try: valid_vids.append(uuid_mod.UUID(vid)) except (ValueError, AttributeError): pass if valid_vids: v_stmt = select(SourceVideo).where(SourceVideo.id.in_(valid_vids)) v_result = await db.execute(v_stmt) for sv in v_result.scalars().all(): video_map[str(sv.id)] = sv.filename or "" for r in qdrant_results: payload = r.get("payload", {}) cid = payload.get("creator_id", "") result_type = payload.get("type", "") # Creator name: prefer payload, fall back to DB creator_name = payload.get("creator_name", "") creator_slug = "" if not creator_name and cid: info = creator_map.get(cid, {"name": "", "slug": ""}) creator_name = info["name"] creator_slug = info["slug"] elif creator_name and cid: # We have the name from payload but need the slug from DB info = creator_map.get(cid, {}) creator_slug = info.get("slug", "") # Determine technique_page_slug based on result type if result_type == "technique_page": tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-")) elif result_type == "technique_section": tp_slug = payload.get("slug", "") else: tp_slug = payload.get("technique_page_slug", "") enriched.append({ "type": result_type, "title": payload.get("title", ""), "slug": payload.get("slug", payload.get("title", "").lower().replace(" ", "-")), "technique_page_slug": tp_slug, "summary": payload.get("summary", ""), "topic_category": payload.get("topic_category", ""), "topic_tags": payload.get("topic_tags", []), "creator_id": cid, "creator_name": creator_name, "creator_slug": creator_slug, "created_at": payload.get("created_at", ""), "score": r.get("score", 0.0), "match_context": "", "section_anchor": payload.get("section_anchor", "") if result_type == "technique_section" else "", 
"section_heading": payload.get("section_heading", "") if result_type == "technique_section" else "", "source_video_id": payload.get("source_video_id", "") if result_type == "key_moment" else "", "start_time": payload.get("start_time") if result_type == "key_moment" else None, "end_time": payload.get("end_time") if result_type == "key_moment" else None, "video_filename": video_map.get(payload.get("source_video_id", ""), "") if result_type == "key_moment" else "", }) return enriched