# chrysopedia/backend/search_service.py
#
# Commit: jlightner 57b8705e26
#   feat: Added per-section embedding to stage 6 for v2 technique pages
# Files touched in this change:
#   - backend/schemas.py
#   - backend/pipeline/stages.py
#   - backend/pipeline/qdrant_client.py
#   - backend/search_service.py
#   - backend/pipeline/test_section_embedding.py
#
# GSD-Task: S07/T01
# 2026-04-03 02:12:56 +00:00

"""Async search service for the public search endpoint.
Orchestrates semantic search (embedding + Qdrant) with keyword search.
Both run in parallel — results are merged and deduplicated. Keyword matches
get a match_context explaining WHY they matched. Semantic-only results get
a "Semantic match" context and are filtered by a minimum score threshold.
"""
from __future__ import annotations
import asyncio
import logging
import time
from typing import Any
import openai
from qdrant_client import AsyncQdrantClient
from qdrant_client.http import exceptions as qdrant_exceptions
from qdrant_client.models import FieldCondition, Filter, MatchValue
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from config import Settings
from models import Creator, KeyMoment, SourceVideo, TechniquePage
# Module-level logger for the search service.
logger = logging.getLogger("chrysopedia.search")
# Timeout for external calls (embedding API, Qdrant) in seconds
_EXTERNAL_TIMEOUT: float = 2.0 # 2s — Ollama local embedding needs more than 300ms
# Minimum cosine similarity score for semantic results to be included
# (semantic hits below this threshold are dropped before merging)
_SEMANTIC_MIN_SCORE: float = 0.45
class SearchService:
    """Async search service with parallel semantic + keyword search.

    Semantic search embeds the query and queries Qdrant; keyword search runs
    multi-token AND matching against the database. Both run concurrently in
    :meth:`search`; results are merged and deduplicated, with keyword matches
    ranked first.

    Parameters
    ----------
    settings:
        Application settings containing embedding and Qdrant config.
    """
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        # OpenAI-compatible async client pointed at the configured embedding
        # endpoint (may be a local Ollama server — see _EXTERNAL_TIMEOUT).
        self._openai = openai.AsyncOpenAI(
            base_url=settings.embedding_api_url,
            api_key=settings.llm_api_key,
        )
        self._qdrant = AsyncQdrantClient(url=settings.qdrant_url)
        self._collection = settings.qdrant_collection
    # ── Embedding ────────────────────────────────────────────────────────
    async def embed_query(self, text: str) -> list[float] | None:
        """Embed a query string into a vector.

        Returns None on any failure (timeout, connection, malformed response)
        so the caller can fall back to keyword search.
        """
        try:
            response = await asyncio.wait_for(
                self._openai.embeddings.create(
                    model=self.settings.embedding_model,
                    input=text,
                ),
                timeout=_EXTERNAL_TIMEOUT,
            )
        except asyncio.TimeoutError:
            logger.warning("Embedding API timeout (%.0fms limit) for query: %.50s", _EXTERNAL_TIMEOUT * 1000, text)
            return None
        except (openai.APIConnectionError, openai.APITimeoutError) as exc:
            logger.warning("Embedding API connection error (%s: %s)", type(exc).__name__, exc)
            return None
        except openai.APIError as exc:
            logger.warning("Embedding API error (%s: %s)", type(exc).__name__, exc)
            return None
        if not response.data:
            logger.warning("Embedding API returned empty data for query: %.50s", text)
            return None
        vector = response.data[0].embedding
        # A wrong-sized vector would silently corrupt the Qdrant query, so a
        # dimension mismatch is treated as a hard failure.
        if len(vector) != self.settings.embedding_dimensions:
            logger.warning(
                "Embedding dimension mismatch: expected %d, got %d",
                self.settings.embedding_dimensions,
                len(vector),
            )
            return None
        return vector
    # ── Qdrant vector search ─────────────────────────────────────────────
    async def search_qdrant(
        self,
        vector: list[float],
        limit: int = 20,
        type_filter: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search Qdrant for nearest neighbours.

        Returns a list of dicts with 'score' and 'payload' keys.
        Returns empty list on failure.
        """
        query_filter = None
        if type_filter:
            query_filter = Filter(
                must=[FieldCondition(key="type", match=MatchValue(value=type_filter))]
            )
        try:
            results = await asyncio.wait_for(
                self._qdrant.query_points(
                    collection_name=self._collection,
                    query=vector,
                    query_filter=query_filter,
                    limit=limit,
                    with_payload=True,
                ),
                timeout=_EXTERNAL_TIMEOUT,
            )
        except asyncio.TimeoutError:
            logger.warning("Qdrant search timeout (%.0fms limit)", _EXTERNAL_TIMEOUT * 1000)
            return []
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning("Qdrant search error: %s", exc)
            return []
        except Exception as exc:
            # Broad catch is deliberate: any transport/connection failure must
            # degrade to keyword-only results rather than fail the request.
            logger.warning("Qdrant connection error (%s: %s)", type(exc).__name__, exc)
            return []
        return [
            {"score": point.score, "payload": point.payload}
            for point in results.points
        ]
    # ── Token helpers ───────────────────────────────────────────────────
    @staticmethod
    def _tokenize(query: str) -> list[str]:
        """Split query into non-empty lowercase tokens."""
        return [t.lower() for t in query.split() if t]
    @staticmethod
    def _tp_token_condition(token: str):
        """Build an OR condition for a single token across TechniquePage + Creator fields."""
        pat = f"%{token}%"
        return or_(
            TechniquePage.title.ilike(pat),
            TechniquePage.summary.ilike(pat),
            TechniquePage.topic_category.ilike(pat),
            func.array_to_string(TechniquePage.topic_tags, " ").ilike(pat),
            Creator.name.ilike(pat),
        )
    @staticmethod
    def _km_token_condition(token: str):
        """Build an OR condition for a single token across KeyMoment + Creator fields."""
        pat = f"%{token}%"
        return or_(
            KeyMoment.title.ilike(pat),
            KeyMoment.summary.ilike(pat),
            Creator.name.ilike(pat),
        )
    @staticmethod
    def _cr_token_condition(token: str):
        """Build an OR condition for a single token across Creator fields."""
        pat = f"%{token}%"
        return or_(
            Creator.name.ilike(pat),
            func.array_to_string(Creator.genres, " ").ilike(pat),
        )
    # ── Match context generation ────────────────────────────────────────
    @staticmethod
    def _build_match_context(item: dict[str, Any], tokens: list[str]) -> str:
        """Generate a human-readable string explaining which fields matched.

        Checks each token against each field (first matching field per token
        wins) and returns a compact summary like "Creator: Keota · Tag: snare"
        or "Content: …snare drum…". The ``seen`` set ensures each field is
        reported at most once across tokens.
        """
        if not tokens:
            return ""
        matches: list[str] = []
        seen: set[str] = set()
        for token in tokens:
            t = token.lower()
            # Check creator name
            creator = item.get("creator_name", "")
            if creator and t in creator.lower() and "creator" not in seen:
                matches.append(f"Creator: {creator}")
                seen.add("creator")
                continue
            # Check title
            title = item.get("title", "")
            if title and t in title.lower() and "title" not in seen:
                matches.append("Title match")
                seen.add("title")
                continue
            # Check topic_category
            cat = item.get("topic_category", "")
            if cat and t in cat.lower() and "category" not in seen:
                matches.append(f"Category: {cat}")
                seen.add("category")
                continue
            # Check topic_tags
            tags = item.get("topic_tags", [])
            matched_tag = next((tag for tag in tags if t in tag.lower()), None)
            if matched_tag and f"tag:{matched_tag.lower()}" not in seen:
                matches.append(f"Tag: {matched_tag}")
                seen.add(f"tag:{matched_tag.lower()}")
                continue
            # Check summary
            summary = item.get("summary", "")
            if summary and t in summary.lower() and "summary" not in seen:
                # Extract a small context window around the match
                idx = summary.lower().find(t)
                start = max(0, idx - 20)
                end = min(len(summary), idx + len(t) + 20)
                snippet = summary[start:end].strip()
                # BUG FIX: the truncation markers were empty strings
                # ('"" + snippet'), making them a no-op — use real ellipses.
                if start > 0:
                    snippet = "…" + snippet
                if end < len(summary):
                    snippet = snippet + "…"
                matches.append(f"Content: {snippet}")
                seen.add("summary")
                continue
        return " · ".join(matches) if matches else ""
    # ── Keyword search (multi-token AND) ─────────────────────────────────
    async def keyword_search(
        self,
        query: str,
        scope: str,
        limit: int,
        db: AsyncSession,
        sort: str = "relevance",
    ) -> dict[str, list[dict[str, Any]]]:
        """Multi-token AND keyword search across technique pages, key moments, and creators.

        Tokenizes the query by whitespace. Each token must match at least one
        indexed field (title, summary, topic_category, topic_tags, creator name,
        genres). All tokens must match for a row to be included.

        If AND matching returns zero results but individual tokens would match,
        returns up to 5 partial_matches scored by the number of tokens matched.

        ``sort`` is accepted for interface parity with :meth:`search`; ordering
        is applied by the caller after merging, not here.

        Returns ``{"items": [...], "partial_matches": [...]}``.
        """
        tokens = self._tokenize(query)
        if not tokens:
            return {"items": [], "partial_matches": []}
        items = await self._keyword_search_and(tokens, scope, limit, db)
        # Add match_context to each item
        for item in items:
            item["match_context"] = self._build_match_context(item, tokens)
        partial: list[dict[str, Any]] = []
        if not items and len(tokens) > 1:
            partial = await self._keyword_partial_matches(tokens, scope, db)
            for p in partial:
                p["match_context"] = self._build_match_context(p, tokens)
        return {"items": items, "partial_matches": partial}
    async def _keyword_search_and(
        self,
        tokens: list[str],
        scope: str,
        limit: int,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Run AND-logic keyword search — every token must match at least one field."""
        results: list[dict[str, Any]] = []
        if scope in ("all", "topics"):
            tp_stmt = (
                select(TechniquePage, Creator)
                .join(Creator, TechniquePage.creator_id == Creator.id)
                .where(and_(*(self._tp_token_condition(t) for t in tokens)))
                .limit(limit)
            )
            tp_rows = await db.execute(tp_stmt)
            for tp, cr in tp_rows.all():
                results.append({
                    "type": "technique_page",
                    "title": tp.title,
                    "slug": tp.slug,
                    "technique_page_slug": tp.slug,
                    "summary": tp.summary or "",
                    "topic_category": tp.topic_category,
                    "topic_tags": tp.topic_tags or [],
                    "creator_id": str(tp.creator_id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": tp.created_at.isoformat() if tp.created_at else "",
                    "score": 0.0,
                })
        if scope in ("all",):
            km_stmt = (
                select(KeyMoment, SourceVideo, Creator, TechniquePage)
                .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
                .join(Creator, SourceVideo.creator_id == Creator.id)
                .outerjoin(TechniquePage, KeyMoment.technique_page_id == TechniquePage.id)
                .where(and_(*(self._km_token_condition(t) for t in tokens)))
                .limit(limit)
            )
            km_rows = await db.execute(km_stmt)
            for km, sv, cr, tp in km_rows.all():
                results.append({
                    "type": "key_moment",
                    "title": km.title,
                    "slug": "",
                    "technique_page_slug": tp.slug if tp else "",
                    "summary": km.summary or "",
                    "topic_category": "",
                    "topic_tags": [],
                    "creator_id": str(cr.id),
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": km.created_at.isoformat() if hasattr(km, "created_at") and km.created_at else "",
                    "score": 0.0,
                })
        if scope in ("all", "creators"):
            cr_stmt = (
                select(Creator)
                .where(and_(*(self._cr_token_condition(t) for t in tokens)))
                .limit(limit)
            )
            cr_rows = await db.execute(cr_stmt)
            for cr in cr_rows.scalars().all():
                results.append({
                    "type": "creator",
                    "title": cr.name,
                    "slug": cr.slug,
                    "technique_page_slug": "",
                    "summary": "",
                    "topic_category": "",
                    "topic_tags": cr.genres or [],
                    "creator_id": str(cr.id),
                    # CONSISTENCY FIX: creator rows previously omitted these
                    # keys, breaking _apply_sort(..., "creator") and creator
                    # match-context for creator results.
                    "creator_name": cr.name,
                    "creator_slug": cr.slug,
                    "created_at": cr.created_at.isoformat() if hasattr(cr, "created_at") and cr.created_at else "",
                    "score": 0.0,
                })
        return results[:limit]
    async def _keyword_partial_matches(
        self,
        tokens: list[str],
        scope: str,
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """When AND produces zero results, score rows by how many tokens match.

        Runs one single-token search per token, counts how many tokens each
        distinct row matched, and returns the top 5 ordered by match count
        descending. ``score`` is the matched-token fraction in [0, 1].
        """
        seen: dict[tuple[str, str], dict[str, Any]] = {}
        match_counts: dict[tuple[str, str], int] = {}
        for token in tokens:
            single_results = await self._keyword_search_and([token], scope, 20, db)
            for r in single_results:
                key = (r["type"], r.get("slug") or r.get("title", ""))
                if key not in seen:
                    seen[key] = r
                    match_counts[key] = 0
                match_counts[key] += 1
        ranked = sorted(match_counts.keys(), key=lambda k: match_counts[k], reverse=True)
        partial: list[dict[str, Any]] = []
        for key in ranked[:5]:
            item = seen[key]
            item["score"] = match_counts[key] / len(tokens)
            partial.append(item)
        return partial
    # ── Orchestrator ─────────────────────────────────────────────────────
    async def search(
        self,
        query: str,
        scope: str,
        limit: int,
        db: AsyncSession,
        sort: str = "relevance",
    ) -> dict[str, Any]:
        """Run semantic and keyword search in parallel, merge and deduplicate.

        Both engines run concurrently. Keyword results are always included
        (with match_context). Semantic results above the score threshold are
        merged in, deduplicated by (type, slug/title). Keyword matches rank
        higher when they exist.
        """
        start = time.monotonic()
        if not query or not query.strip():
            return {"items": [], "partial_matches": [], "total": 0, "query": query, "fallback_used": False}
        # Cap query length defensively; 500 chars is far beyond any real query.
        query = query.strip()[:500]
        if scope not in ("all", "topics", "creators"):
            scope = "all"
        # Map scope to Qdrant type filter
        # topics scope: no filter — both technique_page and technique_section
        # should appear in semantic results
        type_filter_map = {
            "all": None,
            "topics": None,
            "creators": None,
        }
        qdrant_type_filter = type_filter_map.get(scope)
        # Run both searches in parallel
        async def _semantic():
            vector = await self.embed_query(query)
            if vector is None:
                return []
            results = await self.search_qdrant(vector, limit=limit, type_filter=qdrant_type_filter)
            enriched = await self._enrich_qdrant_results(results, db)
            # Filter by minimum score and add match_context
            filtered = []
            for item in enriched:
                if item.get("score", 0) >= _SEMANTIC_MIN_SCORE:
                    if not item.get("match_context"):
                        item["match_context"] = "Semantic match"
                    filtered.append(item)
            return filtered
        async def _keyword():
            return await self.keyword_search(query, scope, limit, db, sort=sort)
        semantic_results, kw_result = await asyncio.gather(
            _semantic(),
            _keyword(),
            return_exceptions=True,
        )
        # Handle exceptions gracefully — one engine failing must not take
        # down the other's results.
        if isinstance(semantic_results, Exception):
            logger.warning("Semantic search failed: %s", semantic_results)
            semantic_results = []
        if isinstance(kw_result, Exception):
            logger.warning("Keyword search failed: %s", kw_result)
            kw_result = {"items": [], "partial_matches": []}
        kw_items = kw_result["items"]
        partial_matches = kw_result.get("partial_matches", [])
        # Merge: keyword results first (they have explicit match_context),
        # then semantic results that aren't already present
        seen_keys: set[str] = set()
        merged: list[dict[str, Any]] = []
        def _dedup_key(item: dict) -> str:
            t = item.get("type", "")
            s = item.get("slug") or item.get("technique_page_slug") or ""
            title = item.get("title", "")
            return f"{t}:{s}:{title}"
        for item in kw_items:
            key = _dedup_key(item)
            if key not in seen_keys:
                seen_keys.add(key)
                merged.append(item)
        for item in semantic_results:
            key = _dedup_key(item)
            if key not in seen_keys:
                seen_keys.add(key)
                merged.append(item)
        # Apply sort
        merged = self._apply_sort(merged, sort)
        fallback_used = len(kw_items) > 0 and len(semantic_results) == 0
        elapsed_ms = (time.monotonic() - start) * 1000
        logger.info(
            "Search query=%r scope=%s keyword=%d semantic=%d merged=%d partial=%d latency_ms=%.1f",
            query, scope, len(kw_items), len(semantic_results),
            len(merged), len(partial_matches), elapsed_ms,
        )
        return {
            "items": merged[:limit],
            "partial_matches": partial_matches,
            "total": len(merged),
            "query": query,
            "fallback_used": fallback_used,
        }
    # ── Sort helpers ────────────────────────────────────────────────────
    @staticmethod
    def _apply_sort(items: list[dict[str, Any]], sort: str) -> list[dict[str, Any]]:
        """Sort enriched result dicts by the requested criterion.

        "relevance" preserves the merge order; "oldest" pushes rows with
        missing dates to the end via the "9999" sentinel.
        """
        if sort == "relevance" or not items:
            return items
        if sort == "newest":
            return sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
        elif sort == "oldest":
            return sorted(items, key=lambda r: r.get("created_at") or "9999", reverse=False)
        elif sort == "alpha":
            return sorted(items, key=lambda r: (r.get("title") or "").lower())
        elif sort == "creator":
            return sorted(
                items,
                key=lambda r: ((r.get("creator_name") or "").lower(), (r.get("title") or "").lower()),
            )
        return items
    # ── Result enrichment ────────────────────────────────────────────────
    async def _enrich_qdrant_results(
        self,
        qdrant_results: list[dict[str, Any]],
        db: AsyncSession,
    ) -> list[dict[str, Any]]:
        """Enrich Qdrant results with creator names and slugs from DB.

        Prefers creator_name/creator_slug from the Qdrant payload; only hits
        the DB (one batched query) for creators with either field missing.
        """
        enriched: list[dict[str, Any]] = []
        # Collect creator_ids that need DB lookup.
        # BUG FIX: previously only ids with a missing *name* were fetched, so
        # payloads that carried a name but no slug could never resolve a slug
        # (the fallback branch always found an empty map).
        needs_db_lookup: set[str] = set()
        for r in qdrant_results:
            payload = r.get("payload", {})
            cid = payload.get("creator_id")
            if cid and (not payload.get("creator_name") or not payload.get("creator_slug")):
                needs_db_lookup.add(cid)
        # Batch fetch creators from DB
        creator_map: dict[str, dict[str, str]] = {}
        if needs_db_lookup:
            import uuid as uuid_mod
            valid_ids = []
            for cid in needs_db_lookup:
                try:
                    valid_ids.append(uuid_mod.UUID(cid))
                except (ValueError, AttributeError):
                    # Malformed/non-string ids in payloads are skipped silently.
                    pass
            if valid_ids:
                stmt = select(Creator).where(Creator.id.in_(valid_ids))
                result = await db.execute(stmt)
                for c in result.scalars().all():
                    creator_map[str(c.id)] = {"name": c.name, "slug": c.slug}
        for r in qdrant_results:
            payload = r.get("payload", {})
            cid = payload.get("creator_id", "")
            result_type = payload.get("type", "")
            # Creator name/slug: prefer payload, fall back to DB for whichever
            # field the payload is missing.
            creator_name = payload.get("creator_name", "")
            creator_slug = payload.get("creator_slug", "")
            if cid and (not creator_name or not creator_slug):
                info = creator_map.get(cid, {})
                creator_name = creator_name or info.get("name", "")
                creator_slug = creator_slug or info.get("slug", "")
            # Determine technique_page_slug based on result type
            if result_type == "technique_page":
                tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-"))
            elif result_type == "technique_section":
                tp_slug = payload.get("slug", "")
            else:
                tp_slug = payload.get("technique_page_slug", "")
            enriched.append({
                "type": result_type,
                "title": payload.get("title", ""),
                "slug": payload.get("slug", payload.get("title", "").lower().replace(" ", "-")),
                "technique_page_slug": tp_slug,
                "summary": payload.get("summary", ""),
                "topic_category": payload.get("topic_category", ""),
                "topic_tags": payload.get("topic_tags", []),
                "creator_id": cid,
                "creator_name": creator_name,
                "creator_slug": creator_slug,
                "created_at": payload.get("created_at", ""),
                "score": r.get("score", 0.0),
                "match_context": "",
                "section_anchor": payload.get("section_anchor", "") if result_type == "technique_section" else "",
                "section_heading": payload.get("section_heading", "") if result_type == "technique_section" else "",
            })
        return enriched