chrysopedia/backend/pipeline/qdrant_client.py
jlightner e17132bd60 feat: Add bulk pipeline reprocessing — creator filter, multi-select, clean retrigger
- Backend: POST /admin/pipeline/clean-retrigger/{video_id} endpoint that
  deletes pipeline_events, key_moments, transcript_segments, and Qdrant
  vectors before retriggering the pipeline
- Backend: QdrantManager.delete_by_video_id() for vector cleanup
- Frontend: Creator filter dropdown on pipeline admin page
- Frontend: Checkbox selection column with select-all
- Frontend: Bulk toolbar with Retrigger Selected and Clean Reprocess
  actions, sequential dispatch with progress bar, cancel support
- Bulk dispatch uses 500ms delay between requests to avoid slamming API
2026-03-31 15:24:59 +00:00

226 lines
8 KiB
Python

"""Qdrant vector database manager for collection lifecycle, point upserts, and deletion.

Handles idempotent collection creation, batch upserts for technique pages and
key moments, and deletion of a video's key-moment vectors. Qdrant failures
(error responses or connection problems) are logged and non-blocking — the
pipeline continues without search indexing.
"""
from __future__ import annotations
import logging
import uuid
from qdrant_client import QdrantClient
from qdrant_client.http import exceptions as qdrant_exceptions
from qdrant_client.models import Distance, PointStruct, VectorParams
from config import Settings
# Module-level logger, named after this module's import path.
logger: logging.Logger = logging.getLogger(name=__name__)
class QdrantManager:
    """Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors.

    Qdrant calls made by the methods below are best-effort: exceptions raised
    by the client are logged as warnings and swallowed, so the pipeline can
    continue without search indexing.

    Point IDs are derived deterministically from the indexed entity (see
    ``_point_id``). Re-indexing the same technique page or key moment
    therefore overwrites its existing point instead of adding a duplicate.
    Points written earlier with random IDs are not touched by this.
    """

    def __init__(self, settings: Settings) -> None:
        # The full settings object is kept because ensure_collection reads
        # embedding_dimensions from it.
        self.settings = settings
        self._client = QdrantClient(url=settings.qdrant_url)
        self._collection = settings.qdrant_collection

    # ── Point IDs ────────────────────────────────────────────────────────

    @staticmethod
    def _point_id(kind: str, entity_id: object) -> str:
        """Return a stable Qdrant point ID for one indexed entity.

        Qdrant point IDs must be unsigned integers or UUIDs, so the
        ``(kind, entity_id)`` pair is hashed into a name-based UUID (v5).
        Including *kind* keeps a technique page and a key moment that share
        the same database ID from colliding on one point.
        """
        return str(uuid.uuid5(uuid.NAMESPACE_URL, f"chrysopedia/{kind}/{entity_id}"))

    # ── Collection management ────────────────────────────────────────────

    def ensure_collection(self) -> None:
        """Create the collection if it does not already exist.

        A new collection uses cosine distance and
        ``settings.embedding_dimensions`` as its vector size. An existing
        collection is left untouched; its vector config is not compared
        against the settings.
        """
        try:
            if self._client.collection_exists(self._collection):
                logger.info("Qdrant collection '%s' already exists.", self._collection)
                return
            self._client.create_collection(
                collection_name=self._collection,
                vectors_config=VectorParams(
                    size=self.settings.embedding_dimensions,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(
                "Created Qdrant collection '%s' (dim=%d, cosine).",
                self._collection,
                self.settings.embedding_dimensions,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant error during ensure_collection (%s). Skipping.",
                exc,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant connection failed during ensure_collection (%s: %s). Skipping.",
                type(exc).__name__,
                exc,
            )

    # ── Deletion ─────────────────────────────────────────────────────────

    def delete_by_video_id(self, video_id: str) -> int:
        """Delete the key-moment points that belong to one source video.

        Key-moment points carry ``source_video_id`` in their payload and are
        matched on it. Technique-page points have no video linkage in their
        payload, so they are NOT deleted here.

        Parameters
        ----------
        video_id:
            Value matched against the ``source_video_id`` payload field.

        Returns
        -------
        int
            Number of matching points counted immediately before the delete.
            Qdrant's delete response carries no count, so a separate count is
            taken first. Points written between the count and the delete are
            not reflected. Returns 0 on any failure.
        """
        # Only this method needs the filter models.
        from qdrant_client.models import FieldCondition, Filter, MatchValue

        try:
            # Built inside the try so a model-validation error (e.g. an
            # unsupported video_id type) is also treated as best-effort.
            video_filter = Filter(
                must=[
                    FieldCondition(
                        key="source_video_id",
                        match=MatchValue(value=video_id),
                    ),
                ],
            )
            matched = self._client.count(
                collection_name=self._collection,
                count_filter=video_filter,
                exact=True,
            ).count
            self._client.delete(
                collection_name=self._collection,
                points_selector=video_filter,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant delete for video_id=%s failed (%s: %s). Skipping.",
                video_id,
                type(exc).__name__,
                exc,
            )
            return 0
        logger.info(
            "Deleted %d Qdrant points for video_id=%s from collection '%s'.",
            matched,
            video_id,
            self._collection,
        )
        return matched

    # ── Low-level upsert ─────────────────────────────────────────────────

    def upsert_points(self, points: list[PointStruct]) -> None:
        """Upsert a batch of pre-built PointStruct objects.

        An empty batch is a no-op. On failure, the whole batch is skipped and
        a warning is logged.
        """
        if not points:
            return
        try:
            self._client.upsert(
                collection_name=self._collection,
                points=points,
            )
            logger.info(
                "Upserted %d points to Qdrant collection '%s'.",
                len(points),
                self._collection,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant upsert failed (%s). %d points skipped.",
                exc,
                len(points),
            )
        except Exception as exc:
            logger.warning(
                "Qdrant upsert connection error (%s: %s). %d points skipped.",
                type(exc).__name__,
                exc,
                len(points),
            )

    # ── High-level upserts ───────────────────────────────────────────────

    def upsert_technique_pages(
        self,
        pages: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for technique pages.

        Each page dict must contain ``page_id``, ``creator_id``, ``title`` and
        ``topic_category``. The keys ``slug``, ``topic_tags`` and ``summary``
        are optional and default to ``""``, ``[]`` and ``""``. A missing
        required key raises ``KeyError``.

        The point ID is derived from ``page_id``, so re-upserting a page
        replaces its previous vector.

        Parameters
        ----------
        pages:
            Metadata dicts, one per technique page.
        vectors:
            Corresponding embedding vectors (same order as pages).
            If the lengths differ, nothing is upserted and a warning is logged.
        """
        if len(pages) != len(vectors):
            logger.warning(
                "Technique-page count (%d) != vector count (%d). Skipping upsert.",
                len(pages),
                len(vectors),
            )
            return
        points = [
            PointStruct(
                id=self._point_id("technique_page", page["page_id"]),
                vector=vector,
                payload={
                    "type": "technique_page",
                    "page_id": page["page_id"],
                    "creator_id": page["creator_id"],
                    "title": page["title"],
                    "slug": page.get("slug", ""),
                    "topic_category": page["topic_category"],
                    "topic_tags": page.get("topic_tags") or [],
                    "summary": page.get("summary") or "",
                },
            )
            for page, vector in zip(pages, vectors)
        ]
        self.upsert_points(points)

    def upsert_key_moments(
        self,
        moments: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for key moments.

        Each moment dict must contain ``moment_id``, ``source_video_id``,
        ``title``, ``start_time``, ``end_time`` and ``content_type``. The keys
        ``technique_page_id`` and ``technique_page_slug`` are optional and
        default to ``""``. A missing required key raises ``KeyError``.

        The point ID is derived from ``moment_id``, so re-upserting a moment
        replaces its previous vector. ``source_video_id`` is stored in the
        payload so ``delete_by_video_id`` can find these points.

        Parameters
        ----------
        moments:
            Metadata dicts, one per key moment.
        vectors:
            Corresponding embedding vectors (same order as moments).
            If the lengths differ, nothing is upserted and a warning is logged.
        """
        if len(moments) != len(vectors):
            logger.warning(
                "Key-moment count (%d) != vector count (%d). Skipping upsert.",
                len(moments),
                len(vectors),
            )
            return
        points = [
            PointStruct(
                id=self._point_id("key_moment", moment["moment_id"]),
                vector=vector,
                payload={
                    "type": "key_moment",
                    "moment_id": moment["moment_id"],
                    "source_video_id": moment["source_video_id"],
                    "technique_page_id": moment.get("technique_page_id", ""),
                    "technique_page_slug": moment.get("technique_page_slug", ""),
                    "title": moment["title"],
                    "start_time": moment["start_time"],
                    "end_time": moment["end_time"],
                    "content_type": moment["content_type"],
                },
            )
            for moment, vector in zip(moments, vectors)
        ]
        self.upsert_points(points)