From 9c0247c83023c19da5e69afcf55ffbd0e75ca72d Mon Sep 17 00:00:00 2001 From: jlightner Date: Wed, 1 Apr 2026 06:15:20 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Refactored=20keyword=5Fsearch=20to=20mu?= =?UTF-8?q?lti-token=20AND=20with=20cross-field=20mat=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/search_service.py" - "backend/schemas.py" - "backend/routers/search.py" - "backend/tests/test_search.py" GSD-Task: S01/T01 --- .artifacts/feature-synthesis-chunking.md | 111 +++++++++++++ backend/routers/search.py | 1 + backend/schemas.py | 1 + backend/search_service.py | 202 ++++++++++++++++++----- backend/tests/test_search.py | 138 +++++++++++++++- 5 files changed, 410 insertions(+), 43 deletions(-) create mode 100644 .artifacts/feature-synthesis-chunking.md diff --git a/.artifacts/feature-synthesis-chunking.md b/.artifacts/feature-synthesis-chunking.md new file mode 100644 index 0000000..c04b383 --- /dev/null +++ b/.artifacts/feature-synthesis-chunking.md @@ -0,0 +1,111 @@ +# Feature: Stage 5 Synthesis Chunking for Large Category Groups + +## Problem + +Stage 5 synthesis sends all key moments for a given `(video, topic_category)` group to the LLM in a single call. When a video produces a large number of moments in one category, the prompt exceeds what the model can process into a valid structured response. + +**Concrete failure:** COPYCATT's "Sound Design - Everything In 2 Hours Speedrun" (2,026 transcript segments) produced 198 moments classified as "Sound design" (175) / "Sound Design" (23 — casing inconsistency). The synthesis prompt for that category was ~42k tokens. The model (`fyn-llm-agent-think`, 128k context) accepted the prompt but returned only 5,407 completion tokens with `finish=stop` — valid JSON that was structurally incomplete, failing Pydantic `SynthesisResult` validation. The pipeline retried and failed identically each time. 
+ +The other 37 videos in the corpus (up to 930 segments, ~60 moments per category max) all synthesized successfully. + +## Root Causes + +Two independent issues compound into this failure: + +### 1. No chunking in stage 5 synthesis + +`stage5_synthesis()` in `backend/pipeline/stages.py` iterates over `groups[category]` and builds one prompt containing ALL moments for that category. There's no upper bound on how many moments go into a single LLM call. + +**Location:** `stages.py` lines ~850-875 — the `for category, moment_group in groups.items()` loop builds the full `moments_text` without splitting. + +### 2. Inconsistent category casing from stage 4 + +Stage 4 classification produces `"Sound design"` and `"Sound Design"` as separate categories for the same video. Stage 5 groups by exact string match, so these stay separate — but even independently, 175 moments in one group is too many. The casing issue does inflate the problem by preventing natural splitting across categories. + +**Location:** Classification output stored in Redis at `chrysopedia:classification:{video_id}`. The `topic_category` values come directly from the LLM with no normalization. + +## Proposed Changes + +### Change 1: Chunked synthesis with merge pass + +Split large category groups into chunks before sending to the LLM. Each chunk produces technique pages independently, then a lightweight merge step combines pages with overlapping topics. + +**In `stage5_synthesis()` (`backend/pipeline/stages.py`):** + +1. After grouping moments by category, check each group's size against a configurable threshold (e.g., `SYNTHESIS_CHUNK_SIZE = 30` moments). + +2. Groups at or below the threshold: process as today — single LLM call. + +3. Groups above the threshold: split into chunks of `SYNTHESIS_CHUNK_SIZE` moments, ordered by `start_time` (preserving chronological context). Each chunk gets its own synthesis LLM call, producing its own `SynthesisResult` with 1+ pages. + +4. 
After all chunks for a category are processed, collect the resulting pages. Pages with the same or very similar slugs (e.g., Levenshtein distance < 3, or shared slug prefix before the creator suffix) should be merged. The merge is a second LLM call with a simpler prompt: "Here are N partial technique pages on the same topic from the same creator. Merge them into a single cohesive page, combining body sections, deduplicating signal chains and plugins, and writing a unified summary." This merge prompt is much smaller than the original 198-moment prompt because it takes synthesized prose as input, not raw moment data. + +5. If no pages share slugs across chunks, keep them all — they represent genuinely distinct sub-topics the LLM identified within the category. + +**New config setting in `backend/config.py`:** +```python +synthesis_chunk_size: int = 30 # Max moments per synthesis LLM call +``` + +**New prompt file:** `prompts/stage5_merge.txt` — instructions for combining partial technique pages into a unified page. Much simpler than the full synthesis prompt since it operates on already-synthesized prose rather than raw moments. + +**Token budget consideration:** 30 moments × ~200 tokens each (title + summary + metadata + transcript excerpt) = ~6k tokens of moment data + ~2k system prompt = ~8k input tokens. Well within what the model handles reliably. The merge call takes 2-4 partial pages of prose (~3-5k tokens total) — also very manageable. + +### Change 2: Category casing normalization in stage 4 + +Normalize `topic_category` values before storing classification results in Redis. 
+ +**In `stage4_classification()` (`backend/pipeline/stages.py`):** + +After parsing the `ClassificationResult` from the LLM, apply title-case normalization to each moment's `topic_category`: + +```python +category = cls_result.topic_category.strip().title() +# "Sound design" -> "Sound Design" +# "sound design" -> "Sound Design" +# "SOUND DESIGN" -> "Sound Design" +``` + +This is a one-line fix. It prevents the "Sound design" / "Sound Design" split that inflated the number of groups and would consolidate the COPYCATT video's 175 + 23 moments into a single normalized "Sound Design" group of 198 — still too many without chunking, but it eliminates the class of bug where moments scatter across near-duplicate categories. + +**Also apply in stage 5 as a safety net:** When building the `groups` dict, normalize the category key: +```python +category = cls_info.get("topic_category", "Uncategorized").strip().title() +``` + +This handles data already in Redis from prior stage 4 runs without requiring reprocessing. + +### Change 3: Estimated token pre-check before LLM call + +Before making the synthesis LLM call, estimate the input tokens (system prompt + user prompt) and log a warning if the estimate exceeds a safety threshold. This doesn't block the call — chunking handles the splitting — but it provides observability for tuning `SYNTHESIS_CHUNK_SIZE`. + +**In the synthesis loop, after building `user_prompt`:** +```python +estimated_input = estimate_tokens(system_prompt) + estimate_tokens(user_prompt) +if estimated_input > 15000: + logger.warning( + "Stage 5: Large synthesis input for category '%s' video_id=%s: " + "~%d input tokens, %d moments. 
Consider reducing SYNTHESIS_CHUNK_SIZE.", + category, video_id, estimated_input, len(moment_group), + ) +``` + +## Files to Modify + +| File | Change | +|------|--------| +| `backend/pipeline/stages.py` | Chunk logic in `stage5_synthesis()`, casing normalization in `stage4_classification()` and `stage5_synthesis()` grouping | +| `backend/pipeline/llm_client.py` | No changes needed — `estimate_max_tokens()` already handles per-call estimation | +| `backend/config.py` | Add `synthesis_chunk_size: int = 30` setting | +| `prompts/stage5_merge.txt` | New prompt for merging partial technique pages | +| `backend/schemas.py` | No changes — `SynthesisResult` schema works for both chunk and merge calls | + +## Testing + +1. **Unit test:** Mock the LLM and verify that a 90-moment group gets split into 3 chunks of 30, each producing a `SynthesisResult`, followed by a merge call. +2. **Integration test:** Retrigger the COPYCATT "Sound Design - Everything In 2 Hours Speedrun" video and confirm it completes stage 5 without `LLMTruncationError`. +3. **Regression test:** Retrigger a small video (e.g., Skope "Understanding Waveshapers", 9 moments) and confirm behavior is unchanged — no chunking triggered, same output. + +## Rollback + +`SYNTHESIS_CHUNK_SIZE` can be set very high (e.g., 9999) to effectively disable chunking without a code change. The casing normalization is backward-compatible — it only affects new pipeline runs. 
diff --git a/backend/routers/search.py b/backend/routers/search.py index ef7cf53..5ff137b 100644 --- a/backend/routers/search.py +++ b/backend/routers/search.py @@ -47,6 +47,7 @@ async def search( result = await svc.search(query=q, scope=scope, limit=limit, db=db) return SearchResponse( items=[SearchResultItem(**item) for item in result["items"]], + partial_matches=[SearchResultItem(**item) for item in result.get("partial_matches", [])], total=result["total"], query=result["query"], fallback_used=result["fallback_used"], diff --git a/backend/schemas.py b/backend/schemas.py index a6ff13d..b1ed909 100644 --- a/backend/schemas.py +++ b/backend/schemas.py @@ -219,6 +219,7 @@ class SearchResultItem(BaseModel): class SearchResponse(BaseModel): """Top-level search response with metadata.""" items: list[SearchResultItem] = Field(default_factory=list) + partial_matches: list[SearchResultItem] = Field(default_factory=list) total: int = 0 query: str = "" fallback_used: bool = False diff --git a/backend/search_service.py b/backend/search_service.py index 6d6495e..35fdf77 100644 --- a/backend/search_service.py +++ b/backend/search_service.py @@ -16,7 +16,7 @@ import openai from qdrant_client import AsyncQdrantClient from qdrant_client.http import exceptions as qdrant_exceptions from qdrant_client.models import FieldCondition, Filter, MatchValue -from sqlalchemy import or_, select +from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from config import Settings @@ -134,33 +134,99 @@ class SearchService: # ── Keyword fallback ───────────────────────────────────────────────── + # ── Token helpers ─────────────────────────────────────────────────── + + @staticmethod + def _tokenize(query: str) -> list[str]: + """Split query into non-empty tokens.""" + return [t for t in query.split() if t] + + @staticmethod + def _tp_token_condition(token: str): + """Build an OR condition for a single token across TechniquePage + Creator fields.""" + pat = 
f"%{token}%" + return or_( + TechniquePage.title.ilike(pat), + TechniquePage.summary.ilike(pat), + TechniquePage.topic_category.ilike(pat), + func.array_to_string(TechniquePage.topic_tags, " ").ilike(pat), + Creator.name.ilike(pat), + ) + + @staticmethod + def _km_token_condition(token: str): + """Build an OR condition for a single token across KeyMoment + Creator fields.""" + pat = f"%{token}%" + return or_( + KeyMoment.title.ilike(pat), + KeyMoment.summary.ilike(pat), + Creator.name.ilike(pat), + ) + + @staticmethod + def _cr_token_condition(token: str): + """Build an OR condition for a single token across Creator fields.""" + pat = f"%{token}%" + return or_( + Creator.name.ilike(pat), + func.array_to_string(Creator.genres, " ").ilike(pat), + ) + + # ── Keyword search (multi-token AND) ───────────────────────────────── + async def keyword_search( self, query: str, scope: str, limit: int, db: AsyncSession, - ) -> list[dict[str, Any]]: - """ILIKE keyword search across technique pages, key moments, and creators. + ) -> dict[str, list[dict[str, Any]]]: + """Multi-token AND keyword search across technique pages, key moments, and creators. - Searches title/name columns. Returns a unified list of result dicts. + Tokenizes the query by whitespace. Each token must match at least one + indexed field (title, summary, topic_category, topic_tags, creator name, + genres). All tokens must match for a row to be included. + + If AND matching returns zero results but individual tokens would match, + returns up to 5 partial_matches scored by the number of tokens matched. + + Returns ``{"items": [...], "partial_matches": [...]}``. 
""" + tokens = self._tokenize(query) + if not tokens: + return {"items": [], "partial_matches": []} + + items = await self._keyword_search_and(tokens, scope, limit, db) + + # Enrich with creator names + items = await self._enrich_keyword_creator_names(items, db) + + partial: list[dict[str, Any]] = [] + if not items and len(tokens) > 1: + partial = await self._keyword_partial_matches(tokens, scope, db) + partial = await self._enrich_keyword_creator_names(partial, db) + + return {"items": items, "partial_matches": partial} + + async def _keyword_search_and( + self, + tokens: list[str], + scope: str, + limit: int, + db: AsyncSession, + ) -> list[dict[str, Any]]: + """Run AND-logic keyword search — every token must match at least one field.""" results: list[dict[str, Any]] = [] - pattern = f"%{query}%" if scope in ("all", "topics"): - stmt = ( - select(TechniquePage) - .where( - or_( - TechniquePage.title.ilike(pattern), - TechniquePage.summary.ilike(pattern), - ) - ) + tp_stmt = ( + select(TechniquePage, Creator) + .join(Creator, TechniquePage.creator_id == Creator.id) + .where(and_(*(self._tp_token_condition(t) for t in tokens))) .limit(limit) ) - rows = await db.execute(stmt) - for tp in rows.scalars().all(): + tp_rows = await db.execute(tp_stmt) + for tp, cr in tp_rows.all(): results.append({ "type": "technique_page", "title": tp.title, @@ -170,6 +236,8 @@ class SearchService: "topic_category": tp.topic_category, "topic_tags": tp.topic_tags or [], "creator_id": str(tp.creator_id), + "creator_name": cr.name, + "creator_slug": cr.slug, "score": 0.0, }) @@ -179,7 +247,7 @@ class SearchService: .join(SourceVideo, KeyMoment.source_video_id == SourceVideo.id) .join(Creator, SourceVideo.creator_id == Creator.id) .outerjoin(TechniquePage, KeyMoment.technique_page_id == TechniquePage.id) - .where(KeyMoment.title.ilike(pattern)) + .where(and_(*(self._km_token_condition(t) for t in tokens))) .limit(limit) ) km_rows = await db.execute(km_stmt) @@ -201,7 +269,7 @@ class 
SearchService: if scope in ("all", "creators"): cr_stmt = ( select(Creator) - .where(Creator.name.ilike(pattern)) + .where(and_(*(self._cr_token_condition(t) for t in tokens))) .limit(limit) ) cr_rows = await db.execute(cr_stmt) @@ -218,29 +286,77 @@ class SearchService: "score": 0.0, }) - # Enrich keyword results with creator names - kw_creator_ids = {r["creator_id"] for r in results if r.get("creator_id")} - kw_creator_map: dict[str, dict[str, str]] = {} - if kw_creator_ids: - import uuid as _uuid_mod - valid = [] - for cid in kw_creator_ids: - try: - valid.append(_uuid_mod.UUID(cid)) - except (ValueError, AttributeError): - pass - if valid: - cr_stmt = select(Creator).where(Creator.id.in_(valid)) - cr_result = await db.execute(cr_stmt) - for c in cr_result.scalars().all(): - kw_creator_map[str(c.id)] = {"name": c.name, "slug": c.slug} - for r in results: - info = kw_creator_map.get(r.get("creator_id", ""), {"name": "", "slug": ""}) - r["creator_name"] = info["name"] - r["creator_slug"] = info["slug"] - return results[:limit] + async def _keyword_partial_matches( + self, + tokens: list[str], + scope: str, + db: AsyncSession, + ) -> list[dict[str, Any]]: + """When AND produces zero results, score rows by how many tokens match. + + Returns the top 5 results ordered by match count descending. 
+ """ + seen: dict[tuple[str, str], dict[str, Any]] = {} + match_counts: dict[tuple[str, str], int] = {} + + for token in tokens: + single_results = await self._keyword_search_and([token], scope, 20, db) + for r in single_results: + key = (r["type"], r.get("slug") or r.get("title", "")) + if key not in seen: + seen[key] = r + match_counts[key] = 0 + match_counts[key] += 1 + + ranked = sorted(match_counts.keys(), key=lambda k: match_counts[k], reverse=True) + partial: list[dict[str, Any]] = [] + for key in ranked[:5]: + item = seen[key] + item["score"] = match_counts[key] / len(tokens) + partial.append(item) + + return partial + + async def _enrich_keyword_creator_names( + self, + results: list[dict[str, Any]], + db: AsyncSession, + ) -> list[dict[str, Any]]: + """Fill in creator_name/creator_slug for results that don't have them yet.""" + needs_enrichment = [ + r for r in results + if r.get("creator_id") and not r.get("creator_name") + ] + if not needs_enrichment: + return results + + import uuid as _uuid_mod + + cids: set[str] = {r["creator_id"] for r in needs_enrichment} + valid = [] + for cid in cids: + try: + valid.append(_uuid_mod.UUID(cid)) + except (ValueError, AttributeError): + pass + + creator_map: dict[str, dict[str, str]] = {} + if valid: + cr_stmt = select(Creator).where(Creator.id.in_(valid)) + cr_result = await db.execute(cr_stmt) + for c in cr_result.scalars().all(): + creator_map[str(c.id)] = {"name": c.name, "slug": c.slug} + + for r in results: + if not r.get("creator_name"): + info = creator_map.get(r.get("creator_id", ""), {"name": "", "slug": ""}) + r["creator_name"] = info["name"] + r["creator_slug"] = info["slug"] + + return results + # ── Orchestrator ───────────────────────────────────────────────────── async def search( @@ -288,22 +404,28 @@ class SearchService: # Fallback to keyword search if semantic failed or returned nothing if not items: - items = await self.keyword_search(query, scope, limit, db) + kw_result = await 
self.keyword_search(query, scope, limit, db) + items = kw_result["items"] + partial_matches = kw_result.get("partial_matches", []) fallback_used = True + else: + partial_matches = [] elapsed_ms = (time.monotonic() - start) * 1000 logger.info( - "Search query=%r scope=%s results=%d fallback=%s latency_ms=%.1f", + "Search query=%r scope=%s results=%d partial=%d fallback=%s latency_ms=%.1f", query, scope, len(items), + len(partial_matches), fallback_used, elapsed_ms, ) return { "items": items, + "partial_matches": partial_matches, "total": len(items), "query": query, "fallback_used": fallback_used, diff --git a/backend/tests/test_search.py b/backend/tests/test_search.py index 366c80e..9fc0581 100644 --- a/backend/tests/test_search.py +++ b/backend/tests/test_search.py @@ -356,7 +356,8 @@ async def test_keyword_search_technique_page_has_technique_page_slug(db_engine): async with session_factory() as session: from config import Settings svc = SearchService(settings=Settings()) - results = await svc.keyword_search("Reese Bass", "topics", 10, session) + kw_result = await svc.keyword_search("Reese Bass", "topics", 10, session) + results = kw_result["items"] assert len(results) >= 1 tp_result = next(r for r in results if r["type"] == "technique_page") @@ -377,7 +378,8 @@ async def test_keyword_search_key_moment_has_parent_technique_page_slug(db_engin async with session_factory() as session: from config import Settings svc = SearchService(settings=Settings()) - results = await svc.keyword_search("Reese", "all", 20, session) + kw_result = await svc.keyword_search("Reese", "all", 20, session) + results = kw_result["items"] km_results = [r for r in results if r["type"] == "key_moment"] assert len(km_results) >= 1 @@ -398,13 +400,143 @@ async def test_keyword_search_key_moment_without_technique_page(db_engine): async with session_factory() as session: from config import Settings svc = SearchService(settings=Settings()) - results = await svc.keyword_search("Outro", "all", 20, 
session) + kw_result = await svc.keyword_search("Outro", "all", 20, session) + results = kw_result["items"] km_results = [r for r in results if r["type"] == "key_moment"] assert len(km_results) == 1 assert km_results[0]["technique_page_slug"] == "" +# ── Multi-token AND keyword search tests ───────────────────────────────────── + + +@pytest.mark.asyncio +async def test_keyword_search_multi_token_and_logic(db_engine): + """Multi-token query requires all tokens to match across fields.""" + seed = await _seed_search_data(db_engine) + + session_factory = async_sessionmaker( + db_engine, class_=AsyncSession, expire_on_commit=False + ) + async with session_factory() as session: + from config import Settings + svc = SearchService(settings=Settings()) + + # "Reese Bass" — both tokens appear in tp1 title "Reese Bass Design" + kw_result = await svc.keyword_search("Reese Bass", "topics", 10, session) + items = kw_result["items"] + assert len(items) >= 1 + assert all("reese" in r["title"].lower() or "bass" in r["title"].lower() + for r in items if r["type"] == "technique_page") + + # "Granular bass" — 'granular' is in tp2, 'bass' is NOT in tp2 title/summary + # but tp2 summary says "granular synthesis" not "bass" — no AND match expected + kw_result2 = await svc.keyword_search("Granular bass", "topics", 10, session) + items2 = kw_result2["items"] + # Should NOT contain tp2 since "bass" doesn't appear in tp2's fields + tp2_results = [r for r in items2 if r["slug"] == "granular-pad-textures"] + assert len(tp2_results) == 0 + + +@pytest.mark.asyncio +async def test_keyword_search_cross_field_token_matching(db_engine): + """Tokens can match across different fields (e.g., one in title, one in creator name).""" + seed = await _seed_search_data(db_engine) + + session_factory = async_sessionmaker( + db_engine, class_=AsyncSession, expire_on_commit=False + ) + async with session_factory() as session: + from config import Settings + svc = SearchService(settings=Settings()) + + # "Bill 
Reese" — "Bill" matches Creator.name "Mr. Bill", "Reese" matches title + kw_result = await svc.keyword_search("Bill Reese", "topics", 10, session) + items = kw_result["items"] + assert len(items) >= 1 + # tp1 "Reese Bass Design" by "Mr. Bill" should match + slugs = [r["slug"] for r in items] + assert "reese-bass-design" in slugs + + +@pytest.mark.asyncio +async def test_keyword_search_partial_matches_on_zero_and(db_engine): + """When AND yields no results, partial_matches returns rows scored by token coverage.""" + seed = await _seed_search_data(db_engine) + + session_factory = async_sessionmaker( + db_engine, class_=AsyncSession, expire_on_commit=False + ) + async with session_factory() as session: + from config import Settings + svc = SearchService(settings=Settings()) + + # "xyznonexistent Reese" — no row matches both, but "Reese" matches several + kw_result = await svc.keyword_search("xyznonexistent Reese", "all", 20, session) + assert kw_result["items"] == [] + assert len(kw_result["partial_matches"]) >= 1 + # Partial matches should have scores between 0 and 1 + for pm in kw_result["partial_matches"]: + assert 0 < pm["score"] <= 1.0 + + +@pytest.mark.asyncio +async def test_keyword_search_single_token_no_partial(db_engine): + """Single-token search that fails returns no partial_matches (only multi-token triggers partial).""" + seed = await _seed_search_data(db_engine) + + session_factory = async_sessionmaker( + db_engine, class_=AsyncSession, expire_on_commit=False + ) + async with session_factory() as session: + from config import Settings + svc = SearchService(settings=Settings()) + + kw_result = await svc.keyword_search("xyznonexistent", "all", 20, session) + assert kw_result["items"] == [] + assert kw_result["partial_matches"] == [] + + +@pytest.mark.asyncio +async def test_keyword_search_topic_tags_matching(db_engine): + """Tokens that appear in topic_tags array are matched via array_to_string.""" + seed = await _seed_search_data(db_engine) + + 
session_factory = async_sessionmaker( + db_engine, class_=AsyncSession, expire_on_commit=False + ) + async with session_factory() as session: + from config import Settings + svc = SearchService(settings=Settings()) + + # "textures" is a topic_tag on tp1, "Bill" is the creator + kw_result = await svc.keyword_search("textures Bill", "topics", 10, session) + items = kw_result["items"] + assert len(items) >= 1 + slugs = [r["slug"] for r in items] + assert "reese-bass-design" in slugs + + +@pytest.mark.asyncio +async def test_keyword_search_creator_genres_matching(db_engine): + """Creator search matches against genres array via array_to_string.""" + seed = await _seed_search_data(db_engine) + + session_factory = async_sessionmaker( + db_engine, class_=AsyncSession, expire_on_commit=False + ) + async with session_factory() as session: + from config import Settings + svc = SearchService(settings=Settings()) + + # "Glitch" is a genre on creator1 "Mr. Bill" + kw_result = await svc.keyword_search("Bill Glitch", "creators", 10, session) + items = kw_result["items"] + assert len(items) >= 1 + assert any(r["title"] == "Mr. Bill" for r in items) + + # ── Suggestions endpoint tests ─────────────────────────────────────────────── SUGGESTIONS_URL = "/api/v1/search/suggestions"