feat: Enriched Qdrant embedding text with creator_name/tags and added r…

- "backend/pipeline/stages.py"
- "backend/pipeline/qdrant_client.py"
- "backend/routers/pipeline.py"

GSD-Task: S01/T02
This commit is contained in:
jlightner 2026-04-01 06:17:55 +00:00
parent 84e7a9906c
commit fa82f1079a
6 changed files with 173 additions and 3 deletions

View file

@ -15,7 +15,7 @@
- Estimate: 45min
- Files: backend/search_service.py, backend/schemas.py, backend/routers/search.py
- Verify: Run targeted curl: `curl 'http://localhost:8001/api/v1/search?q=keota+snare'` returns results matching both tokens. `curl 'http://localhost:8001/api/v1/search?q=xyznonexistent+snare'` returns empty items but partial_matches with snare-only results.
- [ ] **T02: Enrich Qdrant embedding text and payloads** — In `backend/pipeline/stages.py` stage6_embed_and_index:
- [x] **T02: Enriched Qdrant embedding text with creator_name/tags and added reindex-all endpoint** — In `backend/pipeline/stages.py` stage6_embed_and_index:
1. For technique pages, change embedding text from `'{title} {summary} {topic_category}'` to `'{creator_name} {title} {topic_category} {tags_joined} {summary}'` where creator_name comes from the joined Creator row and tags_joined is `' '.join(topic_tags)`.
2. For key moments, change from `'{title} {summary}'` to `'{creator_name} {title} {summary}'`.
3. In `qdrant_client.py`, add `creator_name` to key_moment payload dict.

View file

@ -0,0 +1,9 @@
{
"schemaVersion": 1,
"taskId": "T01",
"unitId": "M012/S01/T01",
"timestamp": 1775024120919,
"passed": true,
"discoverySource": "none",
"checks": []
}

View file

