diff --git a/backend/pipeline/qdrant_client.py b/backend/pipeline/qdrant_client.py index 9112ca0..84d3ebb 100644 --- a/backend/pipeline/qdrant_client.py +++ b/backend/pipeline/qdrant_client.py @@ -233,3 +233,87 @@ class QdrantManager: points.append(point) self.upsert_points(points) + + # ── Technique section operations ───────────────────────────────────── + + def delete_sections_by_page_id(self, page_id: str) -> None: + """Delete all technique_section points for a given page_id. + + Called before re-upserting sections to prevent orphan points when + headings are renamed or sections removed. Non-blocking — logs warning + on failure. + """ + from qdrant_client.models import FieldCondition, Filter, MatchValue + + try: + self._client.delete( + collection_name=self._collection, + points_selector=Filter( + must=[ + FieldCondition( + key="page_id", + match=MatchValue(value=page_id), + ), + FieldCondition( + key="type", + match=MatchValue(value="technique_section"), + ), + ], + ), + ) + logger.info( + "Deleted technique_section points for page_id=%s from '%s'.", + page_id, self._collection, + ) + except Exception as exc: + logger.warning( + "Qdrant delete sections for page_id=%s failed (%s: %s). Skipping.", + page_id, type(exc).__name__, exc, + ) + + def upsert_technique_sections( + self, + sections: list[dict], + vectors: list[list[float]], + ) -> None: + """Build and upsert PointStructs for technique page sections. + + Each section dict must contain: + page_id, section_anchor, section_heading, creator_id, creator_name, + title (page title), slug (page slug), topic_category, topic_tags, summary + + Uses deterministic UUIDs: ``uuid5(namespace, 'ts:{page_id}:{section_anchor}')``. + """ + if len(sections) != len(vectors): + logger.warning( + "Technique-section count (%d) != vector count (%d). Skipping upsert.", + len(sections), len(vectors), + ) + return + + points = [] + for sec, vector in zip(sections, vectors): + point_id = str(uuid.uuid5( + _QDRANT_NAMESPACE, + f"ts:{sec['page_id']}:{sec['section_anchor']}", + )) + point = PointStruct( + id=point_id, + vector=vector, + payload={ + "type": "technique_section", + "page_id": sec["page_id"], + "creator_id": sec.get("creator_id", ""), + "creator_name": sec.get("creator_name", ""), + "title": sec.get("title", ""), + "slug": sec.get("slug", ""), + "section_heading": sec["section_heading"], + "section_anchor": sec["section_anchor"], + "topic_category": sec.get("topic_category", ""), + "topic_tags": sec.get("topic_tags") or [], + "summary": (sec.get("summary") or "")[:200], + }, + ) + points.append(point) + + self.upsert_points(points) diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index eee7b00..4787658 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -12,6 +12,7 @@ from __future__ import annotations import hashlib import json import logging +import re import subprocess import time from collections import defaultdict @@ -1589,6 +1590,17 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: session.close() +# ── Heading slug helper (matches frontend TableOfContents.tsx slugify) ──────── + +def _slugify_heading(text: str) -> str: + """Convert a heading string to a URL-friendly anchor slug. + + Must produce identical output to the frontend's slugify in + ``frontend/src/components/TableOfContents.tsx``. + """ + return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-") + + # ── Stage 6: Embed & Index ─────────────────────────────────────────────────── @celery_app.task(bind=True, max_retries=0) @@ -1741,6 +1753,95 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st len(moment_texts), video_id, ) + # ── Embed & upsert technique page sections (v2 only) ──────────── + section_count = 0 + v2_pages = [p for p in pages if getattr(p, "body_sections_format", "v1") == "v2"] + for p in v2_pages: + body_sections = p.body_sections + if not isinstance(body_sections, list): + continue + + creator_name = creator_map.get(str(p.creator_id), "") + page_id_str = str(p.id) + + # Delete stale section points before re-upserting + try: + qdrant.delete_sections_by_page_id(page_id_str) + except Exception as exc: + logger.warning( + "Stage 6: Failed to delete stale sections for page_id=%s: %s", + page_id_str, exc, + ) + + section_texts: list[str] = [] + section_dicts: list[dict] = [] + + for section in body_sections: + if not isinstance(section, dict): + logger.warning( + "Stage 6: Malformed section (not a dict) in page_id=%s. Skipping.", + page_id_str, + ) + continue + heading = section.get("heading", "") + if not heading or not heading.strip(): + continue + + section_anchor = _slugify_heading(heading) + section_content = section.get("content", "") + # Include subsection content for richer embedding + subsection_parts: list[str] = [] + for sub in section.get("subsections", []): + if isinstance(sub, dict): + sub_heading = sub.get("heading", "") + sub_content = sub.get("content", "") + if sub_heading: + subsection_parts.append(f"{sub_heading}: {sub_content}") + elif sub_content: + subsection_parts.append(sub_content) + + embed_text = ( + f"{creator_name} {p.title} — {heading}: " + f"{section_content} {' '.join(subsection_parts)}" + ).strip() + section_texts.append(embed_text) + + section_dicts.append({ + "page_id": page_id_str, + "creator_id": str(p.creator_id), + "creator_name": creator_name, + "title": p.title, + "slug": p.slug, + "section_heading": heading, + "section_anchor": section_anchor, + "topic_category": p.topic_category or "", + "topic_tags": p.topic_tags or [], + "summary": (section_content or "")[:200], + }) + + if section_texts: + try: + section_vectors = embed_client.embed(section_texts) + if section_vectors: + qdrant.upsert_technique_sections(section_dicts, section_vectors) + section_count += len(section_vectors) + else: + logger.warning( + "Stage 6: Embedding returned empty for %d sections of page_id=%s. Skipping.", + len(section_texts), page_id_str, + ) + except Exception as exc: + logger.warning( + "Stage 6: Section embedding failed for page_id=%s: %s. Skipping.", + page_id_str, exc, + ) + + if section_count: + logger.info( + "Stage 6: Upserted %d technique section vectors for video_id=%s", + section_count, video_id, + ) + elapsed = time.monotonic() - start logger.info( "Stage 6 (embed & index) completed for video_id=%s in %.1fs — " diff --git a/backend/pipeline/test_section_embedding.py b/backend/pipeline/test_section_embedding.py new file mode 100644 index 0000000..eeaf7d3 --- /dev/null +++ b/backend/pipeline/test_section_embedding.py @@ -0,0 +1,328 @@ +"""Unit tests for per-section embedding in stage 6. + +Tests _slugify_heading, section embed text construction, delete-before-upsert +ordering, v1 page skipping, upsert payload correctness, and deterministic UUIDs. +""" + +from __future__ import annotations + +import uuid +from unittest.mock import MagicMock, call, patch + +import pytest + +# ── slugify tests ──────────────────────────────────────────────────────────── + +from pipeline.stages import _slugify_heading + + +class TestSlugifyHeading: + """Verify _slugify_heading matches frontend TableOfContents.tsx slugify.""" + + def test_simple_heading(self): + assert _slugify_heading("Grain Position Control") == "grain-position-control" + + def test_ampersand_and_special_chars(self): + # Consecutive non-alphanumeric chars collapse to a single hyphen + assert _slugify_heading("LFO Routing & Modulation") == "lfo-routing-modulation" + + def test_leading_trailing_special(self): + assert _slugify_heading(" —Hello World! ") == "hello-world" + + def test_numbers_preserved(self): + assert _slugify_heading("Step 1: Setup") == "step-1-setup" + + def test_empty_string(self): + assert _slugify_heading("") == "" + + def test_only_special_chars(self): + assert _slugify_heading("!@#$%") == "" + + def test_unicode_stripped(self): + assert _slugify_heading("Café Sounds") == "caf-sounds" + + def test_multiple_hyphens_collapse(self): + assert _slugify_heading("A -- B --- C") == "a-b-c" + + +# ── Deterministic UUID tests ───────────────────────────────────────────────── + +_QDRANT_NAMESPACE = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890") + + +class TestDeterministicUUIDs: + """Verify same page+section always produces the same point ID.""" + + def test_same_input_same_uuid(self): + id1 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:grain-position-control")) + id2 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:grain-position-control")) + assert id1 == id2 + + def test_different_section_different_uuid(self): + id1 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:section-a")) + id2 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:section-b")) + assert id1 != id2 + + +# ── QdrantManager section methods ──────────────────────────────────────────── + + +class TestQdrantManagerSections: + """Test upsert_technique_sections and delete_sections_by_page_id.""" + + def _make_manager(self): + """Create a QdrantManager with a mocked client.""" + with patch("pipeline.qdrant_client.QdrantClient") as MockClient: + mock_client = MockClient.return_value + from pipeline.qdrant_client import QdrantManager + settings = MagicMock() + settings.qdrant_url = "http://localhost:6333" + settings.qdrant_collection = "test_collection" + settings.embedding_dimensions = 768 + mgr = QdrantManager(settings) + mgr._client = mock_client + return mgr, mock_client + + def test_upsert_builds_correct_payloads(self): + mgr, mock_client = self._make_manager() + sections = [ + { + "page_id": "p1", + "creator_id": "c1", + "creator_name": "Keota", + "title": "Granular Synthesis", + "slug": "granular-synthesis", + "section_heading": "Grain Position Control", + "section_anchor": "grain-position-control", + "topic_category": "Sound Design", + "topic_tags": ["granular", "synthesis"], + "summary": "Control the grain position parameter.", + }, + ] + vectors = [[0.1] * 768] + + mgr.upsert_technique_sections(sections, vectors) + + # Verify upsert was called + assert mock_client.upsert.called + points = mock_client.upsert.call_args[1]["points"] + assert len(points) == 1 + + payload = points[0].payload + assert payload["type"] == "technique_section" + assert payload["page_id"] == "p1" + assert payload["section_heading"] == "Grain Position Control" + assert payload["section_anchor"] == "grain-position-control" + assert payload["slug"] == "granular-synthesis" + + # Verify deterministic UUID + expected_id = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:p1:grain-position-control")) + assert points[0].id == expected_id + + def test_upsert_count_mismatch_skips(self): + mgr, mock_client = self._make_manager() + mgr.upsert_technique_sections([{"page_id": "p1"}], [[0.1], [0.2]]) + assert not mock_client.upsert.called + + def test_upsert_empty_list_skips(self): + mgr, mock_client = self._make_manager() + mgr.upsert_technique_sections([], []) + assert not mock_client.upsert.called + + def test_summary_truncated_to_200_chars(self): + mgr, mock_client = self._make_manager() + long_summary = "x" * 500 + sections = [{ + "page_id": "p1", "section_heading": "H", "section_anchor": "h", + "summary": long_summary, + }] + vectors = [[0.1] * 768] + mgr.upsert_technique_sections(sections, vectors) + payload = mock_client.upsert.call_args[1]["points"][0].payload + assert len(payload["summary"]) == 200 + + def test_delete_sections_by_page_id(self): + mgr, mock_client = self._make_manager() + mgr.delete_sections_by_page_id("p1") + assert mock_client.delete.called + filter_arg = mock_client.delete.call_args[1]["points_selector"] + # Verify filter has both page_id and type conditions + must_conditions = filter_arg.must + assert len(must_conditions) == 2 + keys = {c.key for c in must_conditions} + assert keys == {"page_id", "type"} + + def test_delete_sections_logs_on_failure(self): + mgr, mock_client = self._make_manager() + mock_client.delete.side_effect = Exception("connection refused") + # Should not raise + mgr.delete_sections_by_page_id("p1") + + +# ── Stage 6 section embedding logic ───────────────────────────────────────── + +class TestStage6SectionEmbedding: + """Test the section embedding block within stage6_embed_and_index. + + Uses mocked DB, embedding client, and QdrantManager to verify: + - v2 pages produce section points + - v1 pages are skipped + - delete is called before upsert + - embed text includes creator/page/section context + - sections with empty headings are skipped + - subsection content is included in embed text + """ + + def _make_page(self, page_id="p1", creator_id="c1", format_="v2", + body_sections=None, title="Granular Synthesis", + slug="granular-synthesis"): + """Create a mock TechniquePage-like object.""" + page = MagicMock() + page.id = page_id + page.creator_id = creator_id + page.body_sections_format = format_ + page.body_sections = body_sections + page.title = title + page.slug = slug + page.topic_category = "Sound Design" + page.topic_tags = ["granular"] + page.summary = "Page summary" + return page + + def test_v1_page_produces_zero_sections(self): + """Pages with body_sections_format != 'v2' should be skipped.""" + page = self._make_page(format_="v1", body_sections=[ + {"heading": "Section A", "content": "Content A"}, + ]) + v2_pages = [p for p in [page] if getattr(p, "body_sections_format", "v1") == "v2"] + assert len(v2_pages) == 0 + + def test_v2_page_none_body_sections(self): + """Page with body_sections=None → skipped (not a list).""" + page = self._make_page(format_="v2", body_sections=None) + v2_pages = [p for p in [page] if getattr(p, "body_sections_format", "v1") == "v2"] + assert len(v2_pages) == 1 + # body_sections is None → not a list → skipped in the loop + assert not isinstance(page.body_sections, list) + + def test_section_empty_heading_skipped(self): + """Sections with empty heading should be skipped.""" + page = self._make_page(body_sections=[ + {"heading": "", "content": "Orphan content"}, + {"heading": "Valid", "content": "Real content"}, + ]) + sections_with_heading = [ + s for s in page.body_sections + if isinstance(s, dict) and s.get("heading", "").strip() + ] + assert len(sections_with_heading) == 1 + assert sections_with_heading[0]["heading"] == "Valid" + + def test_subsection_content_included_in_embed_text(self): + """Section with subsections should include subsection content.""" + section = { + "heading": "Grain Position Control", + "content": "Main content", + "subsections": [ + {"heading": "Fine Tuning", "content": "Fine tune the position."}, + {"heading": "Automation", "content": "Automate grain pos."}, + ], + } + + # Reproduce the embed text construction from stage 6 + creator_name = "Keota" + page_title = "Granular Synthesis" + heading = section["heading"] + section_content = section.get("content", "") + subsection_parts = [] + for sub in section.get("subsections", []): + if isinstance(sub, dict): + sub_heading = sub.get("heading", "") + sub_content = sub.get("content", "") + if sub_heading: + subsection_parts.append(f"{sub_heading}: {sub_content}") + elif sub_content: + subsection_parts.append(sub_content) + + embed_text = ( + f"{creator_name} {page_title} — {heading}: " + f"{section_content} {' '.join(subsection_parts)}" + ).strip() + + assert "Fine Tuning: Fine tune the position." in embed_text + assert "Automation: Automate grain pos." in embed_text + assert "Keota Granular Synthesis" in embed_text + + def test_subsection_no_direct_content(self): + """Section with subsections but no direct content still embeds subsection text.""" + section = { + "heading": "Advanced Techniques", + "content": "", + "subsections": [ + {"heading": "Sub A", "content": "Content A"}, + ], + } + heading = section["heading"] + section_content = section.get("content", "") + subsection_parts = [] + for sub in section.get("subsections", []): + if isinstance(sub, dict): + sub_heading = sub.get("heading", "") + sub_content = sub.get("content", "") + if sub_heading: + subsection_parts.append(f"{sub_heading}: {sub_content}") + elif sub_content: + subsection_parts.append(sub_content) + + embed_text = ( + f"Creator Page — {heading}: " + f"{section_content} {' '.join(subsection_parts)}" + ).strip() + + assert "Sub A: Content A" in embed_text + + def test_delete_called_before_upsert_ordering(self): + """Verify delete_sections_by_page_id is called before upsert_technique_sections.""" + call_order = [] + mock_qdrant = MagicMock() + mock_qdrant.delete_sections_by_page_id.side_effect = lambda pid: call_order.append(("delete", pid)) + mock_qdrant.upsert_technique_sections.side_effect = lambda s, v: call_order.append(("upsert", len(s))) + + mock_embed = MagicMock() + mock_embed.embed.return_value = [[0.1] * 768] # One vector + + page = self._make_page(body_sections=[ + {"heading": "Section A", "content": "Content A"}, + ]) + + creator_map = {str(page.creator_id): "TestCreator"} + v2_pages = [page] + page_id_str = str(page.id) + + # Simulate the section embedding block + for p in v2_pages: + body_sections = p.body_sections + if not isinstance(body_sections, list): + continue + creator_name = creator_map.get(str(p.creator_id), "") + mock_qdrant.delete_sections_by_page_id(str(p.id)) + + section_texts = [] + section_dicts = [] + for section in body_sections: + if not isinstance(section, dict): + continue + heading = section.get("heading", "") + if not heading or not heading.strip(): + continue + section_anchor = _slugify_heading(heading) + section_texts.append(f"{creator_name} {p.title} — {heading}") + section_dicts.append({"page_id": str(p.id), "section_anchor": section_anchor}) + + if section_texts: + vectors = mock_embed.embed(section_texts) + if vectors: + mock_qdrant.upsert_technique_sections(section_dicts, vectors) + + assert call_order[0][0] == "delete" + assert call_order[1][0] == "upsert" diff --git a/backend/schemas.py b/backend/schemas.py index 1418d6e..e97af61 100644 --- a/backend/schemas.py +++ b/backend/schemas.py @@ -216,6 +216,8 @@ class SearchResultItem(BaseModel): topic_category: str = "" topic_tags: list[str] = Field(default_factory=list) match_context: str = "" + section_anchor: str = "" + section_heading: str = "" class SearchResponse(BaseModel): diff --git a/backend/search_service.py b/backend/search_service.py index 5d221ed..6e898cb 100644 --- a/backend/search_service.py +++ b/backend/search_service.py @@ -419,9 +419,11 @@ class SearchService: scope = "all" # Map scope to Qdrant type filter + # topics scope: no filter — both technique_page and technique_section + # should appear in semantic results type_filter_map = { "all": None, - "topics": "technique_page", + "topics": None, "creators": None, } qdrant_type_filter = type_filter_map.get(scope) @@ -581,6 +583,8 @@ class SearchService: # Determine technique_page_slug based on result type if result_type == "technique_page": tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-")) + elif result_type == "technique_section": + tp_slug = payload.get("slug", "") else: tp_slug = payload.get("technique_page_slug", "") @@ -598,6 +602,8 @@ class SearchService: "created_at": payload.get("created_at", ""), "score": r.get("score", 0.0), "match_context": "", + "section_anchor": payload.get("section_anchor", "") if result_type == "technique_section" else "", + "section_heading": payload.get("section_heading", "") if result_type == "technique_section" else "", }) return enriched