feat: Added per-section embedding to stage 6 for v2 technique pages wit…

- "backend/schemas.py"
- "backend/pipeline/stages.py"
- "backend/pipeline/qdrant_client.py"
- "backend/search_service.py"
- "backend/pipeline/test_section_embedding.py"

GSD-Task: S07/T01
This commit is contained in:
jlightner 2026-04-03 02:12:56 +00:00
parent edfabb037a
commit fd683e8266
5 changed files with 522 additions and 1 deletions

View file

@ -233,3 +233,87 @@ class QdrantManager:
points.append(point)
self.upsert_points(points)
# ── Technique section operations ─────────────────────────────────────
def delete_sections_by_page_id(self, page_id: str) -> None:
"""Delete all technique_section points for a given page_id.
Called before re-upserting sections to prevent orphan points when
headings are renamed or sections removed. Non-blocking logs warning
on failure.
"""
from qdrant_client.models import FieldCondition, Filter, MatchValue
try:
self._client.delete(
collection_name=self._collection,
points_selector=Filter(
must=[
FieldCondition(
key="page_id",
match=MatchValue(value=page_id),
),
FieldCondition(
key="type",
match=MatchValue(value="technique_section"),
),
],
),
)
logger.info(
"Deleted technique_section points for page_id=%s from '%s'.",
page_id, self._collection,
)
except Exception as exc:
logger.warning(
"Qdrant delete sections for page_id=%s failed (%s: %s). Skipping.",
page_id, type(exc).__name__, exc,
)
def upsert_technique_sections(
self,
sections: list[dict],
vectors: list[list[float]],
) -> None:
"""Build and upsert PointStructs for technique page sections.
Each section dict must contain:
page_id, section_anchor, section_heading, creator_id, creator_name,
title (page title), slug (page slug), topic_category, topic_tags, summary
Uses deterministic UUIDs: ``uuid5(namespace, 'ts:{page_id}:{section_anchor}')``.
"""
if len(sections) != len(vectors):
logger.warning(
"Technique-section count (%d) != vector count (%d). Skipping upsert.",
len(sections), len(vectors),
)
return
points = []
for sec, vector in zip(sections, vectors):
point_id = str(uuid.uuid5(
_QDRANT_NAMESPACE,
f"ts:{sec['page_id']}:{sec['section_anchor']}",
))
point = PointStruct(
id=point_id,
vector=vector,
payload={
"type": "technique_section",
"page_id": sec["page_id"],
"creator_id": sec.get("creator_id", ""),
"creator_name": sec.get("creator_name", ""),
"title": sec.get("title", ""),
"slug": sec.get("slug", ""),
"section_heading": sec["section_heading"],
"section_anchor": sec["section_anchor"],
"topic_category": sec.get("topic_category", ""),
"topic_tags": sec.get("topic_tags") or [],
"summary": (sec.get("summary") or "")[:200],
},
)
points.append(point)
self.upsert_points(points)

View file