@ -0,0 +1,81 @@
---
id: T02
parent: S01
milestone: M012
provides: []
requires: []
affects: []
key_files: ["backend/pipeline/stages.py", "backend/pipeline/qdrant_client.py", "backend/routers/pipeline.py"]
key_decisions: ["Resolve creator names via batch query at start of stage 6 rather than per-item lookups", "Added creator_name to technique_page Qdrant payload for consistency"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All 3 modified files pass AST syntax validation. Embedding text composition confirmed via grep. Qdrant payloads include creator_name for both point types. Reindex-all endpoint registered."
completed_at: 2026-04-01T06:17:45.233Z
blocker_discovered: false
---
# T02: Enriched Qdrant embedding text with creator_name/tags and added reindex-all endpoint
> Enriched Qdrant embedding text with creator_name/tags and added reindex-all endpoint
## What Happened
---
id: T02
parent: S01
milestone: M012
key_files:
- backend/pipeline/stages.py
- backend/pipeline/qdrant_client.py
- backend/routers/pipeline.py
key_decisions:
- Resolve creator names via batch query at start of stage 6 rather than per-item lookups
- Added creator_name to technique_page Qdrant payload for consistency
duration: ""
verification_result: passed
completed_at: 2026-04-01T06:17:45.233Z
blocker_discovered: false
---
# T02: Enriched Qdrant embedding text with creator_name/tags and added reindex-all endpoint
**Enriched Qdrant embedding text with creator_name/tags and added reindex-all endpoint**
## What Happened
Modified stage 6 to produce richer embedding text: technique pages now embed as '{creator_name} {title} {topic_category} {tags_joined} {summary}', key moments as '{creator_name} {title} {summary}'. Creator names resolved via batch queries. Added creator_name to both Qdrant payload types. Added POST /admin/pipeline/reindex-all endpoint to dispatch stage 6 for all complete videos.
## Verification
All 3 modified files pass AST syntax validation. Embedding text composition confirmed via grep. Qdrant payloads include creator_name for both point types. Reindex-all endpoint registered.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `python -c "import ast; ast.parse(open('pipeline/stages.py').read())"` | 0 | ✅ pass | 200ms |
| 2 | `python -c "import ast; ast.parse(open('pipeline/qdrant_client.py').read())"` | 0 | ✅ pass | 200ms |
| 3 | `python -c "import ast; ast.parse(open('routers/pipeline.py').read())"` | 0 | ✅ pass | 200ms |
## Deviations
Added creator_name to technique_page Qdrant payload too (plan only mentioned key_moment). Strictly additive.
## Known Issues
None.
## Files Created/Modified
- `backend/pipeline/stages.py`
- `backend/pipeline/qdrant_client.py`
- `backend/routers/pipeline.py`
## Deviations
Added creator_name to technique_page Qdrant payload too (plan only mentioned key_moment). Strictly additive.
## Known Issues
None.

View file

@ -168,6 +168,7 @@ class QdrantManager:
"type": "technique_page",
"page_id": page["page_id"],
"creator_id": page["creator_id"],
"creator_name": page.get("creator_name", ""),
"title": page["title"],
"slug": page.get("slug", ""),
"topic_category": page["topic_category"],
@ -216,6 +217,7 @@ class QdrantManager:
"technique_page_id": moment.get("technique_page_id", ""),
"technique_page_slug": moment.get("technique_page_slug", ""),
"title": moment["title"],
"creator_name": moment.get("creator_name", ""),
"start_time": moment["start_time"],
"end_time": moment["end_time"],
"content_type": moment["content_type"],

View file

@ -1291,6 +1291,30 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
_finish_run(run_id, "complete")
return video_id
# Resolve creator names for enriched embedding text
creator_ids = {p.creator_id for p in pages}
creator_map: dict[str, str] = {}
if creator_ids:
creators = (
session.execute(
select(Creator).where(Creator.id.in_(creator_ids))
)
.scalars()
.all()
)
creator_map = {str(c.id): c.name for c in creators}
# Resolve creator name for key moments via source_video → creator
video_ids = {m.source_video_id for m in moments}
video_creator_map: dict[str, str] = {}
if video_ids:
rows = session.execute(
select(SourceVideo.id, Creator.name)
.join(Creator, SourceVideo.creator_id == Creator.id)
.where(SourceVideo.id.in_(video_ids))
).all()
video_creator_map = {str(r[0]): r[1] for r in rows}
embed_client = EmbeddingClient(settings)
qdrant = QdrantManager(settings)
@ -1302,11 +1326,14 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
page_texts = []
page_dicts = []
for p in pages:
text = f"{p.title} {p.summary or ''} {p.topic_category or ''}"
creator_name = creator_map.get(str(p.creator_id), "")
tags_joined = " ".join(p.topic_tags) if p.topic_tags else ""
text = f"{creator_name} {p.title} {p.topic_category or ''} {tags_joined} {p.summary or ''}"
page_texts.append(text.strip())
page_dicts.append({
"page_id": str(p.id),
"creator_id": str(p.creator_id),
"creator_name": creator_name,
"title": p.title,
"slug": p.slug,
"topic_category": p.topic_category or "",
@ -1339,7 +1366,8 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
moment_texts = []
moment_dicts = []
for m in moments:
text = f"{m.title} {m.summary or ''}"
creator_name = video_creator_map.get(str(m.source_video_id), "")
text = f"{creator_name} {m.title} {m.summary or ''}"
moment_texts.append(text.strip())
tp_id = str(m.technique_page_id) if m.technique_page_id else ""
moment_dicts.append({
@ -1348,6 +1376,7 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
"technique_page_id": tp_id,
"technique_page_slug": page_id_to_slug.get(tp_id, ""),
"title": m.title,
"creator_name": creator_name,
"start_time": m.start_time,
"end_time": m.end_time,
"content_type": m.content_type.value,

View file

@ -578,6 +578,55 @@ async def get_token_summary(
)
# ── Admin: Worker status ─────────────────────────────────────────────────────
@router.post("/admin/pipeline/reindex-all")
async def reindex_all(
db: AsyncSession = Depends(get_session),
):
"""Re-run stage 6 (embed & index) for all videos with processing_status='complete'.
Use after changing embedding text composition or Qdrant payload fields
to regenerate all vectors and payloads without re-running the full pipeline.
"""
stmt = select(SourceVideo.id).where(
SourceVideo.processing_status == ProcessingStatus.complete
)
result = await db.execute(stmt)
video_ids = [str(row[0]) for row in result.all()]
if not video_ids:
return {
"status": "no_videos",
"message": "No videos with processing_status='complete' found.",
"dispatched": 0,
}
from pipeline.stages import stage6_embed_and_index
dispatched = 0
errors = []
for vid in video_ids:
try:
stage6_embed_and_index.delay(vid)
dispatched += 1
except Exception as exc:
logger.warning("Failed to dispatch reindex for video_id=%s: %s", vid, exc)
errors.append({"video_id": vid, "error": str(exc)})
logger.info(
"Reindex-all dispatched %d/%d stage6 tasks.",
dispatched, len(video_ids),
)
return {
"status": "dispatched",
"dispatched": dispatched,
"total_complete_videos": len(video_ids),
"errors": errors if errors else None,
}
# ── Admin: Worker status ─────────────────────────────────────────────────────
@router.get("/admin/pipeline/worker-status")