diff --git a/.gsd/milestones/M001/slices/S03/S03-PLAN.md b/.gsd/milestones/M001/slices/S03/S03-PLAN.md index 8cf6f2e..4907c4c 100644 --- a/.gsd/milestones/M001/slices/S03/S03-PLAN.md +++ b/.gsd/milestones/M001/slices/S03/S03-PLAN.md @@ -133,7 +133,7 @@ - Estimate: 2.5h - Files: prompts/stage2_segmentation.txt, prompts/stage3_extraction.txt, prompts/stage4_classification.txt, prompts/stage5_synthesis.txt, backend/pipeline/stages.py, backend/worker.py - Verify: test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt && cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')" -- [ ] **T03: Qdrant integration and embedding client** — Create the embedding client (sync OpenAI-compatible /v1/embeddings) and Qdrant client (collection management + upsert). Wire embedding generation into the pipeline after stage 5 synthesis, so technique page summaries and key moment summaries are embedded and upserted to Qdrant with metadata payloads. +- [x] **T03: Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage** — Create the embedding client (sync OpenAI-compatible /v1/embeddings) and Qdrant client (collection management + upsert). Wire embedding generation into the pipeline after stage 5 synthesis, so technique page summaries and key moment summaries are embedded and upserted to Qdrant with metadata payloads. 
## Failure Modes diff --git a/.gsd/milestones/M001/slices/S03/tasks/T02-VERIFY.json b/.gsd/milestones/M001/slices/S03/tasks/T02-VERIFY.json new file mode 100644 index 0000000..8a93a38 --- /dev/null +++ b/.gsd/milestones/M001/slices/S03/tasks/T02-VERIFY.json @@ -0,0 +1,40 @@ +{ + "schemaVersion": 1, + "taskId": "T02", + "unitId": "M001/S03/T02", + "timestamp": 1774823766552, + "passed": true, + "discoverySource": "task-plan", + "checks": [ + { + "command": "test -f prompts/stage2_segmentation.txt", + "exitCode": 0, + "durationMs": 4, + "verdict": "pass" + }, + { + "command": "test -f prompts/stage3_extraction.txt", + "exitCode": 0, + "durationMs": 3, + "verdict": "pass" + }, + { + "command": "test -f prompts/stage4_classification.txt", + "exitCode": 0, + "durationMs": 4, + "verdict": "pass" + }, + { + "command": "test -f prompts/stage5_synthesis.txt", + "exitCode": 0, + "durationMs": 3, + "verdict": "pass" + }, + { + "command": "cd backend && python -c \"from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')\"", + "exitCode": 0, + "durationMs": 3, + "verdict": "pass" + } + ] +} diff --git a/.gsd/milestones/M001/slices/S03/tasks/T03-SUMMARY.md b/.gsd/milestones/M001/slices/S03/tasks/T03-SUMMARY.md new file mode 100644 index 0000000..54adb20 --- /dev/null +++ b/.gsd/milestones/M001/slices/S03/tasks/T03-SUMMARY.md @@ -0,0 +1,88 @@ +--- +id: T03 +parent: S03 +milestone: M001 +provides: [] +requires: [] +affects: [] +key_files: ["backend/pipeline/embedding_client.py", "backend/pipeline/qdrant_client.py", "backend/pipeline/stages.py"] +key_decisions: ["stage6 uses max_retries=0 and catches all exceptions — embedding is a non-blocking side-effect that can be regenerated later", "EmbeddingClient returns empty list on any API error so callers don't need try/except", "QdrantManager generates random UUIDs for point IDs to avoid conflicts across re-indexing"] +patterns_established: [] +drill_down_paths: [] +observability_surfaces: [] +duration: "" +verification_result: "All 9 verification checks passed. 
Three task-level import checks (EmbeddingClient, QdrantManager, stage6_embed_and_index) exit 0. Full slice-level verification (Settings defaults, schemas, LLMClient, celery_app, grep for openai/qdrant-client) all pass. Celery task registry shows all 6 tasks registered. Structural validation confirmed sync openai.OpenAI usage, config-driven dimensions, idempotent collection creation, non-blocking error handling, and metadata payloads." +completed_at: 2026-03-29T22:39:01.970Z +blocker_discovered: false +--- + +# T03: Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage + +> Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage + +## What Happened +--- +id: T03 +parent: S03 +milestone: M001 +key_files: + - backend/pipeline/embedding_client.py + - backend/pipeline/qdrant_client.py + - backend/pipeline/stages.py +key_decisions: + - stage6 uses max_retries=0 and catches all exceptions — embedding is a non-blocking side-effect that can be regenerated later + - EmbeddingClient returns empty list on any API error so callers don't need try/except + - QdrantManager generates random UUIDs for point IDs to avoid conflicts across re-indexing +duration: "" +verification_result: passed +completed_at: 2026-03-29T22:39:01.970Z +blocker_discovered: false +--- + +# T03: Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage + +**Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect 
stage** + +## What Happened + +Created backend/pipeline/embedding_client.py with an EmbeddingClient class using sync openai.OpenAI for /v1/embeddings calls. The embed() method gracefully handles connection, timeout, and API errors by returning an empty list, and validates vector dimensions against settings.embedding_dimensions. Created backend/pipeline/qdrant_client.py with QdrantManager wrapping sync QdrantClient. ensure_collection() is idempotent (checks collection_exists before creating with cosine distance). upsert_technique_pages() and upsert_key_moments() build PointStruct objects with full metadata payloads and upsert them, catching all Qdrant errors without re-raising. Added stage6_embed_and_index Celery task to stages.py that loads KeyMoments and TechniquePages for a video, embeds their text, and upserts to Qdrant. Uses max_retries=0 and catches all exceptions — embedding failures never fail the pipeline. Updated run_pipeline to include stage6 in both chain paths. + +## Verification + +All 9 verification checks passed. Three task-level import checks (EmbeddingClient, QdrantManager, stage6_embed_and_index) exit 0. Full slice-level verification (Settings defaults, schemas, LLMClient, celery_app, grep for openai/qdrant-client) all pass. Celery task registry shows all 6 tasks registered. Structural validation confirmed sync openai.OpenAI usage, config-driven dimensions, idempotent collection creation, non-blocking error handling, and metadata payloads. 
+ +## Verification Evidence + +| # | Command | Exit Code | Verdict | Duration | +|---|---------|-----------|---------|----------| +| 1 | `cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')"` | 0 | ✅ pass | 400ms | +| 2 | `cd backend && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')"` | 0 | ✅ pass | 400ms | +| 3 | `cd backend && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"` | 0 | ✅ pass | 500ms | +| 4 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 300ms | +| 5 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 300ms | +| 6 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 300ms | +| 7 | `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` | 0 | ✅ pass | 400ms | +| 8 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms | +| 9 | `cd backend && python -c "from worker import celery_app; tasks = [t for t in celery_app.tasks if 'stage' in t or 'pipeline' in t]; print(tasks)"` | 0 | ✅ pass | 400ms | + + +## Deviations + +None. + +## Known Issues + +None. + +## Files Created/Modified + +- `backend/pipeline/embedding_client.py` +- `backend/pipeline/qdrant_client.py` +- `backend/pipeline/stages.py` + + +## Deviations +None. + +## Known Issues +None. diff --git a/backend/pipeline/embedding_client.py b/backend/pipeline/embedding_client.py new file mode 100644 index 0000000..16c0921 --- /dev/null +++ b/backend/pipeline/embedding_client.py @@ -0,0 +1,88 @@ +"""Synchronous embedding client using the OpenAI-compatible /v1/embeddings API. + +Uses ``openai.OpenAI`` (sync) since Celery tasks run synchronously. 
+Handles connection failures gracefully — embedding is non-blocking for the pipeline. +""" + +from __future__ import annotations + +import logging + +import openai + +from config import Settings + +logger = logging.getLogger(__name__) + + +class EmbeddingClient: + """Sync embedding client backed by an OpenAI-compatible /v1/embeddings endpoint.""" + + def __init__(self, settings: Settings) -> None: + self.settings = settings + self._client = openai.OpenAI( + base_url=settings.embedding_api_url, + api_key=settings.llm_api_key, + ) + + def embed(self, texts: list[str]) -> list[list[float]]: + """Generate embedding vectors for a batch of texts. + + Parameters + ---------- + texts: + List of strings to embed. + + Returns + ------- + list[list[float]] + Embedding vectors. Returns empty list on connection/timeout errors + so the pipeline can continue without embeddings. + """ + if not texts: + return [] + + try: + response = self._client.embeddings.create( + model=self.settings.embedding_model, + input=texts, + ) + except (openai.APIConnectionError, openai.APITimeoutError) as exc: + logger.warning( + "Embedding API unavailable (%s: %s). Skipping %d texts.", + type(exc).__name__, + exc, + len(texts), + ) + return [] + except openai.APIError as exc: + logger.warning( + "Embedding API error (%s: %s). Skipping %d texts.", + type(exc).__name__, + exc, + len(texts), + ) + return [] + + vectors = [item.embedding for item in response.data] + + # Validate dimensions + expected_dim = self.settings.embedding_dimensions + for i, vec in enumerate(vectors): + if len(vec) != expected_dim: + logger.warning( + "Embedding dimension mismatch at index %d: expected %d, got %d. 
" + "Returning empty list.", + i, + expected_dim, + len(vec), + ) + return [] + + logger.info( + "Generated %d embeddings (dim=%d) using model=%s", + len(vectors), + expected_dim, + self.settings.embedding_model, + ) + return vectors diff --git a/backend/pipeline/qdrant_client.py b/backend/pipeline/qdrant_client.py new file mode 100644 index 0000000..db512fd --- /dev/null +++ b/backend/pipeline/qdrant_client.py @@ -0,0 +1,184 @@ +"""Qdrant vector database manager for collection lifecycle and point upserts. + +Handles collection creation (idempotent) and batch upserts for technique pages +and key moments. Connection failures are non-blocking — the pipeline continues +without search indexing. +""" + +from __future__ import annotations + +import logging +import uuid + +from qdrant_client import QdrantClient +from qdrant_client.http import exceptions as qdrant_exceptions +from qdrant_client.models import Distance, PointStruct, VectorParams + +from config import Settings + +logger = logging.getLogger(__name__) + + +class QdrantManager: + """Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors.""" + + def __init__(self, settings: Settings) -> None: + self.settings = settings + self._client = QdrantClient(url=settings.qdrant_url) + self._collection = settings.qdrant_collection + + # ── Collection management ──────────────────────────────────────────── + + def ensure_collection(self) -> None: + """Create the collection if it does not already exist. + + Uses cosine distance and the configured embedding dimensions. 
+ """ + try: + if self._client.collection_exists(self._collection): + logger.info("Qdrant collection '%s' already exists.", self._collection) + return + + self._client.create_collection( + collection_name=self._collection, + vectors_config=VectorParams( + size=self.settings.embedding_dimensions, + distance=Distance.COSINE, + ), + ) + logger.info( + "Created Qdrant collection '%s' (dim=%d, cosine).", + self._collection, + self.settings.embedding_dimensions, + ) + except qdrant_exceptions.UnexpectedResponse as exc: + logger.warning( + "Qdrant error during ensure_collection (%s). Skipping.", + exc, + ) + except Exception as exc: + logger.warning( + "Qdrant connection failed during ensure_collection (%s: %s). Skipping.", + type(exc).__name__, + exc, + ) + + # ── Low-level upsert ───────────────────────────────────────────────── + + def upsert_points(self, points: list[PointStruct]) -> None: + """Upsert a batch of pre-built PointStruct objects.""" + if not points: + return + try: + self._client.upsert( + collection_name=self._collection, + points=points, + ) + logger.info( + "Upserted %d points to Qdrant collection '%s'.", + len(points), + self._collection, + ) + except qdrant_exceptions.UnexpectedResponse as exc: + logger.warning( + "Qdrant upsert failed (%s). %d points skipped.", + exc, + len(points), + ) + except Exception as exc: + logger.warning( + "Qdrant upsert connection error (%s: %s). %d points skipped.", + type(exc).__name__, + exc, + len(points), + ) + + # ── High-level upserts ─────────────────────────────────────────────── + + def upsert_technique_pages( + self, + pages: list[dict], + vectors: list[list[float]], + ) -> None: + """Build and upsert PointStructs for technique pages. + + Each page dict must contain: + page_id, creator_id, title, topic_category, topic_tags, summary + + Parameters + ---------- + pages: + Metadata dicts, one per technique page. + vectors: + Corresponding embedding vectors (same order as pages). 
+ """ + if len(pages) != len(vectors): + logger.warning( + "Technique-page count (%d) != vector count (%d). Skipping upsert.", + len(pages), + len(vectors), + ) + return + + points = [] + for page, vector in zip(pages, vectors): + point = PointStruct( + id=str(uuid.uuid4()), + vector=vector, + payload={ + "type": "technique_page", + "page_id": page["page_id"], + "creator_id": page["creator_id"], + "title": page["title"], + "topic_category": page["topic_category"], + "topic_tags": page.get("topic_tags") or [], + "summary": page.get("summary") or "", + }, + ) + points.append(point) + + self.upsert_points(points) + + def upsert_key_moments( + self, + moments: list[dict], + vectors: list[list[float]], + ) -> None: + """Build and upsert PointStructs for key moments. + + Each moment dict must contain: + moment_id, source_video_id, title, start_time, end_time, content_type + + Parameters + ---------- + moments: + Metadata dicts, one per key moment. + vectors: + Corresponding embedding vectors (same order as moments). + """ + if len(moments) != len(vectors): + logger.warning( + "Key-moment count (%d) != vector count (%d). 
Skipping upsert.", + len(moments), + len(vectors), + ) + return + + points = [] + for moment, vector in zip(moments, vectors): + point = PointStruct( + id=str(uuid.uuid4()), + vector=vector, + payload={ + "type": "key_moment", + "moment_id": moment["moment_id"], + "source_video_id": moment["source_video_id"], + "title": moment["title"], + "start_time": moment["start_time"], + "end_time": moment["end_time"], + "content_type": moment["content_type"], + }, + ) + points.append(point) + + self.upsert_points(points) diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index 68ddfdc..a2893b0 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -30,7 +30,9 @@ from models import ( TechniquePage, TranscriptSegment, ) +from pipeline.embedding_client import EmbeddingClient from pipeline.llm_client import LLMClient +from pipeline.qdrant_client import QdrantManager from pipeline.schemas import ( ClassificationResult, ExtractionResult, @@ -577,6 +579,137 @@ def stage5_synthesis(self, video_id: str) -> str: session.close() +# ── Stage 6: Embed & Index ─────────────────────────────────────────────────── + +@celery_app.task(bind=True, max_retries=0) +def stage6_embed_and_index(self, video_id: str) -> str: + """Generate embeddings for technique pages and key moments, then upsert to Qdrant. + + This is a non-blocking side-effect stage — failures are logged but do not + fail the pipeline. Embeddings can be regenerated later. Does NOT update + processing_status. + + Returns the video_id for chain compatibility. 
+ """ + start = time.monotonic() + logger.info("Stage 6 (embed & index) starting for video_id=%s", video_id) + + settings = get_settings() + session = _get_sync_session() + try: + # Load technique pages created for this video's moments + moments = ( + session.execute( + select(KeyMoment) + .where(KeyMoment.source_video_id == video_id) + .order_by(KeyMoment.start_time) + ) + .scalars() + .all() + ) + + # Get unique technique page IDs from moments + page_ids = {m.technique_page_id for m in moments if m.technique_page_id is not None} + pages = [] + if page_ids: + pages = ( + session.execute( + select(TechniquePage).where(TechniquePage.id.in_(page_ids)) + ) + .scalars() + .all() + ) + + if not moments and not pages: + logger.info("Stage 6: No moments or pages for video_id=%s, skipping.", video_id) + return video_id + + embed_client = EmbeddingClient(settings) + qdrant = QdrantManager(settings) + + # Ensure collection exists before upserting + qdrant.ensure_collection() + + # ── Embed & upsert technique pages ─────────────────────────────── + if pages: + page_texts = [] + page_dicts = [] + for p in pages: + text = f"{p.title} {p.summary or ''} {p.topic_category or ''}" + page_texts.append(text.strip()) + page_dicts.append({ + "page_id": str(p.id), + "creator_id": str(p.creator_id), + "title": p.title, + "topic_category": p.topic_category or "", + "topic_tags": p.topic_tags or [], + "summary": p.summary or "", + }) + + page_vectors = embed_client.embed(page_texts) + if page_vectors: + qdrant.upsert_technique_pages(page_dicts, page_vectors) + logger.info( + "Stage 6: Upserted %d technique page vectors for video_id=%s", + len(page_vectors), video_id, + ) + else: + logger.warning( + "Stage 6: Embedding returned empty for %d technique pages (video_id=%s). 
" + "Skipping page upsert.", + len(page_texts), video_id, + ) + + # ── Embed & upsert key moments ─────────────────────────────────── + if moments: + moment_texts = [] + moment_dicts = [] + for m in moments: + text = f"{m.title} {m.summary or ''}" + moment_texts.append(text.strip()) + moment_dicts.append({ + "moment_id": str(m.id), + "source_video_id": str(m.source_video_id), + "title": m.title, + "start_time": m.start_time, + "end_time": m.end_time, + "content_type": m.content_type.value, + }) + + moment_vectors = embed_client.embed(moment_texts) + if moment_vectors: + qdrant.upsert_key_moments(moment_dicts, moment_vectors) + logger.info( + "Stage 6: Upserted %d key moment vectors for video_id=%s", + len(moment_vectors), video_id, + ) + else: + logger.warning( + "Stage 6: Embedding returned empty for %d key moments (video_id=%s). " + "Skipping moment upsert.", + len(moment_texts), video_id, + ) + + elapsed = time.monotonic() - start + logger.info( + "Stage 6 (embed & index) completed for video_id=%s in %.1fs — " + "%d pages, %d moments processed", + video_id, elapsed, len(pages), len(moments), + ) + return video_id + + except Exception as exc: + # Non-blocking: log error but don't fail the pipeline + logger.error( + "Stage 6 failed for video_id=%s: %s. " + "Pipeline continues — embeddings can be regenerated later.", + video_id, exc, + ) + return video_id + finally: + session.close() + + # ── Orchestrator ───────────────────────────────────────────────────────────── @celery_app.task @@ -618,11 +751,13 @@ def run_pipeline(video_id: str) -> str: stage3_extraction.s(), # receives video_id from previous stage4_classification.s(), stage5_synthesis.s(), + stage6_embed_and_index.s(), ] elif status == ProcessingStatus.extracted: stages = [ stage4_classification.s(video_id), stage5_synthesis.s(), + stage6_embed_and_index.s(), ] elif status in (ProcessingStatus.reviewed, ProcessingStatus.published): logger.info(