@ -12,6 +12,7 @@ from __future__ import annotations
import hashlib
import json
import logging
import re
import subprocess
import time
from collections import defaultdict
@ -1589,6 +1590,17 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
session.close()
# ── Heading slug helper (matches frontend TableOfContents.tsx slugify) ────────
def _slugify_heading(text: str) -> str:
"""Convert a heading string to a URL-friendly anchor slug.
Must produce identical output to the frontend's slugify in
``frontend/src/components/TableOfContents.tsx``.
"""
return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")
# ── Stage 6: Embed & Index ───────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=0)
@ -1741,6 +1753,95 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
len(moment_texts), video_id,
)
# ── Embed & upsert technique page sections (v2 only) ────────────
section_count = 0
v2_pages = [p for p in pages if getattr(p, "body_sections_format", "v1") == "v2"]
for p in v2_pages:
body_sections = p.body_sections
if not isinstance(body_sections, list):
continue
creator_name = creator_map.get(str(p.creator_id), "")
page_id_str = str(p.id)
# Delete stale section points before re-upserting
try:
qdrant.delete_sections_by_page_id(page_id_str)
except Exception as exc:
logger.warning(
"Stage 6: Failed to delete stale sections for page_id=%s: %s",
page_id_str, exc,
)
section_texts: list[str] = []
section_dicts: list[dict] = []
for section in body_sections:
if not isinstance(section, dict):
logger.warning(
"Stage 6: Malformed section (not a dict) in page_id=%s. Skipping.",
page_id_str,
)
continue
heading = section.get("heading", "")
if not heading or not heading.strip():
continue
section_anchor = _slugify_heading(heading)
section_content = section.get("content", "")
# Include subsection content for richer embedding
subsection_parts: list[str] = []
for sub in section.get("subsections", []):
if isinstance(sub, dict):
sub_heading = sub.get("heading", "")
sub_content = sub.get("content", "")
if sub_heading:
subsection_parts.append(f"{sub_heading}: {sub_content}")
elif sub_content:
subsection_parts.append(sub_content)
embed_text = (
f"{creator_name} {p.title}{heading}: "
f"{section_content} {' '.join(subsection_parts)}"
).strip()
section_texts.append(embed_text)
section_dicts.append({
"page_id": page_id_str,
"creator_id": str(p.creator_id),
"creator_name": creator_name,
"title": p.title,
"slug": p.slug,
"section_heading": heading,
"section_anchor": section_anchor,
"topic_category": p.topic_category or "",
"topic_tags": p.topic_tags or [],
"summary": (section_content or "")[:200],
})
if section_texts:
try:
section_vectors = embed_client.embed(section_texts)
if section_vectors:
qdrant.upsert_technique_sections(section_dicts, section_vectors)
section_count += len(section_vectors)
else:
logger.warning(
"Stage 6: Embedding returned empty for %d sections of page_id=%s. Skipping.",
len(section_texts), page_id_str,
)
except Exception as exc:
logger.warning(
"Stage 6: Section embedding failed for page_id=%s: %s. Skipping.",
page_id_str, exc,
)
if section_count:
logger.info(
"Stage 6: Upserted %d technique section vectors for video_id=%s",
section_count, video_id,
)
elapsed = time.monotonic() - start
logger.info(
"Stage 6 (embed & index) completed for video_id=%s in %.1fs — "

View file

@ -0,0 +1,328 @@
"""Unit tests for per-section embedding in stage 6.
Tests _slugify_heading, section embed text construction, delete-before-upsert
ordering, v1 page skipping, upsert payload correctness, and deterministic UUIDs.
"""
from __future__ import annotations
import uuid
from unittest.mock import MagicMock, call, patch
import pytest
# ── slugify tests ────────────────────────────────────────────────────────────
from pipeline.stages import _slugify_heading
class TestSlugifyHeading:
"""Verify _slugify_heading matches frontend TableOfContents.tsx slugify."""
def test_simple_heading(self):
assert _slugify_heading("Grain Position Control") == "grain-position-control"
def test_ampersand_and_special_chars(self):
# Consecutive non-alphanumeric chars collapse to a single hyphen
assert _slugify_heading("LFO Routing & Modulation") == "lfo-routing-modulation"
def test_leading_trailing_special(self):
assert _slugify_heading(" —Hello World! ") == "hello-world"
def test_numbers_preserved(self):
assert _slugify_heading("Step 1: Setup") == "step-1-setup"
def test_empty_string(self):
assert _slugify_heading("") == ""
def test_only_special_chars(self):
assert _slugify_heading("!@#$%") == ""
def test_unicode_stripped(self):
assert _slugify_heading("Café Sounds") == "caf-sounds"
def test_multiple_hyphens_collapse(self):
assert _slugify_heading("A -- B --- C") == "a-b-c"
# ── Deterministic UUID tests ─────────────────────────────────────────────────
_QDRANT_NAMESPACE = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
class TestDeterministicUUIDs:
"""Verify same page+section always produces the same point ID."""
def test_same_input_same_uuid(self):
id1 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:grain-position-control"))
id2 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:grain-position-control"))
assert id1 == id2
def test_different_section_different_uuid(self):
id1 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:section-a"))
id2 = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:page-abc:section-b"))
assert id1 != id2
# ── QdrantManager section methods ────────────────────────────────────────────
class TestQdrantManagerSections:
"""Test upsert_technique_sections and delete_sections_by_page_id."""
def _make_manager(self):
"""Create a QdrantManager with a mocked client."""
with patch("pipeline.qdrant_client.QdrantClient") as MockClient:
mock_client = MockClient.return_value
from pipeline.qdrant_client import QdrantManager
settings = MagicMock()
settings.qdrant_url = "http://localhost:6333"
settings.qdrant_collection = "test_collection"
settings.embedding_dimensions = 768
mgr = QdrantManager(settings)
mgr._client = mock_client
return mgr, mock_client
def test_upsert_builds_correct_payloads(self):
mgr, mock_client = self._make_manager()
sections = [
{
"page_id": "p1",
"creator_id": "c1",
"creator_name": "Keota",
"title": "Granular Synthesis",
"slug": "granular-synthesis",
"section_heading": "Grain Position Control",
"section_anchor": "grain-position-control",
"topic_category": "Sound Design",
"topic_tags": ["granular", "synthesis"],
"summary": "Control the grain position parameter.",
},
]
vectors = [[0.1] * 768]
mgr.upsert_technique_sections(sections, vectors)
# Verify upsert was called
assert mock_client.upsert.called
points = mock_client.upsert.call_args[1]["points"]
assert len(points) == 1
payload = points[0].payload
assert payload["type"] == "technique_section"
assert payload["page_id"] == "p1"
assert payload["section_heading"] == "Grain Position Control"
assert payload["section_anchor"] == "grain-position-control"
assert payload["slug"] == "granular-synthesis"
# Verify deterministic UUID
expected_id = str(uuid.uuid5(_QDRANT_NAMESPACE, "ts:p1:grain-position-control"))
assert points[0].id == expected_id
def test_upsert_count_mismatch_skips(self):
mgr, mock_client = self._make_manager()
mgr.upsert_technique_sections([{"page_id": "p1"}], [[0.1], [0.2]])
assert not mock_client.upsert.called
def test_upsert_empty_list_skips(self):
mgr, mock_client = self._make_manager()
mgr.upsert_technique_sections([], [])
assert not mock_client.upsert.called
def test_summary_truncated_to_200_chars(self):
mgr, mock_client = self._make_manager()
long_summary = "x" * 500
sections = [{
"page_id": "p1", "section_heading": "H", "section_anchor": "h",
"summary": long_summary,
}]
vectors = [[0.1] * 768]
mgr.upsert_technique_sections(sections, vectors)
payload = mock_client.upsert.call_args[1]["points"][0].payload
assert len(payload["summary"]) == 200
def test_delete_sections_by_page_id(self):
mgr, mock_client = self._make_manager()
mgr.delete_sections_by_page_id("p1")
assert mock_client.delete.called
filter_arg = mock_client.delete.call_args[1]["points_selector"]
# Verify filter has both page_id and type conditions
must_conditions = filter_arg.must
assert len(must_conditions) == 2
keys = {c.key for c in must_conditions}
assert keys == {"page_id", "type"}
def test_delete_sections_logs_on_failure(self):
mgr, mock_client = self._make_manager()
mock_client.delete.side_effect = Exception("connection refused")
# Should not raise
mgr.delete_sections_by_page_id("p1")
# ── Stage 6 section embedding logic ─────────────────────────────────────────
class TestStage6SectionEmbedding:
"""Test the section embedding block within stage6_embed_and_index.
Uses mocked DB, embedding client, and QdrantManager to verify:
- v2 pages produce section points
- v1 pages are skipped
- delete is called before upsert
- embed text includes creator/page/section context
- sections with empty headings are skipped
- subsection content is included in embed text
"""
def _make_page(self, page_id="p1", creator_id="c1", format_="v2",
body_sections=None, title="Granular Synthesis",
slug="granular-synthesis"):
"""Create a mock TechniquePage-like object."""
page = MagicMock()
page.id = page_id
page.creator_id = creator_id
page.body_sections_format = format_
page.body_sections = body_sections
page.title = title
page.slug = slug
page.topic_category = "Sound Design"
page.topic_tags = ["granular"]
page.summary = "Page summary"
return page
def test_v1_page_produces_zero_sections(self):
"""Pages with body_sections_format != 'v2' should be skipped."""
page = self._make_page(format_="v1", body_sections=[
{"heading": "Section A", "content": "Content A"},
])
v2_pages = [p for p in [page] if getattr(p, "body_sections_format", "v1") == "v2"]
assert len(v2_pages) == 0
def test_v2_page_none_body_sections(self):
"""Page with body_sections=None → skipped (not a list)."""
page = self._make_page(format_="v2", body_sections=None)
v2_pages = [p for p in [page] if getattr(p, "body_sections_format", "v1") == "v2"]
assert len(v2_pages) == 1
# body_sections is None → not a list → skipped in the loop
assert not isinstance(page.body_sections, list)
def test_section_empty_heading_skipped(self):
"""Sections with empty heading should be skipped."""
page = self._make_page(body_sections=[
{"heading": "", "content": "Orphan content"},
{"heading": "Valid", "content": "Real content"},
])
sections_with_heading = [
s for s in page.body_sections
if isinstance(s, dict) and s.get("heading", "").strip()
]
assert len(sections_with_heading) == 1
assert sections_with_heading[0]["heading"] == "Valid"
def test_subsection_content_included_in_embed_text(self):
"""Section with subsections should include subsection content."""
section = {
"heading": "Grain Position Control",
"content": "Main content",
"subsections": [
{"heading": "Fine Tuning", "content": "Fine tune the position."},
{"heading": "Automation", "content": "Automate grain pos."},
],
}
# Reproduce the embed text construction from stage 6
creator_name = "Keota"
page_title = "Granular Synthesis"
heading = section["heading"]
section_content = section.get("content", "")
subsection_parts = []
for sub in section.get("subsections", []):
if isinstance(sub, dict):
sub_heading = sub.get("heading", "")
sub_content = sub.get("content", "")
if sub_heading:
subsection_parts.append(f"{sub_heading}: {sub_content}")
elif sub_content:
subsection_parts.append(sub_content)
embed_text = (
f"{creator_name} {page_title}{heading}: "
f"{section_content} {' '.join(subsection_parts)}"
).strip()
assert "Fine Tuning: Fine tune the position." in embed_text
assert "Automation: Automate grain pos." in embed_text
assert "Keota Granular Synthesis" in embed_text
def test_subsection_no_direct_content(self):
"""Section with subsections but no direct content still embeds subsection text."""
section = {
"heading": "Advanced Techniques",
"content": "",
"subsections": [
{"heading": "Sub A", "content": "Content A"},
],
}
heading = section["heading"]
section_content = section.get("content", "")
subsection_parts = []
for sub in section.get("subsections", []):
if isinstance(sub, dict):
sub_heading = sub.get("heading", "")
sub_content = sub.get("content", "")
if sub_heading:
subsection_parts.append(f"{sub_heading}: {sub_content}")
elif sub_content:
subsection_parts.append(sub_content)
embed_text = (
f"Creator Page — {heading}: "
f"{section_content} {' '.join(subsection_parts)}"
).strip()
assert "Sub A: Content A" in embed_text
def test_delete_called_before_upsert_ordering(self):
"""Verify delete_sections_by_page_id is called before upsert_technique_sections."""
call_order = []
mock_qdrant = MagicMock()
mock_qdrant.delete_sections_by_page_id.side_effect = lambda pid: call_order.append(("delete", pid))
mock_qdrant.upsert_technique_sections.side_effect = lambda s, v: call_order.append(("upsert", len(s)))
mock_embed = MagicMock()
mock_embed.embed.return_value = [[0.1] * 768] # One vector
page = self._make_page(body_sections=[
{"heading": "Section A", "content": "Content A"},
])
creator_map = {str(page.creator_id): "TestCreator"}
v2_pages = [page]
page_id_str = str(page.id)
# Simulate the section embedding block
for p in v2_pages:
body_sections = p.body_sections
if not isinstance(body_sections, list):
continue
creator_name = creator_map.get(str(p.creator_id), "")
mock_qdrant.delete_sections_by_page_id(str(p.id))
section_texts = []
section_dicts = []
for section in body_sections:
if not isinstance(section, dict):
continue
heading = section.get("heading", "")
if not heading or not heading.strip():
continue
section_anchor = _slugify_heading(heading)
section_texts.append(f"{creator_name} {p.title}{heading}")
section_dicts.append({"page_id": str(p.id), "section_anchor": section_anchor})
if section_texts:
vectors = mock_embed.embed(section_texts)
if vectors:
mock_qdrant.upsert_technique_sections(section_dicts, vectors)
assert call_order[0][0] == "delete"
assert call_order[1][0] == "upsert"

View file

@ -216,6 +216,8 @@ class SearchResultItem(BaseModel):
topic_category: str = ""
topic_tags: list[str] = Field(default_factory=list)
match_context: str = ""
section_anchor: str = ""
section_heading: str = ""
class SearchResponse(BaseModel):

View file

@ -419,9 +419,11 @@ class SearchService:
scope = "all"
# Map scope to Qdrant type filter
# topics scope: no filter — both technique_page and technique_section
# should appear in semantic results
type_filter_map = {
"all": None,
"topics": "technique_page",
"topics": None,
"creators": None,
}
qdrant_type_filter = type_filter_map.get(scope)
@ -581,6 +583,8 @@ class SearchService:
# Determine technique_page_slug based on result type
if result_type == "technique_page":
tp_slug = payload.get("slug", payload.get("title", "").lower().replace(" ", "-"))
elif result_type == "technique_section":
tp_slug = payload.get("slug", "")
else:
tp_slug = payload.get("technique_page_slug", "")
@ -598,6 +602,8 @@ class SearchService:
"created_at": payload.get("created_at", ""),
"score": r.get("score", 0.0),
"match_context": "",
"section_anchor": payload.get("section_anchor", "") if result_type == "technique_section" else "",
"section_heading": payload.get("section_heading", "") if result_type == "technique_section" else "",
})
return enriched