feat: Refactored keyword_search to multi-token AND with cross-field mat…

- "backend/search_service.py"
- "backend/schemas.py"
- "backend/routers/search.py"
- "backend/tests/test_search.py"

GSD-Task: S01/T01
This commit is contained in:
jlightner 2026-04-01 06:15:20 +00:00
parent 0d538238a6
commit 9c0247c830
5 changed files with 410 additions and 43 deletions

View file

@ -0,0 +1,111 @@
# Feature: Stage 5 Synthesis Chunking for Large Category Groups
## Problem
Stage 5 synthesis sends all key moments for a given `(video, topic_category)` group to the LLM in a single call. When a video produces a large number of moments in one category, the prompt exceeds what the model can process into a valid structured response.
**Concrete failure:** COPYCATT's "Sound Design - Everything In 2 Hours Speedrun" (2,026 transcript segments) produced 198 moments classified as "Sound design" (175) / "Sound Design" (23 — casing inconsistency). The synthesis prompt for that category was ~42k tokens. The model (`fyn-llm-agent-think`, 128k context) accepted the prompt but returned only 5,407 completion tokens with `finish=stop` — valid JSON that was structurally incomplete, failing Pydantic `SynthesisResult` validation. The pipeline retried and failed identically each time.
The other 37 videos in the corpus (up to 930 segments, ~60 moments per category max) all synthesized successfully.
## Root Causes
Two independent issues compound into this failure:
### 1. No chunking in stage 5 synthesis
`stage5_synthesis()` in `backend/pipeline/stages.py` iterates over `groups[category]` and builds one prompt containing ALL moments for that category. There's no upper bound on how many moments go into a single LLM call.
**Location:** `stages.py` lines ~850-875 — the `for category, moment_group in groups.items()` loop builds the full `moments_text` without splitting.
### 2. Inconsistent category casing from stage 4
Stage 4 classification produces `"Sound design"` and `"Sound Design"` as separate categories for the same video. Stage 5 groups by exact string match, so these stay separate — but even independently, 175 moments in one group is too many. The casing issue does inflate the problem by preventing natural splitting across categories.
**Location:** Classification output stored in Redis at `chrysopedia:classification:{video_id}`. The `topic_category` values come directly from the LLM with no normalization.
## Proposed Changes
### Change 1: Chunked synthesis with merge pass
Split large category groups into chunks before sending to the LLM. Each chunk produces technique pages independently, then a lightweight merge step combines pages with overlapping topics.
**In `stage5_synthesis()` (`backend/pipeline/stages.py`):**
1. After grouping moments by category, check each group's size against a configurable threshold (e.g., `SYNTHESIS_CHUNK_SIZE = 30` moments).
2. Groups at or below the threshold: process as today — single LLM call.
3. Groups above the threshold: split into chunks of `SYNTHESIS_CHUNK_SIZE` moments, ordered by `start_time` (preserving chronological context). Each chunk gets its own synthesis LLM call, producing its own `SynthesisResult` with 1+ pages.
4. After all chunks for a category are processed, collect the resulting pages. Pages with the same or very similar slugs (e.g., Levenshtein distance < 3, or shared slug prefix before the creator suffix) should be merged. The merge is a second LLM call with a simpler prompt: "Here are N partial technique pages on the same topic from the same creator. Merge them into a single cohesive page, combining body sections, deduplicating signal chains and plugins, and writing a unified summary." This merge prompt is much smaller than the original 198-moment prompt because it takes synthesized prose as input, not raw moment data.
5. If no pages share slugs across chunks, keep them all — they represent genuinely distinct sub-topics the LLM identified within the category.
**New config setting in `backend/config.py`:**
```python
synthesis_chunk_size: int = 30 # Max moments per synthesis LLM call
```
**New prompt file:** `prompts/stage5_merge.txt` — instructions for combining partial technique pages into a unified page. Much simpler than the full synthesis prompt since it operates on already-synthesized prose rather than raw moments.
**Token budget consideration:** 30 moments × ~200 tokens each (title + summary + metadata + transcript excerpt) = ~6k tokens of moment data + ~2k system prompt = ~8k input tokens. Well within what the model handles reliably. The merge call takes 2-4 partial pages of prose (~3-5k tokens total) — also very manageable.
### Change 2: Category casing normalization in stage 4
Normalize `topic_category` values before storing classification results in Redis.
**In `stage4_classification()` (`backend/pipeline/stages.py`):**
After parsing the `ClassificationResult` from the LLM, apply title-case normalization to each moment's `topic_category`:
```python
category = cls_result.topic_category.strip().title()
# "Sound design" -> "Sound Design"
# "sound design" -> "Sound Design"
# "SOUND DESIGN" -> "Sound Design"
```
This is a one-line fix. It prevents the "Sound design" / "Sound Design" split that inflated the group sizes and would reduce the COPYCATT video from 198 → 198 moments in a single normalized "Sound Design" group — still too many without chunking, but it eliminates the class of bug where moments scatter across near-duplicate categories.
**Also apply in stage 5 as a safety net:** When building the `groups` dict, normalize the category key:
```python
category = cls_info.get("topic_category", "Uncategorized").strip().title()
```
This handles data already in Redis from prior stage 4 runs without requiring reprocessing.
### Change 3: Estimated token pre-check before LLM call
Before making the synthesis LLM call, estimate the total tokens (prompt + expected output) and log a warning if it exceeds a safety threshold. This doesn't block the call — chunking handles the splitting — but it provides observability for tuning `SYNTHESIS_CHUNK_SIZE`.
**In the synthesis loop, after building `user_prompt`:**
```python
estimated_input = estimate_tokens(system_prompt) + estimate_tokens(user_prompt)
if estimated_input > 15000:
logger.warning(
"Stage 5: Large synthesis input for category '%s' video_id=%s: "
"~%d input tokens, %d moments. Consider reducing SYNTHESIS_CHUNK_SIZE.",
category, video_id, estimated_input, len(moment_group),
)
```
## Files to Modify
| File | Change |
|------|--------|
| `backend/pipeline/stages.py` | Chunk logic in `stage5_synthesis()`, casing normalization in `stage4_classification()` and `stage5_synthesis()` grouping |
| `backend/pipeline/llm_client.py` | No changes needed — `estimate_max_tokens()` already handles per-call estimation |
| `backend/config.py` | Add `synthesis_chunk_size: int = 30` setting |
| `prompts/stage5_merge.txt` | New prompt for merging partial technique pages |
| `backend/schemas.py` | No changes — `SynthesisResult` schema works for both chunk and merge calls |
## Testing
1. **Unit test:** Mock the LLM and verify that a 90-moment group gets split into 3 chunks of 30, each producing a `SynthesisResult`, followed by a merge call.
2. **Integration test:** Retrigger the COPYCATT "Sound Design - Everything In 2 Hours Speedrun" video and confirm it completes stage 5 without `LLMTruncationError`.
3. **Regression test:** Retrigger a small video (e.g., Skope "Understanding Waveshapers", 9 moments) and confirm behavior is unchanged — no chunking triggered, same output.
## Rollback
`SYNTHESIS_CHUNK_SIZE` can be set very high (e.g., 9999) to effectively disable chunking without a code change. The casing normalization is backward-compatible — it only affects new pipeline runs.

