# NOTE: removed extraction-artifact header ("1209 lines / 49 KiB / Python") that was not valid Python.
"""Async search service for the public search endpoint.
|
|
|
|
Orchestrates semantic search (embedding + Qdrant) with keyword search.
|
|
Both run in parallel — results are merged and deduplicated. Keyword matches
|
|
get a match_context explaining WHY they matched. Semantic-only results get
|
|
a "Semantic match" context and are filtered by a minimum score threshold.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import time
|
|
import uuid as uuid_mod
|
|
from typing import Any
|
|
|
|
import httpx
|
|
import openai
|
|
from qdrant_client import AsyncQdrantClient
|
|
from qdrant_client.http import exceptions as qdrant_exceptions
|
|
from qdrant_client.models import FieldCondition, Filter, MatchValue
|
|
from sqlalchemy import and_, func, or_, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from config import Settings
|
|
from models import Creator, KeyMoment, SourceVideo, TechniquePage
|
|
|
|
# Module-level logger; child of the application's "chrysopedia" logger tree.
logger = logging.getLogger("chrysopedia.search")

# Timeout for external calls (embedding API, Qdrant) in seconds
_EXTERNAL_TIMEOUT = 2.0 # 2s — Ollama local embedding needs more than 300ms

# Minimum cosine similarity score for semantic results to be included
# (Qdrant hits below this threshold are dropped by the orchestrator).
_SEMANTIC_MIN_SCORE = 0.45
|
|
|
|
|
|
class SearchService:
|
|
"""Async search service with parallel semantic + keyword search.
|
|
|
|
Parameters
|
|
----------
|
|
settings:
|
|
Application settings containing embedding and Qdrant config.
|
|
"""
|
|
|
|
    def __init__(self, settings: Settings) -> None:
        """Create the service's external-service clients from *settings*.

        No I/O happens here — the OpenAI-compatible embedding client, the
        Qdrant client, and the httpx client for LightRAG are only
        constructed; connections are made lazily on first use.
        """
        self.settings = settings
        # OpenAI-compatible client pointed at the configured embedding API.
        self._openai = openai.AsyncOpenAI(
            base_url=settings.embedding_api_url,
            api_key=settings.llm_api_key,
        )
        # Vector store client + the collection searched by search_qdrant().
        self._qdrant = AsyncQdrantClient(url=settings.qdrant_url)
        self._collection = settings.qdrant_collection

        # LightRAG client
        self._httpx = httpx.AsyncClient(
            timeout=httpx.Timeout(settings.lightrag_search_timeout),
        )
        self._lightrag_url = settings.lightrag_url
        # Queries shorter than this skip LightRAG entirely (see search()).
        self._lightrag_min_query_length = settings.lightrag_min_query_length
|
|
|
|
# ── Embedding ────────────────────────────────────────────────────────
|
|
|
|
async def embed_query(self, text: str) -> list[float] | None:
|
|
"""Embed a query string into a vector.
|
|
|
|
Returns None on any failure (timeout, connection, malformed response)
|
|
so the caller can fall back to keyword search.
|
|
"""
|
|
try:
|
|
response = await asyncio.wait_for(
|
|
self._openai.embeddings.create(
|
|
model=self.settings.embedding_model,
|
|
input=text,
|
|
),
|
|
timeout=_EXTERNAL_TIMEOUT,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Embedding API timeout (%.0fms limit) for query: %.50s…", _EXTERNAL_TIMEOUT * 1000, text)
|
|
return None
|
|
except (openai.APIConnectionError, openai.APITimeoutError) as exc:
|
|
logger.warning("Embedding API connection error (%s: %s)", type(exc).__name__, exc)
|
|
return None
|
|
except openai.APIError as exc:
|
|
logger.warning("Embedding API error (%s: %s)", type(exc).__name__, exc)
|
|
return None
|
|
|
|
if not response.data:
|
|
logger.warning("Embedding API returned empty data for query: %.50s…", text)
|
|
return None
|
|
|
|
vector = response.data[0].embedding
|
|
if len(vector) != self.settings.embedding_dimensions:
|
|
logger.warning(
|
|
"Embedding dimension mismatch: expected %d, got %d",
|
|
self.settings.embedding_dimensions,
|
|
len(vector),
|
|
)
|
|
return None
|
|
|
|
return vector
|
|
|
|
# ── Qdrant vector search ─────────────────────────────────────────────
|
|
|
|
async def search_qdrant(
|
|
self,
|
|
vector: list[float],
|
|
limit: int = 20,
|
|
type_filter: str | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""Search Qdrant for nearest neighbours.
|
|
|
|
Returns a list of dicts with 'score' and 'payload' keys.
|
|
Returns empty list on failure.
|
|
"""
|
|
query_filter = None
|
|
if type_filter:
|
|
query_filter = Filter(
|
|
must=[FieldCondition(key="type", match=MatchValue(value=type_filter))]
|
|
)
|
|
|
|
try:
|
|
results = await asyncio.wait_for(
|
|
self._qdrant.query_points(
|
|
collection_name=self._collection,
|
|
query=vector,
|
|
query_filter=query_filter,
|
|
limit=limit,
|
|
with_payload=True,
|
|
),
|
|
timeout=_EXTERNAL_TIMEOUT,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Qdrant search timeout (%.0fms limit)", _EXTERNAL_TIMEOUT * 1000)
|
|
return []
|
|
except qdrant_exceptions.UnexpectedResponse as exc:
|
|
logger.warning("Qdrant search error: %s", exc)
|
|
return []
|
|
except Exception as exc:
|
|
logger.warning("Qdrant connection error (%s: %s)", type(exc).__name__, exc)
|
|
return []
|
|
|
|
return [
|
|
{"score": point.score, "payload": point.payload}
|
|
for point in results.points
|
|
]
|
|
|
|
# ── Token helpers ───────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _tokenize(query: str) -> list[str]:
|
|
"""Split query into non-empty lowercase tokens."""
|
|
return [t.lower() for t in query.split() if t]
|
|
|
|
@staticmethod
|
|
def _tp_token_condition(token: str):
|
|
"""Build an OR condition for a single token across TechniquePage + Creator fields."""
|
|
pat = f"%{token}%"
|
|
return or_(
|
|
TechniquePage.title.ilike(pat),
|
|
TechniquePage.summary.ilike(pat),
|
|
TechniquePage.topic_category.ilike(pat),
|
|
func.array_to_string(TechniquePage.topic_tags, " ").ilike(pat),
|
|
Creator.name.ilike(pat),
|
|
)
|
|
|
|
@staticmethod
|
|
def _km_token_condition(token: str):
|
|
"""Build an OR condition for a single token across KeyMoment + Creator fields."""
|
|
pat = f"%{token}%"
|
|
return or_(
|
|
KeyMoment.title.ilike(pat),
|
|
KeyMoment.summary.ilike(pat),
|
|
Creator.name.ilike(pat),
|
|
)
|
|
|
|
@staticmethod
|
|
def _cr_token_condition(token: str):
|
|
"""Build an OR condition for a single token across Creator fields."""
|
|
pat = f"%{token}%"
|
|
return or_(
|
|
Creator.name.ilike(pat),
|
|
func.array_to_string(Creator.genres, " ").ilike(pat),
|
|
)
|
|
|
|
# ── Match context generation ────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _build_match_context(item: dict[str, Any], tokens: list[str]) -> str:
|
|
"""Generate a human-readable string explaining which fields matched.
|
|
|
|
Checks each token against each field and returns a compact summary
|
|
like "Creator: Keota · Tag: snare" or "Title: ...snare drum...".
|
|
"""
|
|
if not tokens:
|
|
return ""
|
|
|
|
matches: list[str] = []
|
|
seen: set[str] = set()
|
|
|
|
for token in tokens:
|
|
t = token.lower()
|
|
|
|
# Check creator name
|
|
creator = item.get("creator_name", "")
|
|
if creator and t in creator.lower() and "creator" not in seen:
|
|
matches.append(f"Creator: {creator}")
|
|
seen.add("creator")
|
|
continue
|
|
|
|
# Check title
|
|
title = item.get("title", "")
|
|
if title and t in title.lower() and "title" not in seen:
|
|
matches.append(f"Title match")
|
|
seen.add("title")
|
|
continue
|
|
|
|
# Check topic_category
|
|
cat = item.get("topic_category", "")
|
|
if cat and t in cat.lower() and "category" not in seen:
|
|
matches.append(f"Category: {cat}")
|
|
seen.add("category")
|
|
continue
|
|
|
|
# Check topic_tags
|
|
tags = item.get("topic_tags", [])
|
|
matched_tag = next((tag for tag in tags if t in tag.lower()), None)
|
|
if matched_tag and f"tag:{matched_tag.lower()}" not in seen:
|
|
matches.append(f"Tag: {matched_tag}")
|
|
seen.add(f"tag:{matched_tag.lower()}")
|
|
continue
|
|
|
|
# Check summary
|
|
summary = item.get("summary", "")
|
|
if summary and t in summary.lower() and "summary" not in seen:
|
|
# Extract a small context window around the match
|
|
idx = summary.lower().find(t)
|
|
start = max(0, idx - 20)
|
|
end = min(len(summary), idx + len(t) + 20)
|
|
snippet = summary[start:end].strip()
|
|
if start > 0:
|
|
snippet = "…" + snippet
|
|
if end < len(summary):
|
|
snippet = snippet + "…"
|
|
matches.append(f"Content: {snippet}")
|
|
seen.add("summary")
|
|
continue
|
|
|
|
return " · ".join(matches) if matches else ""
|
|
|
|
# ── Keyword search (multi-token AND) ─────────────────────────────────
|
|
|
|
async def keyword_search(
|
|
self,
|
|
query: str,
|
|
scope: str,
|
|
limit: int,
|
|
db: AsyncSession,
|
|
sort: str = "relevance",
|
|
) -> dict[str, list[dict[str, Any]]]:
|
|
"""Multi-token AND keyword search across technique pages, key moments, and creators.
|
|
|
|
Tokenizes the query by whitespace. Each token must match at least one
|
|
indexed field (title, summary, topic_category, topic_tags, creator name,
|
|
genres). All tokens must match for a row to be included.
|
|
|
|
If AND matching returns zero results but individual tokens would match,
|
|
returns up to 5 partial_matches scored by the number of tokens matched.
|
|
|
|
Returns ``{"items": [...], "partial_matches": [...]}``.
|
|
"""
|
|
tokens = self._tokenize(query)
|
|
if not tokens:
|
|
return {"items": [], "partial_matches": []}
|
|
|
|
items = await self._keyword_search_and(tokens, scope, limit, db)
|
|
|
|
# Add match_context to each item
|
|
for item in items:
|
|
item["match_context"] = self._build_match_context(item, tokens)
|
|
|
|
partial: list[dict[str, Any]] = []
|
|
if not items and len(tokens) > 1:
|
|
partial = await self._keyword_partial_matches(tokens, scope, db)
|
|
for p in partial:
|
|
p["match_context"] = self._build_match_context(p, tokens)
|
|
|
|
return {"items": items, "partial_matches": partial}
|
|
|
|
    async def _keyword_search_and(
        self,
        tokens: list[str],
        scope: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Run AND-logic keyword search — every token must match at least one field.

        Depending on *scope* ("all", "topics", "creators"), queries
        technique pages, key moments (scope "all" only), and creators,
        normalizing each row into a flat result dict with a shared shape.
        Keyword hits carry score 0.0 — ranking is applied by the caller.
        """
        results: list[dict[str, Any]] = []

        # Technique pages: joined with Creator so creator name/slug can be
        # returned (and so creator-name tokens can match in the condition).
        if scope in ("all", "topics"):
            tp_stmt = (
                select(TechniquePage, Creator)
                .join(Creator, TechniquePage.creator_id == Creator.id)
                .where(and_(*(self._tp_token_condition(t) for t in tokens)))
                .limit(limit)
            )
            tp_rows = await db.execute(tp_stmt)
            for tp, cr in tp_rows.all():
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    # Keyword hits are unscored; 0.0 is a placeholder.
                    "score": 0.0,
                })

        # Key moments: only surfaced in the "all" scope. The technique page
        # is outer-joined because a moment may not be linked to one yet.
        if scope in ("all",):
            km_stmt = (
                select(KeyMoment, SourceVideo, Creator, TechniquePage)
                .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
                .join(Creator, SourceVideo.creator_id == Creator.id)
                .outerjoin(TechniquePage, KeyMoment.technique_page_id == TechniquePage.id)
                .where(and_(*(self._km_token_condition(t) for t in tokens)))
                .limit(limit)
            )
            km_rows = await db.execute(km_stmt)
            for km, sv, cr, tp in km_rows.all():
                results.append({
                    "type": "key_moment",
                    "title": km.title,
                    "slug": "",
                    "technique_page_slug": tp.slug if tp else "",
                    "summary": km.summary or "",
                    "topic_category": "",
                    "topic_tags": [],
                    "creator_id": str(cr.id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    # hasattr guard is defensive in case the model lacks created_at.
                    "created_at": km.created_at.isoformat() if hasattr(km, "created_at") and km.created_at else "",
                    "score": 0.0,
                    "source_video_id": str(km.source_video_id) if km.source_video_id else "",
                    "start_time": km.start_time,
                    "end_time": km.end_time,
                    "video_filename": (sv.filename or "") if sv else "",
                })

        # Creators: matched on name and genres.
        if scope in ("all", "creators"):
            cr_stmt = (
                select(Creator)
                .where(and_(*(self._cr_token_condition(t) for t in tokens)))
                .limit(limit)
            )
            cr_rows = await db.execute(cr_stmt)
            for cr in cr_rows.scalars().all():
                results.append({
                    "type": "creator",
                    "title": cr.name,
                    "slug": cr.slug,
                    "technique_page_slug": "",
                    "summary": "",
                    "topic_category": "",
                    # Genres surfaced in the tags slot for display/matching.
                    "topic_tags": cr.genres or [],
                    "creator_id": str(cr.id),
                    "created_at": cr.created_at.isoformat() if hasattr(cr, "created_at") and cr.created_at else "",
                    "score": 0.0,
                })

        # Each sub-query is limited individually, so the combined list can
        # exceed *limit*; truncate to the requested size.
        return results[:limit]
|
|
|
|
async def _keyword_partial_matches(
|
|
self,
|
|
tokens: list[str],
|
|
scope: str,
|
|
db: AsyncSession,
|
|
) -> list[dict[str, Any]]:
|
|
"""When AND produces zero results, score rows by how many tokens match.
|
|
|
|
Returns the top 5 results ordered by match count descending.
|
|
"""
|
|
seen: dict[tuple[str, str], dict[str, Any]] = {}
|
|
match_counts: dict[tuple[str, str], int] = {}
|
|
|
|
for token in tokens:
|
|
single_results = await self._keyword_search_and([token], scope, 20, db)
|
|
for r in single_results:
|
|
key = (r["type"], r.get("slug") or r.get("title", ""))
|
|
if key not in seen:
|
|
seen[key] = r
|
|
match_counts[key] = 0
|
|
match_counts[key] += 1
|
|
|
|
ranked = sorted(match_counts.keys(), key=lambda k: match_counts[k], reverse=True)
|
|
partial: list[dict[str, Any]] = []
|
|
for key in ranked[:5]:
|
|
item = seen[key]
|
|
item["score"] = match_counts[key] / len(tokens)
|
|
partial.append(item)
|
|
|
|
return partial
|
|
|
|
# ── LightRAG search ───────────────────────────────────────────────────
|
|
|
|
    # Regex to parse file_source format: technique:{slug}:creator:{creator_id}
    # e.g. "technique:snare-tuning:creator:1234…" → slug="snare-tuning".
    _FILE_SOURCE_RE = re.compile(r"^technique:(?P<slug>[^:]+):creator:(?P<creator_id>.+)$")
|
|
|
|
    async def _lightrag_search(
        self,
        query: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Query LightRAG /query/data for entities, relationships, and chunks.

        Maps results back to SearchResultItem dicts using file_source parsing
        and DB batch lookup. Returns empty list on any failure (timeout,
        connection, parse error) with a WARNING log — caller falls back.
        """
        start = time.monotonic()
        try:
            resp = await self._httpx.post(
                f"{self._lightrag_url}/query/data",
                json={"query": query, "mode": "hybrid", "top_k": limit},
            )
            resp.raise_for_status()
            body = resp.json()
        except httpx.TimeoutException:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "lightrag_search_fallback reason=timeout query=%r latency_ms=%.1f",
                query, elapsed_ms,
            )
            return []
        except httpx.HTTPError as exc:
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "lightrag_search_fallback reason=http_error query=%r error=%s latency_ms=%.1f",
                query, exc, elapsed_ms,
            )
            return []
        except Exception as exc:
            # Catch-all: JSON decode errors and anything else unexpected.
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "lightrag_search_fallback reason=unexpected query=%r error=%s latency_ms=%.1f",
                query, exc, elapsed_ms,
            )
            return []

        # Parse response
        try:
            data = body.get("data", {})
            if not data:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "lightrag_search_fallback reason=empty_data query=%r latency_ms=%.1f",
                    query, elapsed_ms,
                )
                return []

            chunks = data.get("chunks", [])
            entities = data.get("entities", [])
            # relationships = data.get("relationships", [])  # available for future use

            # Extract technique slugs from chunk file_path/file_source fields
            slug_set: set[str] = set()
            slug_order: list[str] = []  # preserve retrieval rank
            for chunk in chunks:
                file_path = chunk.get("file_path", "")
                m = self._FILE_SOURCE_RE.match(file_path)
                if m and m.group("slug") not in slug_set:
                    slug = m.group("slug")
                    slug_set.add(slug)
                    slug_order.append(slug)

            # Also try to extract slugs from entity names by matching DB later
            entity_names: list[str] = []
            for ent in entities:
                name = ent.get("entity_name", "")
                if name:
                    entity_names.append(name)

            if not slug_set and not entity_names:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "lightrag_search_fallback reason=no_parseable_results query=%r "
                    "chunks=%d entities=%d latency_ms=%.1f",
                    query, len(chunks), len(entities), elapsed_ms,
                )
                return []

            # Batch-lookup technique pages by slug
            tp_map: dict[str, tuple] = {}  # slug → (TechniquePage, Creator)
            if slug_set:
                tp_stmt = (
                    select(TechniquePage, Creator)
                    .join(Creator, TechniquePage.creator_id == Creator.id)
                    .where(TechniquePage.slug.in_(list(slug_set)))
                )
                tp_rows = await db.execute(tp_stmt)
                for tp, cr in tp_rows.all():
                    tp_map[tp.slug] = (tp, cr)

            # If we have entity names but no chunk matches, try matching
            # entity names against technique page titles or creator names
            if entity_names and not tp_map:
                # Cap at 20 entity names to keep the SQL statement bounded.
                entity_name_pats = [f"%{name}%" for name in entity_names[:20]]
                # NOTE(review): func.any_(...) with ilike presumably targets
                # PostgreSQL's `ILIKE ANY(array)` — confirm the list binds as
                # an array parameter on this dialect.
                tp_stmt2 = (
                    select(TechniquePage, Creator)
                    .join(Creator, TechniquePage.creator_id == Creator.id)
                    .where(
                        or_(
                            TechniquePage.title.ilike(func.any_(entity_name_pats)),
                            Creator.name.ilike(func.any_(entity_name_pats)),
                        )
                    )
                    .limit(limit)
                )
                tp_rows2 = await db.execute(tp_stmt2)
                for tp, cr in tp_rows2.all():
                    if tp.slug not in tp_map:
                        tp_map[tp.slug] = (tp, cr)
                    if tp.slug not in slug_set:
                        # Entity-derived pages rank after chunk-derived ones.
                        slug_order.append(tp.slug)
                        slug_set.add(tp.slug)

            # Build result items in retrieval-rank order
            results: list[dict[str, Any]] = []
            seen_slugs: set[str] = set()
            for idx, slug in enumerate(slug_order):
                if slug in seen_slugs:
                    continue
                seen_slugs.add(slug)
                pair = tp_map.get(slug)
                if not pair:
                    # Slug parsed from LightRAG but no longer in the DB — skip.
                    continue
                tp, cr = pair
                # Score: higher rank → higher score (1.0 down to ~0.5)
                score = max(1.0 - (idx * 0.05), 0.5)
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    "score": score,
                    "match_context": "LightRAG graph match",
                })
                if len(results) >= limit:
                    break

            elapsed_ms = (time.monotonic() - start) * 1000
            logger.info(
                "lightrag_search query=%r latency_ms=%.1f result_count=%d chunks=%d entities=%d",
                query, elapsed_ms, len(results), len(chunks), len(entities),
            )
            return results

        except (KeyError, ValueError, TypeError) as exc:
            # Malformed/unexpected response shape — log a snippet for debugging.
            elapsed_ms = (time.monotonic() - start) * 1000
            body_snippet = str(body)[:200] if body else "<empty>"
            logger.warning(
                "lightrag_search_fallback reason=parse_error query=%r error=%s body=%.200s latency_ms=%.1f",
                query, exc, body_snippet, elapsed_ms,
            )
            return []
