- Search now runs semantic + keyword in parallel, then merges and deduplicates
- Keyword results are always included, with match_context explaining WHY they matched
- Semantic results are filtered by a minimum score threshold (0.45)
- match_context shows 'Creator: X', 'Tag: Y', 'Title match', 'Content: ...'
- Qdrant points use deterministic uuid5 IDs (no more duplicates on reindex)
- Embedding timeout raised from 300ms to 2s (Ollama needs it)
- _enrich_qdrant_results reads creator_name from the payload before the DB fallback
- Frontend displays match_context as a highlighted bar on search result cards
235 lines
8.6 KiB
Python
235 lines
8.6 KiB
Python
"""Qdrant vector database manager for collection lifecycle and point upserts.
|
|
|
|
Handles collection creation (idempotent) and batch upserts for technique pages
|
|
and key moments. Connection failures are non-blocking — the pipeline continues
|
|
without search indexing.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.http import exceptions as qdrant_exceptions
|
|
from qdrant_client.models import Distance, PointStruct, VectorParams
|
|
|
|
from config import Settings
|
|
|
|
logger = logging.getLogger(__name__)

# Namespace UUID for deterministic point IDs. Point IDs are derived with
# uuid.uuid5(namespace, name), so this value must stay fixed: changing it
# changes every derived ID and re-indexing would then add new points instead
# of overwriting the existing ones.
_QDRANT_NAMESPACE = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
|
|
|
|
|
|
class QdrantManager:
    """Manages a Qdrant collection for Chrysopedia technique-page and key-moment vectors.

    Collection creation, deletes and upserts log a warning on failure instead
    of raising, so callers keep running without search indexing.
    """

    def __init__(self, settings: Settings) -> None:
        """Create the Qdrant client described by *settings*.

        Parameters
        ----------
        settings:
            Provides ``qdrant_url`` and ``qdrant_collection`` (read here) and
            ``embedding_dimensions`` (read by ``ensure_collection``).
        """
        self.settings = settings
        self._client = QdrantClient(url=settings.qdrant_url)
        self._collection = settings.qdrant_collection

    # ── Collection management ────────────────────────────────────────────

    def ensure_collection(self) -> None:
        """Create the collection if it does not already exist.

        Uses cosine distance and the configured embedding dimensions.
        Any error is logged as a warning and swallowed.
        """
        try:
            if self._client.collection_exists(self._collection):
                logger.info("Qdrant collection '%s' already exists.", self._collection)
                return

            self._client.create_collection(
                collection_name=self._collection,
                vectors_config=VectorParams(
                    size=self.settings.embedding_dimensions,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(
                "Created Qdrant collection '%s' (dim=%d, cosine).",
                self._collection,
                self.settings.embedding_dimensions,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            # The server answered, but with an error response.
            logger.warning(
                "Qdrant error during ensure_collection (%s). Skipping.",
                exc,
            )
        except Exception as exc:
            # Deliberate best-effort boundary: anything else (typically a
            # connection problem) must not stop the caller.
            logger.warning(
                "Qdrant connection failed during ensure_collection (%s: %s). Skipping.",
                type(exc).__name__,
                exc,
            )

    # ── Deletion ───────────────────────────────────────────────────────────

    def delete_by_video_id(self, video_id: str) -> int:
        """Delete every point whose payload ``source_video_id`` equals *video_id*.

        Key-moment points are written with ``source_video_id`` in their payload;
        technique-page points are written without it, so this filter only
        matches key moments.

        Parameters
        ----------
        video_id:
            Value matched against the ``source_video_id`` payload field.

        Returns
        -------
        int
            Always ``0``. The delete response is not inspected for a count, and
            failures are logged and swallowed, so the value carries no
            information about how many points were removed.
        """
        from qdrant_client.models import FieldCondition, Filter, MatchValue

        try:
            # The filter is built inside the try so that a validation error on
            # an unexpected video_id is handled like any other Qdrant failure.
            self._client.delete(
                collection_name=self._collection,
                points_selector=Filter(
                    must=[
                        FieldCondition(
                            key="source_video_id",
                            match=MatchValue(value=video_id),
                        ),
                    ],
                ),
            )
            logger.info(
                "Deleted Qdrant points for video_id=%s from collection '%s'.",
                video_id,
                self._collection,
            )
            return 0  # Qdrant delete doesn't return count
        except Exception as exc:
            logger.warning(
                "Qdrant delete for video_id=%s failed (%s: %s). Skipping.",
                video_id,
                type(exc).__name__,
                exc,
            )
            return 0

    # ── Low-level upsert ─────────────────────────────────────────────────

    def upsert_points(self, points: list[PointStruct]) -> None:
        """Upsert a batch of pre-built PointStruct objects.

        An empty batch returns immediately without contacting Qdrant. Any
        error is logged as a warning and swallowed.
        """
        if not points:
            return
        try:
            self._client.upsert(
                collection_name=self._collection,
                points=points,
            )
            logger.info(
                "Upserted %d points to Qdrant collection '%s'.",
                len(points),
                self._collection,
            )
        except qdrant_exceptions.UnexpectedResponse as exc:
            logger.warning(
                "Qdrant upsert failed (%s). %d points skipped.",
                exc,
                len(points),
            )
        except Exception as exc:
            logger.warning(
                "Qdrant upsert connection error (%s: %s). %d points skipped.",
                type(exc).__name__,
                exc,
                len(points),
            )

    # ── High-level upserts ───────────────────────────────────────────────

    @staticmethod
    def _technique_page_point(page: dict, vector: list[float]) -> PointStruct:
        """Build the PointStruct for one technique page."""
        # Deterministic UUID: same page always gets the same point ID
        point_id = str(uuid.uuid5(_QDRANT_NAMESPACE, f"tp:{page['page_id']}"))
        return PointStruct(
            id=point_id,
            vector=vector,
            payload={
                "type": "technique_page",
                "page_id": page["page_id"],
                "creator_id": page["creator_id"],
                "creator_name": page.get("creator_name", ""),
                "title": page["title"],
                "slug": page.get("slug", ""),
                "topic_category": page["topic_category"],
                "topic_tags": page.get("topic_tags") or [],
                "summary": page.get("summary") or "",
            },
        )

    @staticmethod
    def _key_moment_point(moment: dict, vector: list[float]) -> PointStruct:
        """Build the PointStruct for one key moment."""
        # Deterministic UUID: same moment always gets the same point ID
        point_id = str(uuid.uuid5(_QDRANT_NAMESPACE, f"km:{moment['moment_id']}"))
        return PointStruct(
            id=point_id,
            vector=vector,
            payload={
                "type": "key_moment",
                "moment_id": moment["moment_id"],
                "source_video_id": moment["source_video_id"],
                "technique_page_id": moment.get("technique_page_id", ""),
                "technique_page_slug": moment.get("technique_page_slug", ""),
                "title": moment["title"],
                "creator_name": moment.get("creator_name", ""),
                "start_time": moment["start_time"],
                "end_time": moment["end_time"],
                "content_type": moment["content_type"],
            },
        )

    def upsert_technique_pages(
        self,
        pages: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for technique pages.

        Each page dict must contain:
            page_id, creator_id, title, topic_category
        and may contain:
            creator_name, slug (default ""), topic_tags (default []),
            summary (default "")

        Parameters
        ----------
        pages:
            Metadata dicts, one per technique page.
        vectors:
            Corresponding embedding vectors (same order as pages).

        Raises
        ------
        KeyError
            If a page dict lacks one of the required keys. Point construction
            happens before the guarded Qdrant call, so this is not swallowed.
        """
        if len(pages) != len(vectors):
            logger.warning(
                "Technique-page count (%d) != vector count (%d). Skipping upsert.",
                len(pages),
                len(vectors),
            )
            return

        points = [
            self._technique_page_point(page, vector)
            for page, vector in zip(pages, vectors)
        ]
        self.upsert_points(points)

    def upsert_key_moments(
        self,
        moments: list[dict],
        vectors: list[list[float]],
    ) -> None:
        """Build and upsert PointStructs for key moments.

        Each moment dict must contain:
            moment_id, source_video_id, title, start_time, end_time, content_type
        and may contain:
            technique_page_id, technique_page_slug, creator_name (default "")

        Parameters
        ----------
        moments:
            Metadata dicts, one per key moment.
        vectors:
            Corresponding embedding vectors (same order as moments).

        Raises
        ------
        KeyError
            If a moment dict lacks one of the required keys. Point construction
            happens before the guarded Qdrant call, so this is not swallowed.
        """
        if len(moments) != len(vectors):
            logger.warning(
                "Key-moment count (%d) != vector count (%d). Skipping upsert.",
                len(moments),
                len(vectors),
            )
            return

        points = [
            self._key_moment_point(moment, vector)
            for moment, vector in zip(moments, vectors)
        ]
        self.upsert_points(points)
|