From 5a484fb27a8a7dcb8802e3adfcd510fff80dbbf2 Mon Sep 17 00:00:00 2001 From: jlightner Date: Wed, 1 Apr 2026 06:17:55 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Enriched=20Qdrant=20embedding=20text=20?= =?UTF-8?q?with=20creator=5Fname/tags=20and=20added=20r=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/pipeline/stages.py" - "backend/pipeline/qdrant_client.py" - "backend/routers/pipeline.py" GSD-Task: S01/T02 --- backend/pipeline/qdrant_client.py | 2 ++ backend/pipeline/stages.py | 33 +++++++++++++++++++-- backend/routers/pipeline.py | 49 +++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/backend/pipeline/qdrant_client.py b/backend/pipeline/qdrant_client.py index 85cc4ee..d2f4b3c 100644 --- a/backend/pipeline/qdrant_client.py +++ b/backend/pipeline/qdrant_client.py @@ -168,6 +168,7 @@ class QdrantManager: "type": "technique_page", "page_id": page["page_id"], "creator_id": page["creator_id"], + "creator_name": page.get("creator_name", ""), "title": page["title"], "slug": page.get("slug", ""), "topic_category": page["topic_category"], @@ -216,6 +217,7 @@ class QdrantManager: "technique_page_id": moment.get("technique_page_id", ""), "technique_page_slug": moment.get("technique_page_slug", ""), "title": moment["title"], + "creator_name": moment.get("creator_name", ""), "start_time": moment["start_time"], "end_time": moment["end_time"], "content_type": moment["content_type"], diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index e8085cd..1532c9f 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -1291,6 +1291,30 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st _finish_run(run_id, "complete") return video_id + # Resolve creator names for enriched embedding text + creator_ids = {p.creator_id for p in pages} + creator_map: dict[str, str] = {} + if creator_ids: + creators = ( + 
session.execute( + select(Creator).where(Creator.id.in_(creator_ids)) + ) + .scalars() + .all() + ) + creator_map = {str(c.id): c.name for c in creators} + + # Resolve creator name for key moments via source_video → creator + video_ids = {m.source_video_id for m in moments} + video_creator_map: dict[str, str] = {} + if video_ids: + rows = session.execute( + select(SourceVideo.id, Creator.name) + .join(Creator, SourceVideo.creator_id == Creator.id) + .where(SourceVideo.id.in_(video_ids)) + ).all() + video_creator_map = {str(r[0]): r[1] for r in rows} + embed_client = EmbeddingClient(settings) qdrant = QdrantManager(settings) @@ -1302,11 +1326,14 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st page_texts = [] page_dicts = [] for p in pages: - text = f"{p.title} {p.summary or ''} {p.topic_category or ''}" + creator_name = creator_map.get(str(p.creator_id), "") + tags_joined = " ".join(p.topic_tags) if p.topic_tags else "" + text = f"{creator_name} {p.title} {p.topic_category or ''} {tags_joined} {p.summary or ''}" page_texts.append(text.strip()) page_dicts.append({ "page_id": str(p.id), "creator_id": str(p.creator_id), + "creator_name": creator_name, "title": p.title, "slug": p.slug, "topic_category": p.topic_category or "", @@ -1339,7 +1366,8 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st moment_texts = [] moment_dicts = [] for m in moments: - text = f"{m.title} {m.summary or ''}" + creator_name = video_creator_map.get(str(m.source_video_id), "") + text = f"{creator_name} {m.title} {m.summary or ''}" moment_texts.append(text.strip()) tp_id = str(m.technique_page_id) if m.technique_page_id else "" moment_dicts.append({ @@ -1348,6 +1376,7 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st "technique_page_id": tp_id, "technique_page_slug": page_id_to_slug.get(tp_id, ""), "title": m.title, + "creator_name": creator_name, "start_time": m.start_time, "end_time": 
m.end_time, "content_type": m.content_type.value, diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index 513cc8c..7f45a90 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -578,6 +578,55 @@ async def get_token_summary( ) +# ── Admin: Reindex all ─────────────────────────────────────────────────────── + +@router.post("/admin/pipeline/reindex-all") +async def reindex_all( + db: AsyncSession = Depends(get_session), +): + """Re-run stage 6 (embed & index) for all videos with processing_status='complete'. + + Use after changing embedding text composition or Qdrant payload fields + to regenerate all vectors and payloads without re-running the full pipeline. + """ + stmt = select(SourceVideo.id).where( + SourceVideo.processing_status == ProcessingStatus.complete + ) + result = await db.execute(stmt) + video_ids = [str(row[0]) for row in result.all()] + + if not video_ids: + return { + "status": "no_videos", + "message": "No videos with processing_status='complete' found.", + "dispatched": 0, + } + + from pipeline.stages import stage6_embed_and_index + + dispatched = 0 + errors = [] + for vid in video_ids: + try: + stage6_embed_and_index.delay(vid) + dispatched += 1 + except Exception as exc: + logger.warning("Failed to dispatch reindex for video_id=%s: %s", vid, exc) + errors.append({"video_id": vid, "error": str(exc)}) + + logger.info( + "Reindex-all dispatched %d/%d stage6 tasks.", + dispatched, len(video_ids), + ) + + return { + "status": "dispatched", + "dispatched": dispatched, + "total_complete_videos": len(video_ids), + "errors": errors if errors else None, + } + + # ── Admin: Worker status ───────────────────────────────────────────────────── @router.get("/admin/pipeline/worker-status")