|
|
|
|
# ── Creator-scoped cascade helpers ──────────────────────────────────
|
|
|
|
async def _resolve_creator(
|
|
self,
|
|
creator_ref: str,
|
|
db: AsyncSession,
|
|
) -> tuple[str | None, str | None]:
|
|
"""Resolve a creator slug or UUID to (creator_id, creator_name).
|
|
|
|
Returns (None, None) if the creator is not found.
|
|
"""
|
|
try:
|
|
creator_uuid = uuid_mod.UUID(creator_ref)
|
|
stmt = select(Creator).where(Creator.id == creator_uuid)
|
|
except (ValueError, AttributeError):
|
|
stmt = select(Creator).where(Creator.slug == creator_ref)
|
|
|
|
result = await db.execute(stmt)
|
|
cr = result.scalars().first()
|
|
if cr is None:
|
|
return None, None
|
|
return str(cr.id), cr.name
|
|
|
|
async def _get_creator_domain(
|
|
self,
|
|
creator_id: str,
|
|
db: AsyncSession,
|
|
) -> str | None:
|
|
"""Return the dominant topic_category for a creator, or None if <2 technique pages."""
|
|
stmt = (
|
|
select(
|
|
TechniquePage.topic_category,
|
|
func.count().label("cnt"),
|
|
)
|
|
.where(TechniquePage.creator_id == uuid_mod.UUID(creator_id))
|
|
.group_by(TechniquePage.topic_category)
|
|
.order_by(func.count().desc())
|
|
.limit(1)
|
|
)
|
|
result = await db.execute(stmt)
|
|
row = result.first()
|
|
if row is None:
|
|
return None
|
|
# Require at least 2 technique pages to declare a domain
|
|
if row.cnt < 2:
|
|
return None
|
|
return row.topic_category
|
|
|
|
    async def _creator_scoped_search(
        self,
        query: str,
        creator_id: str,
        creator_name: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Search LightRAG with creator name as keyword, post-filter by creator_id.

        Tier 1 of the creator cascade: passes the creator's name as a
        low-level keyword to bias retrieval, over-fetches (top_k = limit*3),
        then keeps only technique pages actually owned by *creator_id*.
        Returns [] on any failure or when nothing survives the filter, so
        the caller can move to the next tier.
        """
        start = time.monotonic()
        try:
            resp = await self._httpx.post(
                f"{self._lightrag_url}/query/data",
                json={
                    "query": query,
                    "mode": "hybrid",
                    # Over-fetch: many hits belong to other creators and are
                    # dropped by the post-filter below.
                    "top_k": limit * 3,
                    "ll_keywords": [creator_name],
                },
            )
            resp.raise_for_status()
            body = resp.json()
        except Exception as exc:
            # Broad catch: any transport/HTTP/JSON failure degrades to [].
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "creator_scoped_search reason=%s query=%r creator=%s latency_ms=%.1f",
                type(exc).__name__, query, creator_id, elapsed_ms,
            )
            return []

        try:
            data = body.get("data", {})
            chunks = data.get("chunks", []) if data else []

            # Collect unique technique slugs, preserving retrieval order.
            slug_set: set[str] = set()
            slug_order: list[str] = []
            for chunk in chunks:
                file_path = chunk.get("file_path", "")
                m = self._FILE_SOURCE_RE.match(file_path)
                if m and m.group("slug") not in slug_set:
                    slug = m.group("slug")
                    slug_set.add(slug)
                    slug_order.append(slug)

            if not slug_set:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "creator_scoped_search reason=no_parseable_results query=%r creator=%s latency_ms=%.1f",
                    query, creator_id, elapsed_ms,
                )
                return []

            # Batch lookup and post-filter by creator_id
            tp_stmt = (
                select(TechniquePage, Creator)
                .join(Creator, TechniquePage.creator_id == Creator.id)
                .where(TechniquePage.slug.in_(list(slug_set)))
            )
            tp_rows = await db.execute(tp_stmt)
            tp_map: dict[str, tuple] = {}
            for tp, cr in tp_rows.all():
                # Keep only pages belonging to the requested creator.
                if str(tp.creator_id) == creator_id:
                    tp_map[tp.slug] = (tp, cr)

            # Emit results in retrieval-rank order with a rank-decayed score.
            results: list[dict[str, Any]] = []
            for idx, slug in enumerate(slug_order):
                pair = tp_map.get(slug)
                if not pair:
                    continue
                tp, cr = pair
                # 1.0 at rank 0, −0.05 per rank, floored at 0.5.
                score = max(1.0 - (idx * 0.05), 0.5)
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    "score": score,
                    "match_context": "Creator-scoped LightRAG match",
                })
                if len(results) >= limit:
                    break

            elapsed_ms = (time.monotonic() - start) * 1000
            logger.info(
                "creator_scoped_search query=%r creator=%s latency_ms=%.1f result_count=%d",
                query, creator_id, elapsed_ms, len(results),
            )
            return results

        except (KeyError, ValueError, TypeError) as exc:
            # Malformed/unexpected response shape.
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "creator_scoped_search reason=parse_error query=%r creator=%s error=%s latency_ms=%.1f",
                query, creator_id, exc, elapsed_ms,
            )
            return []
|
|
|
|
    async def _domain_scoped_search(
        self,
        query: str,
        domain: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Search LightRAG with domain keyword — no post-filtering.

        Tier 2 of the creator cascade: biases retrieval toward the
        creator's dominant topic_category via ll_keywords, but (unlike the
        creator-scoped tier) keeps results from any creator. Returns [] on
        any failure so the caller can fall through to the next tier.
        """
        start = time.monotonic()
        try:
            resp = await self._httpx.post(
                f"{self._lightrag_url}/query/data",
                json={
                    "query": query,
                    "mode": "hybrid",
                    "top_k": limit,
                    "ll_keywords": [domain],
                },
            )
            resp.raise_for_status()
            body = resp.json()
        except Exception as exc:
            # Broad catch: any transport/HTTP/JSON failure degrades to [].
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "domain_scoped_search reason=%s query=%r domain=%s latency_ms=%.1f",
                type(exc).__name__, query, domain, elapsed_ms,
            )
            return []

        try:
            data = body.get("data", {})
            chunks = data.get("chunks", []) if data else []

            # Collect unique technique slugs, preserving retrieval order.
            slug_set: set[str] = set()
            slug_order: list[str] = []
            for chunk in chunks:
                file_path = chunk.get("file_path", "")
                m = self._FILE_SOURCE_RE.match(file_path)
                if m and m.group("slug") not in slug_set:
                    slug = m.group("slug")
                    slug_set.add(slug)
                    slug_order.append(slug)

            if not slug_set:
                elapsed_ms = (time.monotonic() - start) * 1000
                logger.warning(
                    "domain_scoped_search reason=no_parseable_results query=%r domain=%s latency_ms=%.1f",
                    query, domain, elapsed_ms,
                )
                return []

            # Batch lookup of technique pages — all creators welcome here.
            tp_stmt = (
                select(TechniquePage, Creator)
                .join(Creator, TechniquePage.creator_id == Creator.id)
                .where(TechniquePage.slug.in_(list(slug_set)))
            )
            tp_rows = await db.execute(tp_stmt)
            tp_map: dict[str, tuple] = {}
            for tp, cr in tp_rows.all():
                tp_map[tp.slug] = (tp, cr)

            # Emit results in retrieval-rank order with a rank-decayed score.
            results: list[dict[str, Any]] = []
            for idx, slug in enumerate(slug_order):
                pair = tp_map.get(slug)
                if not pair:
                    continue
                tp, cr = pair
                # 1.0 at rank 0, −0.05 per rank, floored at 0.5.
                score = max(1.0 - (idx * 0.05), 0.5)
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    "score": score,
                    "match_context": "Domain-scoped LightRAG match",
                })
                if len(results) >= limit:
                    break

            elapsed_ms = (time.monotonic() - start) * 1000
            logger.info(
                "domain_scoped_search query=%r domain=%s latency_ms=%.1f result_count=%d",
                query, domain, elapsed_ms, len(results),
            )
            return results

        except (KeyError, ValueError, TypeError) as exc:
            # Malformed/unexpected response shape.
            elapsed_ms = (time.monotonic() - start) * 1000
            logger.warning(
                "domain_scoped_search reason=parse_error query=%r domain=%s error=%s latency_ms=%.1f",
                query, domain, exc, elapsed_ms,
            )
            return []
