fix: Parallel search with match_context, deterministic Qdrant IDs, raised embedding timeout

- Search now runs semantic + keyword in parallel, merges and deduplicates
- Keyword results always included with match_context explaining WHY matched
- Semantic results filtered by minimum score threshold (0.45)
- match_context shows 'Creator: X', 'Tag: Y', 'Title match', 'Content: ...'
- Qdrant points use deterministic uuid5 IDs (no more duplicates on reindex)
- Embedding timeout raised from 300ms to 2s (Ollama needs it)
- _enrich_qdrant_results reads creator_name from payload before DB fallback
- Frontend displays match_context as highlighted bar on search result cards
This commit is contained in:
jlightner 2026-04-01 06:54:22 +00:00
parent 94da19c05d
commit 5f608b8889
7 changed files with 285 additions and 112 deletions

View file

@ -18,6 +18,9 @@ from config import Settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Namespace UUID for deterministic point IDs
_QDRANT_NAMESPACE = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
class QdrantManager: class QdrantManager:
"""Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors.""" """Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors."""
@ -161,8 +164,10 @@ class QdrantManager:
points = [] points = []
for page, vector in zip(pages, vectors): for page, vector in zip(pages, vectors):
# Deterministic UUID: same page always gets the same point ID
point_id = str(uuid.uuid5(_QDRANT_NAMESPACE, f"tp:{page['page_id']}"))
point = PointStruct( point = PointStruct(
id=str(uuid.uuid4()), id=point_id,
vector=vector, vector=vector,
payload={ payload={
"type": "technique_page", "type": "technique_page",
@ -207,8 +212,10 @@ class QdrantManager:
points = [] points = []
for moment, vector in zip(moments, vectors): for moment, vector in zip(moments, vectors):
# Deterministic UUID: same moment always gets the same point ID
point_id = str(uuid.uuid5(_QDRANT_NAMESPACE, f"km:{moment['moment_id']}"))
point = PointStruct( point = PointStruct(
id=str(uuid.uuid4()), id=point_id,
vector=vector, vector=vector,
payload={ payload={
"type": "key_moment", "type": "key_moment",

View file

@ -214,6 +214,7 @@ class SearchResultItem(BaseModel):
creator_slug: str = "" creator_slug: str = ""
topic_category: str = "" topic_category: str = ""
topic_tags: list[str] = Field(default_factory=list) topic_tags: list[str] = Field(default_factory=list)
match_context: str = ""
class SearchResponse(BaseModel): class SearchResponse(BaseModel):

View file

@ -1,8 +1,9 @@
"""Async search service for the public search endpoint. """Async search service for the public search endpoint.
Orchestrates semantic search (embedding + Qdrant) with keyword fallback. Orchestrates semantic search (embedding + Qdrant) with keyword search.
All external calls have timeouts and graceful degradation if embedding Both run in parallel results are merged and deduplicated. Keyword matches
or Qdrant fail, the service falls back to keyword-only (ILIKE) search. get a match_context explaining WHY they matched. Semantic-only results get
a "Semantic match" context and are filtered by a minimum score threshold.
""" """
from __future__ import annotations from __future__ import annotations
@ -25,11 +26,14 @@ from models import Creator, KeyMoment, SourceVideo, TechniquePage
logger = logging.getLogger("chrysopedia.search") logger = logging.getLogger("chrysopedia.search")
# Timeout for external calls (embedding API, Qdrant) in seconds # Timeout for external calls (embedding API, Qdrant) in seconds
_EXTERNAL_TIMEOUT = 0.3 # 300ms per plan _EXTERNAL_TIMEOUT = 2.0 # 2s — Ollama local embedding needs more than 300ms
# Minimum cosine similarity score for semantic results to be included
_SEMANTIC_MIN_SCORE = 0.45
class SearchService: class SearchService:
"""Async search service with semantic + keyword fallback. """Async search service with parallel semantic + keyword search.
Parameters Parameters
---------- ----------
@ -132,14 +136,12 @@ class SearchService:
for point in results.points for point in results.points
] ]
# ── Keyword fallback ─────────────────────────────────────────────────
# ── Token helpers ─────────────────────────────────────────────────── # ── Token helpers ───────────────────────────────────────────────────
@staticmethod @staticmethod
def _tokenize(query: str) -> list[str]: def _tokenize(query: str) -> list[str]:
"""Split query into non-empty tokens.""" """Split query into non-empty lowercase tokens."""
return [t for t in query.split() if t] return [t.lower() for t in query.split() if t]
@staticmethod @staticmethod
def _tp_token_condition(token: str): def _tp_token_condition(token: str):
@ -172,6 +174,71 @@ class SearchService:
func.array_to_string(Creator.genres, " ").ilike(pat), func.array_to_string(Creator.genres, " ").ilike(pat),
) )
# ── Match context generation ────────────────────────────────────────
@staticmethod
def _build_match_context(item: dict[str, Any], tokens: list[str]) -> str:
"""Generate a human-readable string explaining which fields matched.
Checks each token against each field and returns a compact summary
like "Creator: Keota · Tag: snare" or "Title: ...snare drum...".
"""
if not tokens:
return ""
matches: list[str] = []
seen: set[str] = set()
for token in tokens:
t = token.lower()
# Check creator name
creator = item.get("creator_name", "")
if creator and t in creator.lower() and "creator" not in seen:
matches.append(f"Creator: {creator}")
seen.add("creator")
continue
# Check title
title = item.get("title", "")
if title and t in title.lower() and "title" not in seen:
matches.append(f"Title match")
seen.add("title")
continue
# Check topic_category
cat = item.get("topic_category", "")
if cat and t in cat.lower() and "category" not in seen:
matches.append(f"Category: {cat}")
seen.add("category")
continue
# Check topic_tags
tags = item.get("topic_tags", [])
matched_tag = next((tag for tag in tags if t in tag.lower()), None)
if matched_tag and f"tag:{matched_tag.lower()}" not in seen:
matches.append(f"Tag: {matched_tag}")
seen.add(f"tag:{matched_tag.lower()}")
continue
# Check summary
summary = item.get("summary", "")
if summary and t in summary.lower() and "summary" not in seen:
# Extract a small context window around the match
idx = summary.lower().find(t)
start = max(0, idx - 20)
end = min(len(summary), idx + len(t) + 20)
snippet = summary[start:end].strip()
if start > 0:
snippet = "" + snippet
if end < len(summary):
snippet = snippet + ""
matches.append(f"Content: {snippet}")
seen.add("summary")
continue
return " · ".join(matches) if matches else ""
# ── Keyword search (multi-token AND) ───────────────────────────────── # ── Keyword search (multi-token AND) ─────────────────────────────────
async def keyword_search( async def keyword_search(
@ -199,13 +266,15 @@ class SearchService:
items = await self._keyword_search_and(tokens, scope, limit, db) items = await self._keyword_search_and(tokens, scope, limit, db)
# Enrich with creator names # Add match_context to each item
items = await self._enrich_keyword_creator_names(items, db) for item in items:
item["match_context"] = self._build_match_context(item, tokens)
partial: list[dict[str, Any]] = [] partial: list[dict[str, Any]] = []
if not items and len(tokens) > 1: if not items and len(tokens) > 1:
partial = await self._keyword_partial_matches(tokens, scope, db) partial = await self._keyword_partial_matches(tokens, scope, db)
partial = await self._enrich_keyword_creator_names(partial, db) for p in partial:
p["match_context"] = self._build_match_context(p, tokens)
return {"items": items, "partial_matches": partial} return {"items": items, "partial_matches": partial}
@ -323,44 +392,6 @@ class SearchService:
return partial return partial
async def _enrich_keyword_creator_names(
self,
results: list[dict[str, Any]],
db: AsyncSession,
) -> list[dict[str, Any]]:
"""Fill in creator_name/creator_slug for results that don't have them yet."""
needs_enrichment = [
r for r in results
if r.get("creator_id") and not r.get("creator_name")
]
if not needs_enrichment:
return results
import uuid as _uuid_mod
cids: set[str] = {r["creator_id"] for r in needs_enrichment}
valid = []
for cid in cids:
try:
valid.append(_uuid_mod.UUID(cid))
except (ValueError, AttributeError):
pass
creator_map: dict[str, dict[str, str]] = {}
if valid:
cr_stmt = select(Creator).where(Creator.id.in_(valid))
cr_result = await db.execute(cr_stmt)
for c in cr_result.scalars().all():
creator_map[str(c.id)] = {"name": c.name, "slug": c.slug}
for r in results:
if not r.get("creator_name"):
info = creator_map.get(r.get("creator_id", ""), {"name": "", "slug": ""})
r["creator_name"] = info["name"]
r["creator_slug"] = info["slug"]
return results
# ── Orchestrator ───────────────────────────────────────────────────── # ── Orchestrator ─────────────────────────────────────────────────────
async def search( async def search(
@ -371,20 +402,19 @@ class SearchService:
db: AsyncSession, db: AsyncSession,
sort: str = "relevance", sort: str = "relevance",
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Run semantic search with keyword fallback. """Run semantic and keyword search in parallel, merge and deduplicate.
Returns a dict matching the SearchResponse schema shape. Both engines run concurrently. Keyword results are always included
(with match_context). Semantic results above the score threshold are
merged in, deduplicated by (type, slug/title). Keyword matches rank
higher when they exist.
""" """
start = time.monotonic() start = time.monotonic()
# Validate / sanitize inputs
if not query or not query.strip(): if not query or not query.strip():
return {"items": [], "total": 0, "query": query, "fallback_used": False} return {"items": [], "partial_matches": [], "total": 0, "query": query, "fallback_used": False}
# Truncate long queries
query = query.strip()[:500] query = query.strip()[:500]
# Normalize scope
if scope not in ("all", "topics", "creators"): if scope not in ("all", "topics", "creators"):
scope = "all" scope = "all"
@ -392,49 +422,85 @@ class SearchService:
type_filter_map = { type_filter_map = {
"all": None, "all": None,
"topics": "technique_page", "topics": "technique_page",
"creators": None, # creators aren't in Qdrant "creators": None,
} }
qdrant_type_filter = type_filter_map.get(scope) qdrant_type_filter = type_filter_map.get(scope)
fallback_used = False # Run both searches in parallel
items: list[dict[str, Any]] = [] async def _semantic():
# Try semantic search
vector = await self.embed_query(query) vector = await self.embed_query(query)
if vector is not None: if vector is None:
qdrant_results = await self.search_qdrant(vector, limit=limit, type_filter=qdrant_type_filter) return []
if qdrant_results: results = await self.search_qdrant(vector, limit=limit, type_filter=qdrant_type_filter)
# Enrich Qdrant results with DB metadata enriched = await self._enrich_qdrant_results(results, db)
items = await self._enrich_results(qdrant_results, db) # Filter by minimum score and add match_context
filtered = []
for item in enriched:
if item.get("score", 0) >= _SEMANTIC_MIN_SCORE:
if not item.get("match_context"):
item["match_context"] = "Semantic match"
filtered.append(item)
return filtered
# Fallback to keyword search if semantic failed or returned nothing async def _keyword():
if not items: return await self.keyword_search(query, scope, limit, db, sort=sort)
kw_result = await self.keyword_search(query, scope, limit, db, sort=sort)
items = kw_result["items"] semantic_results, kw_result = await asyncio.gather(
_semantic(),
_keyword(),
return_exceptions=True,
)
# Handle exceptions gracefully
if isinstance(semantic_results, Exception):
logger.warning("Semantic search failed: %s", semantic_results)
semantic_results = []
if isinstance(kw_result, Exception):
logger.warning("Keyword search failed: %s", kw_result)
kw_result = {"items": [], "partial_matches": []}
kw_items = kw_result["items"]
partial_matches = kw_result.get("partial_matches", []) partial_matches = kw_result.get("partial_matches", [])
fallback_used = True
else:
partial_matches = []
# Apply sort to enriched results (semantic or keyword) # Merge: keyword results first (they have explicit match_context),
items = self._apply_sort(items, sort) # then semantic results that aren't already present
seen_keys: set[str] = set()
merged: list[dict[str, Any]] = []
def _dedup_key(item: dict) -> str:
t = item.get("type", "")
s = item.get("slug") or item.get("technique_page_slug") or ""
title = item.get("title", "")
return f"{t}:{s}:{title}"
for item in kw_items:
key = _dedup_key(item)
if key not in seen_keys:
seen_keys.add(key)
merged.append(item)
for item in semantic_results:
key = _dedup_key(item)
if key not in seen_keys:
seen_keys.add(key)
merged.append(item)
# Apply sort
merged = self._apply_sort(merged, sort)
fallback_used = len(kw_items) > 0 and len(semantic_results) == 0
elapsed_ms = (time.monotonic() - start) * 1000 elapsed_ms = (time.monotonic() - start) * 1000
logger.info( logger.info(
"Search query=%r scope=%s results=%d partial=%d fallback=%s latency_ms=%.1f", "Search query=%r scope=%s keyword=%d semantic=%d merged=%d partial=%d latency_ms=%.1f",
query, query, scope, len(kw_items), len(semantic_results),
scope, len(merged), len(partial_matches), elapsed_ms,
len(items),
len(partial_matches),
fallback_used,
elapsed_ms,
) )
return { return {
"items": items, "items": merged[:limit],
"partial_matches": partial_matches, "partial_matches": partial_matches,
"total": len(items), "total": len(merged),
"query": query, "query": query,
"fallback_used": fallback_used, "fallback_used": fallback_used,
} }
@ -443,19 +509,12 @@ class SearchService:
@staticmethod @staticmethod
def _apply_sort(items: list[dict[str, Any]], sort: str) -> list[dict[str, Any]]: def _apply_sort(items: list[dict[str, Any]], sort: str) -> list[dict[str, Any]]:
"""Sort enriched result dicts by the requested criterion. """Sort enriched result dicts by the requested criterion."""
For 'relevance' (default), preserve existing order (score-based from
Qdrant or DB order from keyword search).
"""
if sort == "relevance" or not items: if sort == "relevance" or not items:
return items return items
if sort == "newest": if sort == "newest":
# Sort by created_at descending; items without it go last
return sorted(items, key=lambda r: r.get("created_at", ""), reverse=True) return sorted(items, key=lambda r: r.get("created_at", ""), reverse=True)
elif sort == "oldest": elif sort == "oldest":
# Sort by created_at ascending; items without it go last
return sorted(items, key=lambda r: r.get("created_at") or "9999", reverse=False) return sorted(items, key=lambda r: r.get("created_at") or "9999", reverse=False)
elif sort == "alpha": elif sort == "alpha":
return sorted(items, key=lambda r: (r.get("title") or "").lower()) return sorted(items, key=lambda r: (r.get("title") or "").lower())
@ -468,34 +527,34 @@ class SearchService:
# ── Result enrichment ──────────────────────────────────────────────── # ── Result enrichment ────────────────────────────────────────────────
async def _enrich_results( async def _enrich_qdrant_results(
self, self,
qdrant_results: list[dict[str, Any]], qdrant_results: list[dict[str, Any]],
db: AsyncSession, db: AsyncSession,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
"""Enrich Qdrant results with creator names and slugs from DB.""" """Enrich Qdrant results with creator names and slugs from DB.
First reads creator_name from Qdrant payload; only hits DB for missing ones.
"""
enriched: list[dict[str, Any]] = [] enriched: list[dict[str, Any]] = []
# Collect creator_ids to batch-fetch # Collect creator_ids that need DB lookup
creator_ids = set() needs_db_lookup: set[str] = set()
for r in qdrant_results: for r in qdrant_results:
payload = r.get("payload", {}) payload = r.get("payload", {})
cid = payload.get("creator_id") if not payload.get("creator_name") and payload.get("creator_id"):
if cid: needs_db_lookup.add(payload["creator_id"])
creator_ids.add(cid)
# Batch fetch creators # Batch fetch creators from DB
creator_map: dict[str, dict[str, str]] = {} creator_map: dict[str, dict[str, str]] = {}
if creator_ids: if needs_db_lookup:
from sqlalchemy.dialects.postgresql import UUID as PgUUID
import uuid as uuid_mod import uuid as uuid_mod
valid_ids = [] valid_ids = []
for cid in creator_ids: for cid in needs_db_lookup:
try: try:
valid_ids.append(uuid_mod.UUID(cid)) valid_ids.append(uuid_mod.UUID(cid))
except (ValueError, AttributeError): except (ValueError, AttributeError):
pass pass
if valid_ids: if valid_ids:
stmt = select(Creator).where(Creator.id.in_(valid_ids)) stmt = select(Creator).where(Creator.id.in_(valid_ids))
result = await db.execute(stmt) result = await db.execute(stmt)
@ -505,9 +564,20 @@ class SearchService:
for r in qdrant_results: for r in qdrant_results:
payload = r.get("payload", {}) payload = r.get("payload", {})
cid = payload.get("creator_id", "") cid = payload.get("creator_id", "")
creator_info = creator_map.get(cid, {"name": "", "slug": ""})
result_type = payload.get("type", "") result_type = payload.get("type", "")
# Creator name: prefer payload, fall back to DB
creator_name = payload.get("creator_name", "")
creator_slug = ""
if not creator_name and cid:
info = creator_map.get(cid, {"name": "", "slug": ""})
creator_name = info["name"]
creator_slug = info["slug"]
elif creator_name and cid:
# We have the name from payload but need the slug from DB
info = creator_map.get(cid, {})
creator_slug = info.get("slug", "")
# Determine technique_page_slug based on result type # Determine technique_page_slug based on result type
if result_type == "technique_page": if result_type == "technique_page":
tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-")) tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-"))
@ -523,10 +593,11 @@ class SearchService:
"topic_category": payload.get("topic_category", ""), "topic_category": payload.get("topic_category", ""),
"topic_tags": payload.get("topic_tags", []), "topic_tags": payload.get("topic_tags", []),
"creator_id": cid, "creator_id": cid,
"creator_name": creator_info["name"], "creator_name": creator_name,
"creator_slug": creator_info["slug"], "creator_slug": creator_slug,
"created_at": payload.get("created_at", ""), "created_at": payload.get("created_at", ""),
"score": r.get("score", 0.0), "score": r.get("score", 0.0),
"match_context": "",
}) })
return enriched return enriched

View file

@ -647,3 +647,66 @@ async def test_suggestions_respects_view_count_ordering(client, db_engine):
# High Views Page should come before Low Views Page # High Views Page should come before Low Views Page
titles = [item["text"] for item in technique_items] titles = [item["text"] for item in technique_items]
assert titles.index("High Views Page") < titles.index("Low Views Page") assert titles.index("High Views Page") < titles.index("Low Views Page")
# ── Match context tests ──────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_keyword_search_match_context_creator(db_engine):
"""Match context includes creator name when query matches creator."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
kw_result = await svc.keyword_search("Bill", "topics", 10, session)
items = kw_result["items"]
assert len(items) >= 1
# At least one result should have match_context mentioning the creator
contexts = [r["match_context"] for r in items]
assert any("Creator: Mr. Bill" in c for c in contexts), f"Expected creator context, got: {contexts}"
@pytest.mark.asyncio
async def test_keyword_search_match_context_tag(db_engine):
"""Match context includes tag name when query matches a topic tag."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
kw_result = await svc.keyword_search("granular", "topics", 10, session)
items = kw_result["items"]
assert len(items) >= 1
contexts = [r["match_context"] for r in items]
assert any("Tag: granular" in c for c in contexts), f"Expected tag context, got: {contexts}"
@pytest.mark.asyncio
async def test_keyword_search_match_context_multi_token(db_engine):
"""Multi-token match context shows multiple match reasons."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
# "Bill bass" — "Bill" matches creator, "bass" matches tag/title
kw_result = await svc.keyword_search("Bill bass", "topics", 10, session)
items = kw_result["items"]
assert len(items) >= 1
# The match_context should contain both creator and another field
contexts = [r["match_context"] for r in items]
assert any("Creator: Mr. Bill" in c for c in contexts)

View file

@ -1651,6 +1651,30 @@ a.app-footer__repo:hover {
margin-bottom: 0.375rem; margin-bottom: 0.375rem;
} }
.search-result-card__match-context {
display: flex;
align-items: center;
gap: 0.375rem;
font-size: 0.75rem;
color: var(--color-accent);
margin-bottom: 0.375rem;
padding: 0.25rem 0.5rem;
background: rgba(0, 255, 255, 0.06);
border-radius: 4px;
border-left: 2px solid var(--color-accent);
}
.match-context__icon {
flex-shrink: 0;
font-size: 0.625rem;
}
.match-context__text {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.search-result-card__meta { .search-result-card__meta {
display: flex; display: flex;
align-items: center; align-items: center;

View file

@ -18,6 +18,7 @@ export interface SearchResultItem {
topic_category: string; topic_category: string;
topic_tags: string[]; topic_tags: string[];
technique_page_slug?: string; technique_page_slug?: string;
match_context?: string;
} }
export interface SearchResponse { export interface SearchResponse {

View file

@ -165,6 +165,12 @@ function SearchResultCard({ item, staggerIndex }: { item: SearchResultItem; stag
{item.type === "technique_page" ? "Technique" : "Key Moment"} {item.type === "technique_page" ? "Technique" : "Key Moment"}
</span> </span>
</div> </div>
{item.match_context && (
<div className="search-result-card__match-context">
<span className="match-context__icon"></span>
<span className="match-context__text">{item.match_context}</span>
</div>
)}
{item.summary && ( {item.summary && (
<p className="search-result-card__summary"> <p className="search-result-card__summary">
{item.summary.length > 200 {item.summary.length > 200