View file

@ -47,6 +47,7 @@ async def search(
result = await svc.search(query=q, scope=scope, limit=limit, db=db)
return SearchResponse(
items=[SearchResultItem(**item) for item in result["items"]],
partial_matches=[SearchResultItem(**item) for item in result.get("partial_matches", [])],
total=result["total"],
query=result["query"],
fallback_used=result["fallback_used"],

View file

@ -219,6 +219,7 @@ class SearchResultItem(BaseModel):
class SearchResponse(BaseModel):
"""Top-level search response with metadata."""
items: list[SearchResultItem] = Field(default_factory=list)
partial_matches: list[SearchResultItem] = Field(default_factory=list)
total: int = 0
query: str = ""
fallback_used: bool = False

View file

@ -16,7 +16,7 @@ import openai
from qdrant_client import AsyncQdrantClient
from qdrant_client.http import exceptions as qdrant_exceptions
from qdrant_client.models import FieldCondition, Filter, MatchValue
from sqlalchemy import or_, select
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from config import Settings
@ -134,33 +134,99 @@ class SearchService:
# ── Keyword fallback ─────────────────────────────────────────────────
# ── Token helpers ───────────────────────────────────────────────────
@staticmethod
def _tokenize(query: str) -> list[str]:
"""Split query into non-empty tokens."""
return [t for t in query.split() if t]
@staticmethod
def _tp_token_condition(token: str):
"""Build an OR condition for a single token across TechniquePage + Creator fields."""
pat = f"%{token}%"
return or_(
TechniquePage.title.ilike(pat),
TechniquePage.summary.ilike(pat),
TechniquePage.topic_category.ilike(pat),
func.array_to_string(TechniquePage.topic_tags, " ").ilike(pat),
Creator.name.ilike(pat),
)
@staticmethod
def _km_token_condition(token: str):
"""Build an OR condition for a single token across KeyMoment + Creator fields."""
pat = f"%{token}%"
return or_(
KeyMoment.title.ilike(pat),
KeyMoment.summary.ilike(pat),
Creator.name.ilike(pat),
)
@staticmethod
def _cr_token_condition(token: str):
"""Build an OR condition for a single token across Creator fields."""
pat = f"%{token}%"
return or_(
Creator.name.ilike(pat),
func.array_to_string(Creator.genres, " ").ilike(pat),
)
# ── Keyword search (multi-token AND) ─────────────────────────────────
async def keyword_search(
self,
query: str,
scope: str,
limit: int,
db: AsyncSession,
) -> list[dict[str, Any]]:
"""ILIKE keyword search across technique pages, key moments, and creators.
) -> dict[str, list[dict[str, Any]]]:
"""Multi-token AND keyword search across technique pages, key moments, and creators.
Searches title/name columns. Returns a unified list of result dicts.
Tokenizes the query by whitespace. Each token must match at least one
indexed field (title, summary, topic_category, topic_tags, creator name,
genres). All tokens must match for a row to be included.
If AND matching returns zero results but individual tokens would match,
returns up to 5 partial_matches scored by the number of tokens matched.
Returns ``{"items": [...], "partial_matches": [...]}``.
"""
tokens = self._tokenize(query)
if not tokens:
return {"items": [], "partial_matches": []}
items = await self._keyword_search_and(tokens, scope, limit, db)
# Enrich with creator names
items = await self._enrich_keyword_creator_names(items, db)
partial: list[dict[str, Any]] = []
if not items and len(tokens) > 1:
partial = await self._keyword_partial_matches(tokens, scope, db)
partial = await self._enrich_keyword_creator_names(partial, db)
return {"items": items, "partial_matches": partial}
async def _keyword_search_and(
self,
tokens: list[str],
scope: str,
limit: int,
db: AsyncSession,
) -> list[dict[str, Any]]:
"""Run AND-logic keyword search — every token must match at least one field."""
results: list[dict[str, Any]] = []
pattern = f"%{query}%"
if scope in ("all", "topics"):
stmt = (
select(TechniquePage)
.where(
or_(
TechniquePage.title.ilike(pattern),
TechniquePage.summary.ilike(pattern),
)
)
tp_stmt = (
select(TechniquePage, Creator)
.join(Creator, TechniquePage.creator_id == Creator.id)
.where(and_(*(self._tp_token_condition(t) for t in tokens)))
.limit(limit)
)
rows = await db.execute(stmt)
for tp in rows.scalars().all():
tp_rows = await db.execute(tp_stmt)
for tp, cr in tp_rows.all():
results.append({
"type": "technique_page",
"title": tp.title,
@ -170,6 +236,8 @@ class SearchService:
"topic_category": tp.topic_category,
"topic_tags": tp.topic_tags or [],
"creator_id": str(tp.creator_id),
"creator_name": cr.name,
"creator_slug": cr.slug,
"score": 0.0,
})
@ -179,7 +247,7 @@ class SearchService:
.join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id)
.join(Creator, SourceVideo.creator_id == Creator.id)
.outerjoin(TechniquePage, KeyMoment.technique_page_id == TechniquePage.id)
.where(KeyMoment.title.ilike(pattern))
.where(and_(*(self._km_token_condition(t) for t in tokens)))
.limit(limit)
)
km_rows = await db.execute(km_stmt)
@ -201,7 +269,7 @@ class SearchService:
if scope in ("all", "creators"):
cr_stmt = (
select(Creator)
.where(Creator.name.ilike(pattern))
.where(and_(*(self._cr_token_condition(t) for t in tokens)))
.limit(limit)
)
cr_rows = await db.execute(cr_stmt)
@ -218,28 +286,76 @@ class SearchService:
"score": 0.0,
})
# Enrich keyword results with creator names
kw_creator_ids = {r["creator_id"] for r in results if r.get("creator_id")}
kw_creator_map: dict[str, dict[str, str]] = {}
if kw_creator_ids:
return results[:limit]
async def _keyword_partial_matches(
self,
tokens: list[str],
scope: str,
db: AsyncSession,
) -> list[dict[str, Any]]:
"""When AND produces zero results, score rows by how many tokens match.
Returns the top 5 results ordered by match count descending.
"""
seen: dict[tuple[str, str], dict[str, Any]] = {}
match_counts: dict[tuple[str, str], int] = {}
for token in tokens:
single_results = await self._keyword_search_and([token], scope, 20, db)
for r in single_results:
key = (r["type"], r.get("slug") or r.get("title", ""))
if key not in seen:
seen[key] = r
match_counts[key] = 0
match_counts[key] += 1
ranked = sorted(match_counts.keys(), key=lambda k: match_counts[k], reverse=True)
partial: list[dict[str, Any]] = []
for key in ranked[:5]:
item = seen[key]
item["score"] = match_counts[key] / len(tokens)
partial.append(item)
return partial
async def _enrich_keyword_creator_names(
self,
results: list[dict[str, Any]],
db: AsyncSession,
) -> list[dict[str, Any]]:
"""Fill in creator_name/creator_slug for results that don't have them yet."""
needs_enrichment = [
r for r in results
if r.get("creator_id") and not r.get("creator_name")
]
if not needs_enrichment:
return results
import uuid as _uuid_mod
cids: set[str] = {r["creator_id"] for r in needs_enrichment}
valid = []
for cid in kw_creator_ids:
for cid in cids:
try:
valid.append(_uuid_mod.UUID(cid))
except (ValueError, AttributeError):
pass
creator_map: dict[str, dict[str, str]] = {}
if valid:
cr_stmt = select(Creator).where(Creator.id.in_(valid))
cr_result = await db.execute(cr_stmt)
for c in cr_result.scalars().all():
kw_creator_map[str(c.id)] = {"name": c.name, "slug": c.slug}
creator_map[str(c.id)] = {"name": c.name, "slug": c.slug}
for r in results:
info = kw_creator_map.get(r.get("creator_id", ""), {"name": "", "slug": ""})
if not r.get("creator_name"):
info = creator_map.get(r.get("creator_id", ""), {"name": "", "slug": ""})
r["creator_name"] = info["name"]
r["creator_slug"] = info["slug"]
return results[:limit]
return results
# ── Orchestrator ─────────────────────────────────────────────────────
@ -288,22 +404,28 @@ class SearchService:
# Fallback to keyword search if semantic failed or returned nothing
if not items:
items = await self.keyword_search(query, scope, limit, db)
kw_result = await self.keyword_search(query, scope, limit, db)
items = kw_result["items"]
partial_matches = kw_result.get("partial_matches", [])
fallback_used = True
else:
partial_matches = []
elapsed_ms = (time.monotonic() - start) * 1000
logger.info(
"Search query=%r scope=%s results=%d fallback=%s latency_ms=%.1f",
"Search query=%r scope=%s results=%d partial=%d fallback=%s latency_ms=%.1f",
query,
scope,
len(items),
len(partial_matches),
fallback_used,
elapsed_ms,
)
return {
"items": items,
"partial_matches": partial_matches,
"total": len(items),
"query": query,
"fallback_used": fallback_used,

View file

@ -356,7 +356,8 @@ async def test_keyword_search_technique_page_has_technique_page_slug(db_engine):
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
results = await svc.keyword_search("Reese Bass", "topics", 10, session)
kw_result = await svc.keyword_search("Reese Bass", "topics", 10, session)
results = kw_result["items"]
assert len(results) >= 1
tp_result = next(r for r in results if r["type"] == "technique_page")
@ -377,7 +378,8 @@ async def test_keyword_search_key_moment_has_parent_technique_page_slug(db_engin
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
results = await svc.keyword_search("Reese", "all", 20, session)
kw_result = await svc.keyword_search("Reese", "all", 20, session)
results = kw_result["items"]
km_results = [r for r in results if r["type"] == "key_moment"]
assert len(km_results) >= 1
@ -398,13 +400,143 @@ async def test_keyword_search_key_moment_without_technique_page(db_engine):
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
results = await svc.keyword_search("Outro", "all", 20, session)
kw_result = await svc.keyword_search("Outro", "all", 20, session)
results = kw_result["items"]
km_results = [r for r in results if r["type"] == "key_moment"]
assert len(km_results) == 1
assert km_results[0]["technique_page_slug"] == ""
# ── Multi-token AND keyword search tests ─────────────────────────────────────
@pytest.mark.asyncio
async def test_keyword_search_multi_token_and_logic(db_engine):
"""Multi-token query requires all tokens to match across fields."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
# "Reese Bass" — both tokens appear in tp1 title "Reese Bass Design"
kw_result = await svc.keyword_search("Reese Bass", "topics", 10, session)
items = kw_result["items"]
assert len(items) >= 1
assert all("reese" in r["title"].lower() or "bass" in r["title"].lower()
for r in items if r["type"] == "technique_page")
# "Granular bass" — 'granular' is in tp2, 'bass' is NOT in tp2 title/summary
# but tp2 summary says "granular synthesis" not "bass" — no AND match expected
kw_result2 = await svc.keyword_search("Granular bass", "topics", 10, session)
items2 = kw_result2["items"]
# Should NOT contain tp2 since "bass" doesn't appear in tp2's fields
tp2_results = [r for r in items2 if r["slug"] == "granular-pad-textures"]
assert len(tp2_results) == 0
@pytest.mark.asyncio
async def test_keyword_search_cross_field_token_matching(db_engine):
"""Tokens can match across different fields (e.g., one in title, one in creator name)."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
# "Bill Reese" — "Bill" matches Creator.name "Mr. Bill", "Reese" matches title
kw_result = await svc.keyword_search("Bill Reese", "topics", 10, session)
items = kw_result["items"]
assert len(items) >= 1
# tp1 "Reese Bass Design" by "Mr. Bill" should match
slugs = [r["slug"] for r in items]
assert "reese-bass-design" in slugs
@pytest.mark.asyncio
async def test_keyword_search_partial_matches_on_zero_and(db_engine):
"""When AND yields no results, partial_matches returns rows scored by token coverage."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
# "xyznonexistent Reese" — no row matches both, but "Reese" matches several
kw_result = await svc.keyword_search("xyznonexistent Reese", "all", 20, session)
assert kw_result["items"] == []
assert len(kw_result["partial_matches"]) >= 1
# Partial matches should have scores between 0 and 1
for pm in kw_result["partial_matches"]:
assert 0 < pm["score"] <= 1.0
@pytest.mark.asyncio
async def test_keyword_search_single_token_no_partial(db_engine):
"""Single-token search that fails returns no partial_matches (only multi-token triggers partial)."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
kw_result = await svc.keyword_search("xyznonexistent", "all", 20, session)
assert kw_result["items"] == []
assert kw_result["partial_matches"] == []
@pytest.mark.asyncio
async def test_keyword_search_topic_tags_matching(db_engine):
"""Tokens that appear in topic_tags array are matched via array_to_string."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
# "textures" is a topic_tag on tp1, "Bill" is the creator
kw_result = await svc.keyword_search("textures Bill", "topics", 10, session)
items = kw_result["items"]
assert len(items) >= 1
slugs = [r["slug"] for r in items]
assert "reese-bass-design" in slugs
@pytest.mark.asyncio
async def test_keyword_search_creator_genres_matching(db_engine):
"""Creator search matches against genres array via array_to_string."""
seed = await _seed_search_data(db_engine)
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
from config import Settings
svc = SearchService(settings=Settings())
# "Glitch" is a genre on creator1 "Mr. Bill"
kw_result = await svc.keyword_search("Bill Glitch", "creators", 10, session)
items = kw_result["items"]
assert len(items) >= 1
assert any(r["title"] == "Mr. Bill" for r in items)
# ── Suggestions endpoint tests ───────────────────────────────────────────────
SUGGESTIONS_URL = "/api/v1/search/suggestions"