|
|
|
|
# ── Orchestrator ─────────────────────────────────────────────────────
|
|
|
|
async def search(
|
|
self,
|
|
query: str,
|
|
scope: str,
|
|
limit: int,
|
|
db: AsyncSession,
|
|
sort: str = "relevance",
|
|
creator: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Run semantic and keyword search in parallel, merge and deduplicate.
|
|
|
|
When ``creator`` is provided, executes a 4-tier cascade:
|
|
creator → domain → global → none, returning results from the first
|
|
tier that produces hits. ``cascade_tier`` indicates which tier served.
|
|
|
|
Both engines run concurrently. Keyword results are always included
|
|
(with match_context). Semantic results above the score threshold are
|
|
merged in, deduplicated by (type, slug/title). Keyword matches rank
|
|
higher when they exist.
|
|
"""
|
|
start = time.monotonic()
|
|
|
|
if not query or not query.strip():
|
|
return {"items": [], "partial_matches": [], "total": 0, "query": query, "fallback_used": False, "cascade_tier": ""}
|
|
|
|
query = query.strip()[:500]
|
|
if scope not in ("all", "topics", "creators"):
|
|
scope = "all"
|
|
|
|
cascade_tier = ""
|
|
|
|
# ── Creator-scoped cascade ──────────────────────────────────────
|
|
use_lightrag = len(query) >= self._lightrag_min_query_length
|
|
|
|
if creator and use_lightrag:
|
|
creator_id, creator_name = await self._resolve_creator(creator, db)
|
|
if creator_id and creator_name:
|
|
# Tier 1: creator-scoped
|
|
tier1 = await self._creator_scoped_search(query, creator_id, creator_name, limit, db)
|
|
if tier1:
|
|
cascade_tier = "creator"
|
|
lightrag_results = tier1
|
|
fallback_used = False
|
|
else:
|
|
# Tier 2: domain-scoped
|
|
domain = await self._get_creator_domain(creator_id, db)
|
|
tier2: list[dict[str, Any]] = []
|
|
if domain:
|
|
tier2 = await self._domain_scoped_search(query, domain, limit, db)
|
|
if tier2:
|
|
cascade_tier = "domain"
|
|
lightrag_results = tier2
|
|
fallback_used = False
|
|
else:
|
|
# Tier 3: global LightRAG
|
|
tier3 = await self._lightrag_search(query, limit, db)
|
|
if tier3:
|
|
cascade_tier = "global"
|
|
lightrag_results = tier3
|
|
fallback_used = False
|
|
else:
|
|
# Tier 4: no LightRAG results at all
|
|
cascade_tier = "none"
|
|
lightrag_results = []
|
|
fallback_used = True
|
|
|
|
elapsed_cascade = (time.monotonic() - start) * 1000
|
|
logger.info(
|
|
"cascade_search query=%r creator=%s tier=%s latency_ms=%.1f result_count=%d",
|
|
query, creator, cascade_tier, elapsed_cascade, len(lightrag_results),
|
|
)
|
|
|
|
# Skip to merge phase (keyword still runs for supplementary)
|
|
# Jump past the non-cascade LightRAG block
|
|
kw_result = await self.keyword_search(query, scope, limit, db, sort=sort)
|
|
|
|
if fallback_used:
|
|
# Qdrant semantic fallback
|
|
vector = await self.embed_query(query)
|
|
semantic_results: list[dict[str, Any]] = []
|
|
if vector:
|
|
raw = await self.search_qdrant(vector, limit=limit)
|
|
enriched = await self._enrich_qdrant_results(raw, db)
|
|
semantic_results = [
|
|
item for item in enriched
|
|
if item.get("score", 0) >= _SEMANTIC_MIN_SCORE
|
|
]
|
|
for item in semantic_results:
|
|
if not item.get("match_context"):
|
|
item["match_context"] = "Semantic match"
|
|
else:
|
|
semantic_results = []
|
|
|
|
# Handle exceptions
|
|
kw_items = kw_result["items"] if not isinstance(kw_result, Exception) else []
|
|
partial_matches = kw_result.get("partial_matches", []) if not isinstance(kw_result, Exception) else []
|
|
|
|
# Merge: cascade results first, then keyword, then semantic
|
|
seen_keys: set[str] = set()
|
|
merged: list[dict[str, Any]] = []
|
|
|
|
def _dedup_key(item: dict) -> str:
|
|
t = item.get("type", "")
|
|
s = item.get("slug") or item.get("technique_page_slug") or ""
|
|
title = item.get("title", "")
|
|
return f"{t}:{s}:{title}"
|
|
|
|
for item in lightrag_results:
|
|
key = _dedup_key(item)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
merged.append(item)
|
|
|
|
for item in kw_items:
|
|
key = _dedup_key(item)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
merged.append(item)
|
|
|
|
for item in semantic_results:
|
|
key = _dedup_key(item)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
merged.append(item)
|
|
|
|
merged = self._apply_sort(merged, sort)
|
|
|
|
elapsed_ms = (time.monotonic() - start) * 1000
|
|
logger.info(
|
|
"Search query=%r scope=%s cascade_tier=%s lightrag=%d keyword=%d semantic=%d merged=%d fallback=%s latency_ms=%.1f",
|
|
query, scope, cascade_tier, len(lightrag_results), len(kw_items),
|
|
len(semantic_results), len(merged), fallback_used, elapsed_ms,
|
|
)
|
|
|
|
return {
|
|
"items": merged[:limit],
|
|
"partial_matches": partial_matches,
|
|
"total": len(merged),
|
|
"query": query,
|
|
"fallback_used": fallback_used,
|
|
"cascade_tier": cascade_tier,
|
|
}
|
|
else:
|
|
logger.warning("cascade_search reason=creator_not_found creator_ref=%r", creator)
|
|
# Fall through to normal search path
|
|
|
|
# ── Primary: try LightRAG for queries ≥ min length ─────────────
|
|
lightrag_results: list[dict[str, Any]] = []
|
|
fallback_used = True # assume fallback until LightRAG succeeds
|
|
|
|
use_lightrag = len(query) >= self._lightrag_min_query_length
|
|
|
|
if use_lightrag:
|
|
lightrag_results = await self._lightrag_search(query, limit, db)
|
|
if lightrag_results:
|
|
fallback_used = False
|
|
|
|
# ── Keyword search always runs (for merge/dedup) ─────────────
|
|
async def _keyword():
|
|
return await self.keyword_search(query, scope, limit, db, sort=sort)
|
|
|
|
# ── Fallback: Qdrant semantic (only when LightRAG didn't deliver) ──
|
|
qdrant_type_filter = None # no type filter — all result types welcome
|
|
async def _semantic():
|
|
vector = await self.embed_query(query)
|
|
if vector is None:
|
|
return []
|
|
results = await self.search_qdrant(vector, limit=limit, type_filter=qdrant_type_filter)
|
|
enriched = await self._enrich_qdrant_results(results, db)
|
|
# Filter by minimum score and add match_context
|
|
filtered = []
|
|
for item in enriched:
|
|
if item.get("score", 0) >= _SEMANTIC_MIN_SCORE:
|
|
if not item.get("match_context"):
|
|
item["match_context"] = "Semantic match"
|
|
filtered.append(item)
|
|
return filtered
|
|
|
|
if fallback_used:
|
|
# LightRAG returned nothing — run Qdrant semantic + keyword in parallel
|
|
semantic_results, kw_result = await asyncio.gather(
|
|
_semantic(),
|
|
_keyword(),
|
|
return_exceptions=True,
|
|
)
|
|
else:
|
|
# LightRAG succeeded — only need keyword for supplementary merge
|
|
semantic_results = []
|
|
kw_result = await _keyword()
|
|
|
|
# Handle exceptions gracefully
|
|
if isinstance(semantic_results, Exception):
|
|
logger.warning("Semantic search failed: %s", semantic_results)
|
|
semantic_results = []
|
|
if isinstance(kw_result, Exception):
|
|
logger.warning("Keyword search failed: %s", kw_result)
|
|
kw_result = {"items": [], "partial_matches": []}
|
|
|
|
kw_items = kw_result["items"]
|
|
partial_matches = kw_result.get("partial_matches", [])
|
|
|
|
# Merge: LightRAG results first (primary), then keyword, then Qdrant semantic
|
|
seen_keys: set[str] = set()
|
|
merged: list[dict[str, Any]] = []
|
|
|
|
def _dedup_key(item: dict) -> str:
|
|
t = item.get("type", "")
|
|
s = item.get("slug") or item.get("technique_page_slug") or ""
|
|
title = item.get("title", "")
|
|
return f"{t}:{s}:{title}"
|
|
|
|
for item in lightrag_results:
|
|
key = _dedup_key(item)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
merged.append(item)
|
|
|
|
for item in kw_items:
|
|
key = _dedup_key(item)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
merged.append(item)
|
|
|
|
for item in semantic_results:
|
|
key = _dedup_key(item)
|
|
if key not in seen_keys:
|
|
seen_keys.add(key)
|
|
merged.append(item)
|
|
|
|
# Apply sort
|
|
merged = self._apply_sort(merged, sort)
|
|
|
|
# fallback_used is already set above
|
|
|
|
elapsed_ms = (time.monotonic() - start) * 1000
|
|
logger.info(
|
|
"Search query=%r scope=%s lightrag=%d keyword=%d semantic=%d merged=%d partial=%d fallback=%s latency_ms=%.1f",
|
|
query, scope, len(lightrag_results), len(kw_items), len(semantic_results),
|
|
len(merged), len(partial_matches), fallback_used, elapsed_ms,
|
|
)
|
|
|
|
return {
|
|
"items": merged[:limit],
|
|
"partial_matches": partial_matches,
|
|
"total": len(merged),
|
|
"query": query,
|
|
"fallback_used": fallback_used,
|
|
"cascade_tier": cascade_tier,
|
|
}
|
|
|
|
# ── Sort helpers ────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _apply_sort(items: list[dict[str, Any]], sort: str) -> list[dict[str, Any]]:
|
|
"""Sort enriched result dicts by the requested criterion."""
|
|
if sort == "relevance" or not items:
|
|
return items
|
|
if sort == "newest":
|
|
return sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
|
|
elif sort == "oldest":
|
|
return sorted(items, key=lambda r: r.get("created_at") or "9999", reverse=False)
|
|
elif sort == "alpha":
|
|
return sorted(items, key=lambda r: (r.get("title") or "").lower())
|
|
elif sort == "creator":
|
|
return sorted(
|
|
items,
|
|
key=lambda r: ((r.get("creator_name") or "").lower(), (r.get("title") or "").lower()),
|
|
)
|
|
return items
|
|
|
|
# ── Result enrichment ────────────────────────────────────────────────
|
|
|
|
async def _enrich_qdrant_results(
|
|
self,
|
|
qdrant_results: list[dict[str, Any]],
|
|
db: AsyncSession,
|
|
) -> list[dict[str, Any]]:
|
|
"""Enrich Qdrant results with creator names and slugs from DB.
|
|
|
|
First reads creator_name from Qdrant payload; only hits DB for missing ones.
|
|
"""
|
|
enriched: list[dict[str, Any]] = []
|
|
|
|
# Collect creator_ids that need DB lookup
|
|
needs_db_lookup: set[str] = set()
|
|
for r in qdrant_results:
|
|
payload = r.get("payload", {})
|
|
if not payload.get("creator_name") and payload.get("creator_id"):
|
|
needs_db_lookup.add(payload["creator_id"])
|
|
|
|
# Collect source_video_ids for key_moment results to batch-fetch filenames
|
|
video_ids_needed: set[str] = set()
|
|
for r in qdrant_results:
|
|
payload = r.get("payload", {})
|
|
if payload.get("type") == "key_moment" and payload.get("source_video_id"):
|
|
video_ids_needed.add(payload["source_video_id"])
|
|
|
|
# Batch fetch creators from DB
|
|
creator_map: dict[str, dict[str, str]] = {}
|
|
if needs_db_lookup:
|
|
valid_ids = []
|
|
for cid in needs_db_lookup:
|
|
try:
|
|
valid_ids.append(uuid_mod.UUID(cid))
|
|
except (ValueError, AttributeError):
|
|
pass
|
|
if valid_ids:
|
|
stmt = select(Creator).where(Creator.id.in_(valid_ids))
|
|
result = await db.execute(stmt)
|
|
for c in result.scalars().all():
|
|
creator_map[str(c.id)] = {"name": c.name, "slug": c.slug}
|
|
|
|
# Batch fetch video filenames for key_moment results
|
|
video_map: dict[str, str] = {}
|
|
if video_ids_needed:
|
|
valid_vids = []
|
|
for vid in video_ids_needed:
|
|
try:
|
|
valid_vids.append(uuid_mod.UUID(vid))
|
|
except (ValueError, AttributeError):
|
|
pass
|
|
if valid_vids:
|
|
v_stmt = select(SourceVideo).where(SourceVideo.id.in_(valid_vids))
|
|
v_result = await db.execute(v_stmt)
|
|
for sv in v_result.scalars().all():
|
|
video_map[str(sv.id)] = sv.filename or ""
|
|
|
|
for r in qdrant_results:
|
|
payload = r.get("payload", {})
|
|
cid = payload.get("creator_id", "")
|
|
result_type = payload.get("type", "")
|
|
|
|
# Creator name: prefer payload, fall back to DB
|
|
creator_name = payload.get("creator_name", "")
|
|
creator_slug = ""
|
|
if not creator_name and cid:
|
|
info = creator_map.get(cid, {"name": "", "slug": ""})
|
|
creator_name = info["name"]
|
|
creator_slug = info["slug"]
|
|
elif creator_name and cid:
|
|
# We have the name from payload but need the slug from DB
|
|
info = creator_map.get(cid, {})
|
|
creator_slug = info.get("slug", "")
|
|
|
|
# Determine technique_page_slug based on result type
|
|
if result_type == "technique_page":
|
|
tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-"))
|
|
elif result_type == "technique_section":
|
|
tp_slug = payload.get("slug", "")
|
|
else:
|
|
tp_slug = payload.get("technique_page_slug", "")
|
|
|
|
enriched.append({
|
|
"type": result_type,
|
|
"title": payload.get("title", ""),
|
|
"slug": payload.get("slug", payload.get("title", "").lower().replace(" ", "-")),
|
|
"technique_page_slug": tp_slug,
|
|
"summary": payload.get("summary", ""),
|
|
"topic_category": payload.get("topic_category", ""),
|
|
"topic_tags": payload.get("topic_tags", []),
|
|
"creator_id": cid,
|
|
"creator_name": creator_name,
|
|
"creator_slug": creator_slug,
|
|
"created_at": payload.get("created_at", ""),
|
|
"score": r.get("score", 0.0),
|
|
"match_context": "",
|
|
"section_anchor": payload.get("section_anchor", "") if result_type == "technique_section" else "",
|
|
"section_heading": payload.get("section_heading", "") if result_type == "technique_section" else "",
|
|
"source_video_id": payload.get("source_video_id", "") if result_type == "key_moment" else "",
|
|
"start_time": payload.get("start_time") if result_type == "key_moment" else None,
|
|
"end_time": payload.get("end_time") if result_type == "key_moment" else None,
|
|
"video_filename": video_map.get(payload.get("source_video_id", ""), "") if result_type == "key_moment" else "",
|
|
})
|
|
|
|
return enriched
|