feat: Created sync EmbeddingClient, QdrantManager with idempotent colle…

- "backend/pipeline/embedding_client.py"
- "backend/pipeline/qdrant_client.py"
- "backend/pipeline/stages.py"

GSD-Task: S03/T03
This commit is contained in:
jlightner 2026-03-29 22:39:04 +00:00
parent b5635a09db
commit 5c46d1e922
6 changed files with 536 additions and 1 deletion


@@ -133,7 +133,7 @@
- Estimate: 2.5h
- Files: prompts/stage2_segmentation.txt, prompts/stage3_extraction.txt, prompts/stage4_classification.txt, prompts/stage5_synthesis.txt, backend/pipeline/stages.py, backend/worker.py
- Verify: test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt && cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"
- [ ] **T03: Qdrant integration and embedding client** — Create the embedding client (sync OpenAI-compatible /v1/embeddings) and Qdrant client (collection management + upsert). Wire embedding generation into the pipeline after stage 5 synthesis, so technique page summaries and key moment summaries are embedded and upserted to Qdrant with metadata payloads.
- [x] **T03: Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage** — Create the embedding client (sync OpenAI-compatible /v1/embeddings) and Qdrant client (collection management + upsert). Wire embedding generation into the pipeline after stage 5 synthesis, so technique page summaries and key moment summaries are embedded and upserted to Qdrant with metadata payloads.
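The non-blocking side-effect wiring described above can be sketched in plain Python (names here are hypothetical, not the repo's actual API): a terminal stage swallows its own failures and still returns the chain's payload, so the pipeline chain as a whole always succeeds even when indexing does not.

```python
import logging

logger = logging.getLogger(__name__)

def non_blocking_stage(video_id: str, side_effect) -> str:
    """Run a side effect; never let its failure propagate to the chain.

    The broad ``except Exception`` is deliberate: the side effect (embedding
    and indexing) is regenerable, so any failure is logged and ignored.
    """
    try:
        side_effect(video_id)
    except Exception as exc:
        logger.error("Side-effect stage failed for %s: %s", video_id, exc)
    return video_id
```

For example, `non_blocking_stage("vid-1", lambda v: 1 / 0)` logs the `ZeroDivisionError` but still returns `"vid-1"`, which is the chain-compatibility contract stage6 relies on.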
## Failure Modes


@@ -0,0 +1,40 @@
{
"schemaVersion": 1,
"taskId": "T02",
"unitId": "M001/S03/T02",
"timestamp": 1774823766552,
"passed": true,
"discoverySource": "task-plan",
"checks": [
{
"command": "test -f prompts/stage2_segmentation.txt",
"exitCode": 0,
"durationMs": 4,
"verdict": "pass"
},
{
"command": "test -f prompts/stage3_extraction.txt",
"exitCode": 0,
"durationMs": 3,
"verdict": "pass"
},
{
"command": "test -f prompts/stage4_classification.txt",
"exitCode": 0,
"durationMs": 4,
"verdict": "pass"
},
{
"command": "test -f prompts/stage5_synthesis.txt",
"exitCode": 0,
"durationMs": 3,
"verdict": "pass"
},
{
"command": "cd backend",
"exitCode": 0,
"durationMs": 3,
"verdict": "pass"
}
]
}


@@ -0,0 +1,88 @@
---
id: T03
parent: S03
milestone: M001
provides: []
requires: []
affects: []
key_files: ["backend/pipeline/embedding_client.py", "backend/pipeline/qdrant_client.py", "backend/pipeline/stages.py"]
key_decisions: ["stage6 uses max_retries=0 and catches all exceptions — embedding is a non-blocking side-effect that can be regenerated later", "EmbeddingClient returns empty list on any API error so callers don't need try/except", "QdrantManager generates random UUIDs for point IDs to avoid conflicts across re-indexing"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All 9 verification checks passed. Three task-level import checks (EmbeddingClient, QdrantManager, stage6_embed_and_index) exit 0. Full slice-level verification (Settings defaults, schemas, LLMClient, celery_app, grep for openai/qdrant-client) all pass. Celery task registry shows all 6 tasks registered. Structural validation confirmed sync openai.OpenAI usage, config-driven dimensions, idempotent collection creation, non-blocking error handling, and metadata payloads."
completed_at: 2026-03-29T22:39:01.970Z
blocker_discovered: false
---
# T03: Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage
> Created sync EmbeddingClient, QdrantManager with idempotent collection management and metadata-rich upserts, and wired stage6_embed_and_index into the Celery pipeline chain as a non-blocking side-effect stage
## What Happened
Created backend/pipeline/embedding_client.py with an EmbeddingClient class using sync openai.OpenAI for /v1/embeddings calls. The embed() method gracefully handles connection, timeout, and API errors by returning an empty list, and validates vector dimensions against settings.embedding_dimensions. Created backend/pipeline/qdrant_client.py with QdrantManager wrapping sync QdrantClient. ensure_collection() is idempotent (checks collection_exists before creating with cosine distance). upsert_technique_pages() and upsert_key_moments() build PointStruct objects with full metadata payloads and upsert them, catching all Qdrant errors without re-raising. Added stage6_embed_and_index Celery task to stages.py that loads KeyMoments and TechniquePages for a video, embeds their text, and upserts to Qdrant. Uses max_retries=0 and catches all exceptions — embedding failures never fail the pipeline. Updated run_pipeline to include stage6 in both chain paths.
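The empty-list error contract means callers branch on truthiness instead of wrapping every call in try/except. A minimal sketch of that caller pattern (function and parameter names here are illustrative, not the module's actual signatures):

```python
def validate_dimensions(vectors: list[list[float]], expected_dim: int) -> list[list[float]]:
    """Return vectors unchanged if every one has expected_dim entries, else []."""
    for vec in vectors:
        if len(vec) != expected_dim:
            return []
    return vectors

def index_summaries(texts, embed, upsert, expected_dim=1024) -> int:
    """Embed texts and upsert them; return how many vectors were indexed.

    embed() returns [] on any API error, so a single falsy check covers both
    "API unavailable" and "dimension mismatch" without try/except at the
    call site -- the same shape stage6 uses before calling the upsert helpers.
    """
    vectors = validate_dimensions(embed(texts), expected_dim)
    if vectors:
        upsert(texts, vectors)
        return len(vectors)
    return 0
```

The design choice this buys: stage6 stays a straight-line function whose only error handling is one outer catch-all, because both clients degrade to "do nothing" values on failure.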
## Verification
All 9 verification checks passed. Three task-level import checks (EmbeddingClient, QdrantManager, stage6_embed_and_index) exit 0. Full slice-level verification (Settings defaults, schemas, LLMClient, celery_app, grep for openai/qdrant-client) all pass. Celery task registry shows all 6 tasks registered. Structural validation confirmed sync openai.OpenAI usage, config-driven dimensions, idempotent collection creation, non-blocking error handling, and metadata payloads.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')"` | 0 | ✅ pass | 400ms |
| 2 | `cd backend && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')"` | 0 | ✅ pass | 400ms |
| 3 | `cd backend && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"` | 0 | ✅ pass | 500ms |
| 4 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 300ms |
| 5 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 300ms |
| 6 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 300ms |
| 7 | `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` | 0 | ✅ pass | 400ms |
| 8 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms |
| 9 | `cd backend && python -c "from worker import celery_app; tasks = [t for t in celery_app.tasks if 'stage' in t or 'pipeline' in t]; print(tasks)"` | 0 | ✅ pass | 400ms |
## Deviations
None.
## Known Issues
None.
## Files Created/Modified
- `backend/pipeline/embedding_client.py`
- `backend/pipeline/qdrant_client.py`
- `backend/pipeline/stages.py`


@@ -0,0 +1,88 @@
"""Synchronous embedding client using the OpenAI-compatible /v1/embeddings API.
Uses ``openai.OpenAI`` (sync) since Celery tasks run synchronously.
Handles connection failures gracefully; embedding is non-blocking for the pipeline.
"""
from __future__ import annotations
import logging
import openai
from config import Settings
logger = logging.getLogger(__name__)
class EmbeddingClient:
"""Sync embedding client backed by an OpenAI-compatible /v1/embeddings endpoint."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._client = openai.OpenAI(
base_url=settings.embedding_api_url,
api_key=settings.llm_api_key,
)
def embed(self, texts: list[str]) -> list[list[float]]:
"""Generate embedding vectors for a batch of texts.
Parameters
----------
texts:
List of strings to embed.
Returns
-------
list[list[float]]
Embedding vectors. Returns empty list on connection/timeout errors
so the pipeline can continue without embeddings.
"""
if not texts:
return []
try:
response = self._client.embeddings.create(
model=self.settings.embedding_model,
input=texts,
)
except (openai.APIConnectionError, openai.APITimeoutError) as exc:
logger.warning(
"Embedding API unavailable (%s: %s). Skipping %d texts.",
type(exc).__name__,
exc,
len(texts),
)
return []
except openai.APIError as exc:
logger.warning(
"Embedding API error (%s: %s). Skipping %d texts.",
type(exc).__name__,
exc,
len(texts),
)
return []
vectors = [item.embedding for item in response.data]
# Validate dimensions
expected_dim = self.settings.embedding_dimensions
for i, vec in enumerate(vectors):
if len(vec) != expected_dim:
logger.warning(
"Embedding dimension mismatch at index %d: expected %d, got %d. "
"Returning empty list.",
i,
expected_dim,
len(vec),
)
return []
logger.info(
"Generated %d embeddings (dim=%d) using model=%s",
len(vectors),
expected_dim,
self.settings.embedding_model,
)
return vectors


@@ -0,0 +1,184 @@
"""Qdrant vector database manager for collection lifecycle and point upserts.
Handles collection creation (idempotent) and batch upserts for technique pages
and key moments. Connection failures are non-blocking; the pipeline continues
without search indexing.
"""
from __future__ import annotations
import logging
import uuid
from qdrant_client import QdrantClient
from qdrant_client.http import exceptions as qdrant_exceptions
from qdrant_client.models import Distance, PointStruct, VectorParams
from config import Settings
logger = logging.getLogger(__name__)
class QdrantManager:
"""Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._client = QdrantClient(url=settings.qdrant_url)
self._collection = settings.qdrant_collection
# ── Collection management ────────────────────────────────────────────
def ensure_collection(self) -> None:
"""Create the collection if it does not already exist.
Uses cosine distance and the configured embedding dimensions.
"""
try:
if self._client.collection_exists(self._collection):
logger.info("Qdrant collection '%s' already exists.", self._collection)
return
self._client.create_collection(
collection_name=self._collection,
vectors_config=VectorParams(
size=self.settings.embedding_dimensions,
distance=Distance.COSINE,
),
)
logger.info(
"Created Qdrant collection '%s' (dim=%d, cosine).",
self._collection,
self.settings.embedding_dimensions,
)
except qdrant_exceptions.UnexpectedResponse as exc:
logger.warning(
"Qdrant error during ensure_collection (%s). Skipping.",
exc,
)
except Exception as exc:
logger.warning(
"Qdrant connection failed during ensure_collection (%s: %s). Skipping.",
type(exc).__name__,
exc,
)
# ── Low-level upsert ─────────────────────────────────────────────────
def upsert_points(self, points: list[PointStruct]) -> None:
"""Upsert a batch of pre-built PointStruct objects."""
if not points:
return
try:
self._client.upsert(
collection_name=self._collection,
points=points,
)
logger.info(
"Upserted %d points to Qdrant collection '%s'.",
len(points),
self._collection,
)
except qdrant_exceptions.UnexpectedResponse as exc:
logger.warning(
"Qdrant upsert failed (%s). %d points skipped.",
exc,
len(points),
)
except Exception as exc:
logger.warning(
"Qdrant upsert connection error (%s: %s). %d points skipped.",
type(exc).__name__,
exc,
len(points),
)
# ── High-level upserts ───────────────────────────────────────────────
def upsert_technique_pages(
self,
pages: list[dict],
vectors: list[list[float]],
) -> None:
"""Build and upsert PointStructs for technique pages.
Each page dict must contain:
page_id, creator_id, title, topic_category, topic_tags, summary
Parameters
----------
pages:
Metadata dicts, one per technique page.
vectors:
Corresponding embedding vectors (same order as pages).
"""
if len(pages) != len(vectors):
logger.warning(
"Technique-page count (%d) != vector count (%d). Skipping upsert.",
len(pages),
len(vectors),
)
return
points = []
for page, vector in zip(pages, vectors):
point = PointStruct(
id=str(uuid.uuid4()),
vector=vector,
payload={
"type": "technique_page",
"page_id": page["page_id"],
"creator_id": page["creator_id"],
"title": page["title"],
"topic_category": page["topic_category"],
"topic_tags": page.get("topic_tags") or [],
"summary": page.get("summary") or "",
},
)
points.append(point)
self.upsert_points(points)
def upsert_key_moments(
self,
moments: list[dict],
vectors: list[list[float]],
) -> None:
"""Build and upsert PointStructs for key moments.
Each moment dict must contain:
moment_id, source_video_id, title, start_time, end_time, content_type
Parameters
----------
moments:
Metadata dicts, one per key moment.
vectors:
Corresponding embedding vectors (same order as moments).
"""
if len(moments) != len(vectors):
logger.warning(
"Key-moment count (%d) != vector count (%d). Skipping upsert.",
len(moments),
len(vectors),
)
return
points = []
for moment, vector in zip(moments, vectors):
point = PointStruct(
id=str(uuid.uuid4()),
vector=vector,
payload={
"type": "key_moment",
"moment_id": moment["moment_id"],
"source_video_id": moment["source_video_id"],
"title": moment["title"],
"start_time": moment["start_time"],
"end_time": moment["end_time"],
"content_type": moment["content_type"],
},
)
points.append(point)
self.upsert_points(points)


@@ -30,7 +30,9 @@ from models import (
TechniquePage,
TranscriptSegment,
)
from pipeline.embedding_client import EmbeddingClient
from pipeline.llm_client import LLMClient
from pipeline.qdrant_client import QdrantManager
from pipeline.schemas import (
ClassificationResult,
ExtractionResult,
@@ -577,6 +579,137 @@ def stage5_synthesis(self, video_id: str) -> str:
session.close()
# ── Stage 6: Embed & Index ───────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=0)
def stage6_embed_and_index(self, video_id: str) -> str:
"""Generate embeddings for technique pages and key moments, then upsert to Qdrant.
This is a non-blocking side-effect stage; failures are logged but do not
fail the pipeline. Embeddings can be regenerated later. Does NOT update
processing_status.
Returns the video_id for chain compatibility.
"""
start = time.monotonic()
logger.info("Stage 6 (embed & index) starting for video_id=%s", video_id)
settings = get_settings()
session = _get_sync_session()
try:
# Load technique pages created for this video's moments
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
# Get unique technique page IDs from moments
page_ids = {m.technique_page_id for m in moments if m.technique_page_id is not None}
pages = []
if page_ids:
pages = (
session.execute(
select(TechniquePage).where(TechniquePage.id.in_(page_ids))
)
.scalars()
.all()
)
if not moments and not pages:
logger.info("Stage 6: No moments or pages for video_id=%s, skipping.", video_id)
return video_id
embed_client = EmbeddingClient(settings)
qdrant = QdrantManager(settings)
# Ensure collection exists before upserting
qdrant.ensure_collection()
# ── Embed & upsert technique pages ───────────────────────────────
if pages:
page_texts = []
page_dicts = []
for p in pages:
text = f"{p.title} {p.summary or ''} {p.topic_category or ''}"
page_texts.append(text.strip())
page_dicts.append({
"page_id": str(p.id),
"creator_id": str(p.creator_id),
"title": p.title,
"topic_category": p.topic_category or "",
"topic_tags": p.topic_tags or [],
"summary": p.summary or "",
})
page_vectors = embed_client.embed(page_texts)
if page_vectors:
qdrant.upsert_technique_pages(page_dicts, page_vectors)
logger.info(
"Stage 6: Upserted %d technique page vectors for video_id=%s",
len(page_vectors), video_id,
)
else:
logger.warning(
"Stage 6: Embedding returned empty for %d technique pages (video_id=%s). "
"Skipping page upsert.",
len(page_texts), video_id,
)
# ── Embed & upsert key moments ───────────────────────────────────
if moments:
moment_texts = []
moment_dicts = []
for m in moments:
text = f"{m.title} {m.summary or ''}"
moment_texts.append(text.strip())
moment_dicts.append({
"moment_id": str(m.id),
"source_video_id": str(m.source_video_id),
"title": m.title,
"start_time": m.start_time,
"end_time": m.end_time,
"content_type": m.content_type.value,
})
moment_vectors = embed_client.embed(moment_texts)
if moment_vectors:
qdrant.upsert_key_moments(moment_dicts, moment_vectors)
logger.info(
"Stage 6: Upserted %d key moment vectors for video_id=%s",
len(moment_vectors), video_id,
)
else:
logger.warning(
"Stage 6: Embedding returned empty for %d key moments (video_id=%s). "
"Skipping moment upsert.",
len(moment_texts), video_id,
)
elapsed = time.monotonic() - start
logger.info(
"Stage 6 (embed & index) completed for video_id=%s in %.1fs — "
"%d pages, %d moments processed",
video_id, elapsed, len(pages), len(moments),
)
return video_id
except Exception as exc:
# Non-blocking: log error but don't fail the pipeline
logger.error(
"Stage 6 failed for video_id=%s: %s. "
"Pipeline continues — embeddings can be regenerated later.",
video_id, exc,
)
return video_id
finally:
session.close()
# ── Orchestrator ─────────────────────────────────────────────────────────────
@celery_app.task
@@ -618,11 +751,13 @@ def run_pipeline(video_id: str) -> str:
stage3_extraction.s(), # receives video_id from previous
stage4_classification.s(),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
]
elif status == ProcessingStatus.extracted:
stages = [
stage4_classification.s(video_id),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
]
elif status in (ProcessingStatus.reviewed, ProcessingStatus.published):
logger.info(