- "backend/schemas.py" - "backend/pipeline/stages.py" - "backend/pipeline/qdrant_client.py" - "backend/search_service.py" - "backend/pipeline/test_section_embedding.py" GSD-Task: S07/T01
319 lines
12 KiB
Python
319 lines
12 KiB
Python
"""Qdrant vector database manager for collection lifecycle and point upserts.
|
|
|
|
Handles collection creation (idempotent) and batch upserts for technique pages
|
|
and key moments. Connection failures are non-blocking — the pipeline continues
|
|
without search indexing.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.http import exceptions as qdrant_exceptions
|
|
from qdrant_client.models import Distance, PointStruct, VectorParams
|
|
|
|
from config import Settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Namespace UUID for deterministic point IDs: point IDs are derived with
# uuid.uuid5(_QDRANT_NAMESPACE, "<prefix>:<key>"), so the same page, moment or
# section key always yields the same point ID.
|
|
_QDRANT_NAMESPACE = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
|
|
|
|
|
|
class QdrantManager:
    """Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors.

    Errors raised by the Qdrant client calls in ``ensure_collection``, the
    delete methods and the upsert methods are caught and logged as warnings
    rather than re-raised. Missing required keys in the metadata dicts passed
    to the high-level upserts still raise ``KeyError``.
    """

    def __init__(self, settings: Settings) -> None:
        """Create a client for ``settings.qdrant_url`` and remember the collection name.

        Parameters
        ----------
        settings:
            Application settings. ``qdrant_url`` and ``qdrant_collection`` are
            read here; ``embedding_dimensions`` is read by
            ``ensure_collection``.
        """
        self.settings = settings
        self._client = QdrantClient(url=settings.qdrant_url)
        self._collection = settings.qdrant_collection

    # ── Private helpers ──────────────────────────────────────────────────

    @staticmethod
    def _match_filter(conditions: dict[str, str]):
        """Return a Qdrant ``Filter`` requiring each payload key to equal its value.

        Conditions are emitted in the dict's insertion order.
        """
        # Imported locally, as the delete methods originally did, so this
        # block does not depend on additional file-level imports.
        from qdrant_client.models import FieldCondition, Filter, MatchValue

        return Filter(
            must=[
                FieldCondition(key=key, match=MatchValue(value=value))
                for key, value in conditions.items()
            ],
        )

    @staticmethod
    def _technique_page_payload(page: dict) -> dict:
        """Return the point payload stored for one technique page."""
        return {
            "type": "technique_page",
            "page_id": page["page_id"],
            "creator_id": page["creator_id"],
            "creator_name": page.get("creator_name", ""),
            "title": page["title"],
            "slug": page.get("slug", ""),
            "topic_category": page["topic_category"],
            # ``or`` also normalises an explicit None to the empty default.
            "topic_tags": page.get("topic_tags") or [],
            "summary": page.get("summary") or "",
        }

    @staticmethod
    def _key_moment_payload(moment: dict) -> dict:
        """Return the point payload stored for one key moment."""
        return {
            "type": "key_moment",
            "moment_id": moment["moment_id"],
            "source_video_id": moment["source_video_id"],
            "technique_page_id": moment.get("technique_page_id", ""),
            "technique_page_slug": moment.get("technique_page_slug", ""),
            "title": moment["title"],
            "creator_name": moment.get("creator_name", ""),
            "start_time": moment["start_time"],
            "end_time": moment["end_time"],
            "content_type": moment["content_type"],
        }

    @staticmethod
    def _technique_section_payload(sec: dict) -> dict:
        """Return the point payload stored for one technique-page section."""
        return {
            "type": "technique_section",
            "page_id": sec["page_id"],
            "creator_id": sec.get("creator_id", ""),
            "creator_name": sec.get("creator_name", ""),
            "title": sec.get("title", ""),
            "slug": sec.get("slug", ""),
            "section_heading": sec["section_heading"],
            "section_anchor": sec["section_anchor"],
            "topic_category": sec.get("topic_category", ""),
            "topic_tags": sec.get("topic_tags") or [],
            # Only the first 200 characters of the summary are stored.
            "summary": (sec.get("summary") or "")[:200],
        }

    # ── Collection management ────────────────────────────────────────────

    def ensure_collection(self) -> None:
        """Create the collection if it does not already exist.

        Uses cosine distance and the configured embedding dimensions.
        Any error is logged as a warning and swallowed.
        """
        try:
            if self._client.collection_exists(self._collection):
                logger.info("Qdrant collection '%s' already exists.", self._collection)
                return

            self._client.create_collection(
                collection_name=self._collection,
                vectors_config=VectorParams(
                    size=self.settings.embedding_dimensions,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(
                "Created Qdrant collection '%s' (dim=%d, cosine).",
                self._collection,
                self.settings.embedding_dimensions,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant error during ensure_collection (%s). Skipping.",
                exc,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant connection failed during ensure_collection (%s: %s). Skipping.",
                type(exc).__name__,
                exc,
            )

    # ── Deletion ───────────────────────────────────────────────────────────

    def delete_by_video_id(self, video_id: str) -> int:
        """Delete every point whose payload ``source_video_id`` equals *video_id*.

        Of the payloads built by this class only key moments carry
        ``source_video_id``; technique pages and sections do not, so this
        filter does not match them.

        Returns
        -------
        int
            Always ``0``. The delete response is not inspected for a count and
            failures are logged, not raised; the ``int`` return is kept for
            interface compatibility.
        """
        try:
            self._client.delete(
                collection_name=self._collection,
                points_selector=self._match_filter({"source_video_id": video_id}),
            )
            logger.info(
                "Deleted Qdrant points for video_id=%s from collection '%s'.",
                video_id,
                self._collection,
            )
            return 0  # Qdrant delete doesn't return count
        except Exception as exc:
            logger.warning(
                "Qdrant delete for video_id=%s failed (%s: %s). Skipping.",
                video_id,
                type(exc).__name__,
                exc,
            )
            return 0

    # ── Low-level upsert ─────────────────────────────────────────────────

    def upsert_points(self, points: list[PointStruct]) -> None:
        """Upsert a batch of pre-built PointStruct objects.

        An empty list is a no-op (no request is sent). Errors are logged as
        warnings and swallowed.
        """
        if not points:
            return
        try:
            self._client.upsert(
                collection_name=self._collection,
                points=points,
            )
            logger.info(
                "Upserted %d points to Qdrant collection '%s'.",
                len(points),
                self._collection,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant upsert failed (%s). %d points skipped.",
                exc,
                len(points),
            )
        except Exception as exc:
            logger.warning(
                "Qdrant upsert connection error (%s: %s). %d points skipped.",
                type(exc).__name__,
                exc,
                len(points),
            )

    # ── High-level upserts ───────────────────────────────────────────────

    def upsert_technique_pages(
        self,
        pages: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for technique pages.

        Each page dict must contain:
            page_id, creator_id, title, topic_category
        and may contain (defaulting to empty values):
            creator_name, slug, topic_tags, summary

        Parameters
        ----------
        pages:
            Metadata dicts, one per technique page.
        vectors:
            Corresponding embedding vectors (same order as pages).

        Raises
        ------
        KeyError
            If a page dict lacks one of the required keys.
        """
        if len(pages) != len(vectors):
            logger.warning(
                "Technique-page count (%d) != vector count (%d). Skipping upsert.",
                len(pages),
                len(vectors),
            )
            return

        # Deterministic UUID: same page always gets the same point ID
        points = [
            PointStruct(
                id=str(uuid.uuid5(_QDRANT_NAMESPACE, f"tp:{page['page_id']}")),
                vector=vector,
                payload=self._technique_page_payload(page),
            )
            for page, vector in zip(pages, vectors)
        ]

        self.upsert_points(points)

    def upsert_key_moments(
        self,
        moments: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for key moments.

        Each moment dict must contain:
            moment_id, source_video_id, title, start_time, end_time, content_type
        and may contain (defaulting to ""):
            technique_page_id, technique_page_slug, creator_name

        Parameters
        ----------
        moments:
            Metadata dicts, one per key moment.
        vectors:
            Corresponding embedding vectors (same order as moments).

        Raises
        ------
        KeyError
            If a moment dict lacks one of the required keys.
        """
        if len(moments) != len(vectors):
            logger.warning(
                "Key-moment count (%d) != vector count (%d). Skipping upsert.",
                len(moments),
                len(vectors),
            )
            return

        # Deterministic UUID: same moment always gets the same point ID
        points = [
            PointStruct(
                id=str(uuid.uuid5(_QDRANT_NAMESPACE, f"km:{moment['moment_id']}")),
                vector=vector,
                payload=self._key_moment_payload(moment),
            )
            for moment, vector in zip(moments, vectors)
        ]

        self.upsert_points(points)

    # ── Technique section operations ─────────────────────────────────────

    def delete_sections_by_page_id(self, page_id: str) -> None:
        """Delete all technique_section points for a given page_id.

        Called before re-upserting sections to prevent orphan points when
        headings are renamed or sections removed. Non-blocking — logs warning
        on failure.
        """
        try:
            self._client.delete(
                collection_name=self._collection,
                points_selector=self._match_filter(
                    {"page_id": page_id, "type": "technique_section"},
                ),
            )
            logger.info(
                "Deleted technique_section points for page_id=%s from '%s'.",
                page_id,
                self._collection,
            )
        except Exception as exc:
            logger.warning(
                "Qdrant delete sections for page_id=%s failed (%s: %s). Skipping.",
                page_id,
                type(exc).__name__,
                exc,
            )

    def upsert_technique_sections(
        self,
        sections: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for technique page sections.

        Each section dict must contain:
            page_id, section_anchor, section_heading
        and may contain (defaulting to empty values):
            creator_id, creator_name, title (page title), slug (page slug),
            topic_category, topic_tags, summary (stored truncated to 200 chars)

        Uses deterministic UUIDs: ``uuid5(namespace, 'ts:{page_id}:{section_anchor}')``.

        Raises
        ------
        KeyError
            If a section dict lacks one of the required keys.
        """
        if len(sections) != len(vectors):
            logger.warning(
                "Technique-section count (%d) != vector count (%d). Skipping upsert.",
                len(sections),
                len(vectors),
            )
            return

        points = [
            PointStruct(
                id=str(uuid.uuid5(
                    _QDRANT_NAMESPACE,
                    f"ts:{sec['page_id']}:{sec['section_anchor']}",
                )),
                vector=vector,
                payload=self._technique_section_payload(sec),
            )
            for sec, vector in zip(sections, vectors)
        ]

        self.upsert_points(points)
|