feat: Enriched Qdrant embedding text with creator_name/tags and added r…

- "backend/pipeline/stages.py"
- "backend/pipeline/qdrant_client.py"
- "backend/routers/pipeline.py"

GSD-Task: S01/T02
This commit is contained in:
jlightner 2026-04-01 06:17:55 +00:00
parent 9c0247c830
commit 5a484fb27a
3 changed files with 82 additions and 2 deletions

View file

@ -168,6 +168,7 @@ class QdrantManager:
"type": "technique_page",
"page_id": page["page_id"],
"creator_id": page["creator_id"],
"creator_name": page.get("creator_name", ""),
"title": page["title"],
"slug": page.get("slug", ""),
"topic_category": page["topic_category"],
@ -216,6 +217,7 @@ class QdrantManager:
"technique_page_id": moment.get("technique_page_id", ""),
"technique_page_slug": moment.get("technique_page_slug", ""),
"title": moment["title"],
"creator_name": moment.get("creator_name", ""),
"start_time": moment["start_time"],
"end_time": moment["end_time"],
"content_type": moment["content_type"],

View file

@ -1291,6 +1291,30 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
_finish_run(run_id, "complete")
return video_id
# Resolve creator names for enriched embedding text
creator_ids = {p.creator_id for p in pages}
creator_map: dict[str, str] = {}
if creator_ids:
creators = (
session.execute(
select(Creator).where(Creator.id.in_(creator_ids))
)
.scalars()
.all()
)
creator_map = {str(c.id): c.name for c in creators}
# Resolve creator name for key moments via source_video → creator
video_ids = {m.source_video_id for m in moments}
video_creator_map: dict[str, str] = {}
if video_ids:
rows = session.execute(
select(SourceVideo.id, Creator.name)
.join(Creator, SourceVideo.creator_id == Creator.id)
.where(SourceVideo.id.in_(video_ids))
).all()
video_creator_map = {str(r[0]): r[1] for r in rows}
embed_client = EmbeddingClient(settings)
qdrant = QdrantManager(settings)
@ -1302,11 +1326,14 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
page_texts = []
page_dicts = []
for p in pages:
text = f"{p.title} {p.summary or ''} {p.topic_category or ''}"
creator_name = creator_map.get(str(p.creator_id), "")
tags_joined = " ".join(p.topic_tags) if p.topic_tags else ""
text = f"{creator_name} {p.title} {p.topic_category or ''} {tags_joined} {p.summary or ''}"
page_texts.append(text.strip())
page_dicts.append({
"page_id": str(p.id),
"creator_id": str(p.creator_id),
"creator_name": creator_name,
"title": p.title,
"slug": p.slug,
"topic_category": p.topic_category or "",
@ -1339,7 +1366,8 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
moment_texts = []
moment_dicts = []
for m in moments:
text = f"{m.title} {m.summary or ''}"
creator_name = video_creator_map.get(str(m.source_video_id), "")
text = f"{creator_name} {m.title} {m.summary or ''}"
moment_texts.append(text.strip())
tp_id = str(m.technique_page_id) if m.technique_page_id else ""
moment_dicts.append({
@ -1348,6 +1376,7 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
"technique_page_id": tp_id,
"technique_page_slug": page_id_to_slug.get(tp_id, ""),
"title": m.title,
"creator_name": creator_name,
"start_time": m.start_time,
"end_time": m.end_time,
"content_type": m.content_type.value,

View file

@ -578,6 +578,55 @@ async def get_token_summary(
)
# ── Admin: Reindex all ───────────────────────────────────────────────────────
@router.post("/admin/pipeline/reindex-all")
async def reindex_all(
    db: AsyncSession = Depends(get_session),
):
    """Re-run stage 6 (embed & index) for all videos with processing_status='complete'.

    Use after changing embedding text composition or Qdrant payload fields
    to regenerate all vectors and payloads without re-running the full pipeline.

    Returns a summary dict. When no completed videos exist the status is
    ``"no_videos"``; otherwise it is ``"dispatched"`` together with the number
    of tasks queued, the number of completed videos found, and a list of
    per-video dispatch errors (``None`` when every dispatch succeeded).
    """
    completed_query = select(SourceVideo.id).where(
        SourceVideo.processing_status == ProcessingStatus.complete
    )
    rows = (await db.execute(completed_query)).all()
    video_ids = [str(row[0]) for row in rows]

    # Nothing to do: report it explicitly instead of an empty dispatch summary.
    if not video_ids:
        return {
            "status": "no_videos",
            "message": "No videos with processing_status='complete' found.",
            "dispatched": 0,
        }

    # Imported only once there is work to dispatch (after the early return).
    from pipeline.stages import stage6_embed_and_index

    total = len(video_ids)
    failures = []
    for video_id in video_ids:
        # A failed dispatch for one video is recorded and must not stop the rest.
        try:
            stage6_embed_and_index.delay(video_id)
        except Exception as exc:
            logger.warning("Failed to dispatch reindex for video_id=%s: %s", video_id, exc)
            failures.append({"video_id": video_id, "error": str(exc)})

    # Every video either dispatched or produced exactly one failure entry.
    dispatched = total - len(failures)

    logger.info(
        "Reindex-all dispatched %d/%d stage6 tasks.",
        dispatched, total,
    )

    return {
        "status": "dispatched",
        "dispatched": dispatched,
        "total_complete_videos": total,
        "errors": failures or None,
    }
# ── Admin: Worker status ─────────────────────────────────────────────────────
@router.get("/admin/pipeline/worker-status")