chrysopedia/backend/pipeline/qdrant_client.py
jlightner 17b43d9778 feat: Added LightRAG /query/data as primary search engine with file_sou…
- "backend/config.py"
- "backend/search_service.py"

GSD-Task: S01/T01
2026-04-04 04:44:24 +00:00

320 lines
12 KiB
Python

"""Qdrant vector database manager for collection lifecycle, point upserts and deletions.

Handles idempotent collection creation, batch upserts for technique pages,
technique-page sections and key moments, and filter-based deletion of stale
points. Qdrant failures (including connection errors) are logged and do not
block — the pipeline continues without search indexing.
"""
from __future__ import annotations
import logging
import uuid
from qdrant_client import QdrantClient
from qdrant_client.http import exceptions as qdrant_exceptions
from qdrant_client.models import Distance, PointStruct, VectorParams
from config import Settings
logger: logging.Logger = logging.getLogger(__name__)

# Fixed namespace for the uuid5-derived point IDs. Changing this value changes
# every point ID, so re-indexing would add new points instead of overwriting
# the existing ones.
_QDRANT_NAMESPACE = uuid.UUID(hex="a1b2c3d4-e5f6-7890-abcd-ef1234567890")
class QdrantManager:
    """Manage one Qdrant collection of Chrysopedia search vectors.

    The collection holds three kinds of points, told apart by the ``type``
    payload field: ``technique_page``, ``technique_section`` and ``key_moment``.
    Point IDs are deterministic UUIDv5 values derived from ``_QDRANT_NAMESPACE``,
    so re-indexing the same entity overwrites its existing point rather than
    adding a duplicate.

    Every method that talks to Qdrant is best-effort: client errors are logged
    as warnings and swallowed so that a Qdrant outage does not abort the
    calling pipeline.
    """

    def __init__(self, settings: Settings) -> None:
        """Bind the manager to ``settings.qdrant_url`` / ``settings.qdrant_collection``.

        Parameters
        ----------
        settings:
            Application settings. ``qdrant_url`` and ``qdrant_collection`` are
            read here; ``embedding_dimensions`` is read by
            :meth:`ensure_collection`.
        """
        self.settings = settings
        self._client = QdrantClient(url=settings.qdrant_url)
        self._collection = settings.qdrant_collection

    # ── Collection management ────────────────────────────────────────────

    def ensure_collection(self) -> None:
        """Create the collection if it does not already exist.

        A new collection uses cosine distance and
        ``settings.embedding_dimensions`` as its vector size. An existing
        collection is left untouched (its configuration is not re-checked).
        Errors are logged and swallowed.
        """
        try:
            if self._client.collection_exists(self._collection):
                logger.info("Qdrant collection '%s' already exists.", self._collection)
                return
            self._client.create_collection(
                collection_name=self._collection,
                vectors_config=VectorParams(
                    size=self.settings.embedding_dimensions,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(
                "Created Qdrant collection '%s' (dim=%d, cosine).",
                self._collection,
                self.settings.embedding_dimensions,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant error during ensure_collection (%s). Skipping.",
                exc,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant connection failed during ensure_collection (%s: %s). Skipping.",
                type(exc).__name__,
                exc,
            )

    # ── Deletion ───────────────────────────────────────────────────────────

    def _count_matching(self, points_filter) -> int:
        """Return the exact number of points matching *points_filter*, or 0 on error."""
        try:
            return self._client.count(
                collection_name=self._collection,
                count_filter=points_filter,
                exact=True,
            ).count
        except Exception as exc:
            # The count is informational only; it must never block a delete.
            logger.warning(
                "Qdrant count failed (%s: %s). Reporting 0.",
                type(exc).__name__,
                exc,
            )
            return 0

    def delete_by_video_id(self, video_id: str) -> int:
        """Delete every key-moment point whose payload ``source_video_id`` matches.

        Only key-moment points carry ``source_video_id``. Technique-page and
        technique-section points have no direct video linkage and are not
        touched here.

        Parameters
        ----------
        video_id:
            Value compared against the ``source_video_id`` payload field.

        Returns
        -------
        int
            Number of matching points counted just before the delete
            (best-effort). It is 0 when the delete fails, and also 0 when the
            count fails even if the delete succeeds. Points written between the
            count and the delete are not reflected.
        """
        from qdrant_client.models import FieldCondition, Filter, MatchValue

        video_filter = Filter(
            must=[
                FieldCondition(
                    key="source_video_id",
                    match=MatchValue(value=video_id),
                ),
            ],
        )
        # The delete response carries no deleted-point count, so count first.
        matched = self._count_matching(video_filter)
        try:
            self._client.delete(
                collection_name=self._collection,
                points_selector=video_filter,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant delete for video_id=%s failed (%s: %s). Skipping.",
                video_id,
                type(exc).__name__,
                exc,
            )
            return 0
        logger.info(
            "Deleted %d Qdrant points for video_id=%s from collection '%s'.",
            matched,
            video_id,
            self._collection,
        )
        return matched

    # ── Low-level upsert ─────────────────────────────────────────────────

    def upsert_points(self, points: list[PointStruct]) -> None:
        """Upsert a batch of pre-built PointStruct objects in one request.

        An empty batch is a no-op. Errors are logged (with the number of
        skipped points) and swallowed.
        """
        if not points:
            return
        try:
            self._client.upsert(
                collection_name=self._collection,
                points=points,
            )
            logger.info(
                "Upserted %d points to Qdrant collection '%s'.",
                len(points),
                self._collection,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant upsert failed (%s). %d points skipped.",
                exc,
                len(points),
            )
        except Exception as exc:
            logger.warning(
                "Qdrant upsert connection error (%s: %s). %d points skipped.",
                type(exc).__name__,
                exc,
                len(points),
            )

    # ── High-level upserts ───────────────────────────────────────────────

    @staticmethod
    def _lengths_match(label: str, item_count: int, vector_count: int) -> bool:
        """Return True when items and vectors pair up one-to-one.

        On a mismatch, log a warning naming *label* and return False so the
        caller skips the whole batch rather than mis-pairing metadata and
        vectors.
        """
        if item_count == vector_count:
            return True
        logger.warning(
            "%s count (%d) != vector count (%d). Skipping upsert.",
            label,
            item_count,
            vector_count,
        )
        return False

    def upsert_technique_pages(
        self,
        pages: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert one ``technique_page`` point per page.

        Required keys in each page dict:
            page_id, creator_id, title, topic_category
        Optional keys (value used when absent):
            creator_name (""), slug (""), topic_tags ([], also for falsy
            values), summary ("", also for falsy values)

        Parameters
        ----------
        pages:
            Metadata dicts, one per technique page.
        vectors:
            Corresponding embedding vectors (same order as ``pages``).

        Nothing is upserted when ``pages`` and ``vectors`` differ in length.
        A missing required key raises ``KeyError`` before anything is sent.
        """
        if not self._lengths_match("Technique-page", len(pages), len(vectors)):
            return
        points = [
            PointStruct(
                # Deterministic ID: re-indexing a page overwrites its point.
                id=str(uuid.uuid5(_QDRANT_NAMESPACE, f"tp:{page['page_id']}")),
                vector=vector,
                payload={
                    "type": "technique_page",
                    "page_id": page["page_id"],
                    "creator_id": page["creator_id"],
                    "creator_name": page.get("creator_name", ""),
                    "title": page["title"],
                    "slug": page.get("slug", ""),
                    "topic_category": page["topic_category"],
                    "topic_tags": page.get("topic_tags") or [],
                    "summary": page.get("summary") or "",
                },
            )
            for page, vector in zip(pages, vectors)
        ]
        self.upsert_points(points)

    def upsert_key_moments(
        self,
        moments: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert one ``key_moment`` point per moment.

        Required keys in each moment dict:
            moment_id, source_video_id, title, start_time, end_time, content_type
        Optional keys (value used when absent):
            creator_id (""), creator_name (""), technique_page_id (""),
            technique_page_slug ("")

        Parameters
        ----------
        moments:
            Metadata dicts, one per key moment.
        vectors:
            Corresponding embedding vectors (same order as ``moments``).

        Nothing is upserted when ``moments`` and ``vectors`` differ in length.
        A missing required key raises ``KeyError`` before anything is sent.
        """
        if not self._lengths_match("Key-moment", len(moments), len(vectors)):
            return
        points = [
            PointStruct(
                # Deterministic ID: re-indexing a moment overwrites its point.
                id=str(uuid.uuid5(_QDRANT_NAMESPACE, f"km:{moment['moment_id']}")),
                vector=vector,
                payload={
                    "type": "key_moment",
                    "moment_id": moment["moment_id"],
                    "source_video_id": moment["source_video_id"],
                    "creator_id": moment.get("creator_id", ""),
                    "technique_page_id": moment.get("technique_page_id", ""),
                    "technique_page_slug": moment.get("technique_page_slug", ""),
                    "title": moment["title"],
                    "creator_name": moment.get("creator_name", ""),
                    "start_time": moment["start_time"],
                    "end_time": moment["end_time"],
                    "content_type": moment["content_type"],
                },
            )
            for moment, vector in zip(moments, vectors)
        ]
        self.upsert_points(points)

    # ── Technique section operations ─────────────────────────────────────

    def delete_sections_by_page_id(self, page_id: str) -> None:
        """Delete all ``technique_section`` points for a given page_id.

        Called before re-upserting sections to prevent orphan points when
        headings are renamed or sections removed. Only points matching both
        ``page_id`` and ``type == "technique_section"`` are removed, so the
        page's own ``technique_page`` point survives. Non-blocking: failures
        are logged as warnings.
        """
        from qdrant_client.models import FieldCondition, Filter, MatchValue

        try:
            self._client.delete(
                collection_name=self._collection,
                points_selector=Filter(
                    must=[
                        FieldCondition(
                            key="page_id",
                            match=MatchValue(value=page_id),
                        ),
                        FieldCondition(
                            key="type",
                            match=MatchValue(value="technique_section"),
                        ),
                    ],
                ),
            )
            logger.info(
                "Deleted technique_section points for page_id=%s from '%s'.",
                page_id, self._collection,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant delete sections for page_id=%s failed (%s: %s). Skipping.",
                page_id, type(exc).__name__, exc,
            )

    def upsert_technique_sections(
        self,
        sections: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert one ``technique_section`` point per section.

        Required keys in each section dict:
            page_id, section_anchor, section_heading
        Optional keys (value used when absent):
            creator_id (""), creator_name (""), title (page title, ""),
            slug (page slug, ""), topic_category (""), topic_tags ([], also
            for falsy values), summary ("", also for falsy values)

        The stored ``summary`` is truncated to its first 200 characters.
        Point IDs are ``uuid5(_QDRANT_NAMESPACE, 'ts:{page_id}:{section_anchor}')``.

        Nothing is upserted when ``sections`` and ``vectors`` differ in length.
        A missing required key raises ``KeyError`` before anything is sent.
        """
        if not self._lengths_match("Technique-section", len(sections), len(vectors)):
            return
        points = [
            PointStruct(
                id=str(uuid.uuid5(
                    _QDRANT_NAMESPACE,
                    f"ts:{sec['page_id']}:{sec['section_anchor']}",
                )),
                vector=vector,
                payload={
                    "type": "technique_section",
                    "page_id": sec["page_id"],
                    "creator_id": sec.get("creator_id", ""),
                    "creator_name": sec.get("creator_name", ""),
                    "title": sec.get("title", ""),
                    "slug": sec.get("slug", ""),
                    "section_heading": sec["section_heading"],
                    "section_anchor": sec["section_anchor"],
                    "topic_category": sec.get("topic_category", ""),
                    "topic_tags": sec.get("topic_tags") or [],
                    "summary": (sec.get("summary") or "")[:200],
                },
            )
            for sec, vector in zip(sections, vectors)
        ]
        self.upsert_points(points)