feat: Added LightRAG /query/data as primary search engine with file_sou…

- "backend/config.py"
- "backend/search_service.py"

GSD-Task: S01/T01
This commit is contained in:
jlightner 2026-04-04 04:44:24 +00:00
parent 90dea3bde1
commit 4917fd3a32
25 changed files with 1723 additions and 46 deletions

View file

@ -11,7 +11,7 @@ ssh ub01
cd /vmPool/r/repos/xpltdco/chrysopedia
```
**GitHub:** https://github.com/xpltdco/chrysopedia (private, xpltdco org)
**Git:** https://git.xpltd.co/xpltdco/chrysopedia (Forgejo, xpltdco org)
## Why?

View file

@ -0,0 +1,37 @@
"""Add impersonation_log table for admin impersonation audit trail.
Revision ID: 018_add_impersonation_log
Revises: 017_add_consent_tables
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = "018_add_impersonation_log"
down_revision = "017_add_consent_tables"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"impersonation_log",
sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("admin_user_id", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
sa.Column("target_user_id", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
sa.Column("action", sa.String(10), nullable=False), # 'start' or 'stop'
sa.Column("ip_address", sa.String(45), nullable=True),
sa.Column("created_at", sa.DateTime, server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_impersonation_log_admin", "impersonation_log", ["admin_user_id"])
op.create_index("ix_impersonation_log_target", "impersonation_log", ["target_user_id"])
op.create_index("ix_impersonation_log_created", "impersonation_log", ["created_at"])
def downgrade() -> None:
op.drop_index("ix_impersonation_log_created")
op.drop_index("ix_impersonation_log_target")
op.drop_index("ix_impersonation_log_admin")
op.drop_table("impersonation_log")

View file

@ -56,6 +56,32 @@ def create_access_token(
return jwt.encode(payload, settings.app_secret_key, algorithm=_ALGORITHM)
_IMPERSONATION_EXPIRE_MINUTES = 60 # 1 hour
def create_impersonation_token(
admin_user_id: uuid.UUID | str,
target_user_id: uuid.UUID | str,
target_role: str,
) -> str:
"""Create a scoped JWT for admin impersonation.
The token has sub=target_user_id so get_current_user loads the target,
plus original_user_id so the system knows it's impersonation.
"""
settings = get_settings()
now = datetime.now(timezone.utc)
payload = {
"sub": str(target_user_id),
"role": target_role,
"original_user_id": str(admin_user_id),
"type": "impersonation",
"iat": now,
"exp": now + timedelta(minutes=_IMPERSONATION_EXPIRE_MINUTES),
}
return jwt.encode(payload, settings.app_secret_key, algorithm=_ALGORITHM)
def decode_access_token(token: str) -> dict:
"""Decode and validate a JWT. Raises on expiry or malformed tokens."""
settings = get_settings()
@ -85,7 +111,11 @@ async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Annotated[AsyncSession, Depends(get_session)],
) -> User:
"""Decode JWT, load User from DB, raise 401 if missing or inactive."""
"""Decode JWT, load User from DB, raise 401 if missing or inactive.
If the token contains an original_user_id claim (impersonation),
sets _impersonating_admin_id on the returned user object.
"""
payload = decode_access_token(token)
user_id = payload.get("sub")
result = await session.execute(select(User).where(User.id == user_id))
@ -95,6 +125,8 @@ async def get_current_user(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
)
# Attach impersonation metadata (non-column runtime attribute)
user._impersonating_admin_id = payload.get("original_user_id") # type: ignore[attr-defined]
return user
@ -112,3 +144,16 @@ def require_role(required_role: UserRole):
return current_user
return _check
async def reject_impersonation(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
"""Dependency that blocks write operations during impersonation."""
admin_id = getattr(current_user, "_impersonating_admin_id", None)
if admin_id is not None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Write operations are not allowed during impersonation",
)
return current_user

View file

@ -60,6 +60,11 @@ class Settings(BaseSettings):
qdrant_url: str = "http://localhost:6333"
qdrant_collection: str = "chrysopedia"
# LightRAG
lightrag_url: str = "http://chrysopedia-lightrag:9621"
lightrag_search_timeout: float = 2.0
lightrag_min_query_length: int = 3
# Prompt templates
prompts_path: str = "./prompts"

View file

@ -12,7 +12,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import get_settings
from routers import auth, consent, creator_dashboard, creators, health, ingest, pipeline, reports, search, stats, techniques, topics, videos
from routers import admin, auth, consent, creator_dashboard, creators, health, ingest, pipeline, reports, search, stats, techniques, topics, videos
def _setup_logging() -> None:
@ -78,6 +78,7 @@ app.add_middleware(
app.include_router(health.router)
# Versioned API
app.include_router(admin.router, prefix="/api/v1")
app.include_router(auth.router, prefix="/api/v1")
app.include_router(consent.router, prefix="/api/v1")
app.include_router(creator_dashboard.router, prefix="/api/v1")

View file

@ -654,3 +654,23 @@ class ConsentAuditLog(Base):
video_consent: Mapped[VideoConsent] = sa_relationship(
back_populates="audit_entries"
)
class ImpersonationLog(Base):
"""Audit trail for admin impersonation sessions."""
__tablename__ = "impersonation_log"
id: Mapped[uuid.UUID] = _uuid_pk()
admin_user_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True,
)
target_user_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True,
)
action: Mapped[str] = mapped_column(
String(10), nullable=False, doc="'start' or 'stop'"
)
ip_address: Mapped[str | None] = mapped_column(String(45), nullable=True)
created_at: Mapped[datetime] = mapped_column(
default=_now, server_default=func.now()
)

View file

@ -221,6 +221,7 @@ class QdrantManager:
"type": "key_moment",
"moment_id": moment["moment_id"],
"source_video_id": moment["source_video_id"],
"creator_id": moment.get("creator_id", ""),
"technique_page_id": moment.get("technique_page_id", ""),
"technique_page_slug": moment.get("technique_page_slug", ""),
"title": moment["title"],

View file

@ -1673,11 +1673,12 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
video_creator_map: dict[str, str] = {}
if video_ids:
rows = session.execute(
select(SourceVideo.id, Creator.name)
select(SourceVideo.id, Creator.name, Creator.id.label("creator_id"))
.join(Creator, SourceVideo.creator_id == Creator.id)
.where(SourceVideo.id.in_(video_ids))
).all()
video_creator_map = {str(r[0]): r[1] for r in rows}
video_creator_id_map = {str(r[0]): str(r[2]) for r in rows}
embed_client = EmbeddingClient(settings)
qdrant = QdrantManager(settings)
@ -1737,6 +1738,7 @@ def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> st
moment_dicts.append({
"moment_id": str(m.id),
"source_video_id": str(m.source_video_id),
"creator_id": video_creator_id_map.get(str(m.source_video_id), ""),
"technique_page_id": tp_id,
"technique_page_slug": page_id_to_slug.get(tp_id, ""),
"title": m.title,

180
backend/routers/admin.py Normal file
View file

@ -0,0 +1,180 @@
"""Admin router — user management and impersonation."""
from __future__ import annotations
import logging
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from auth import (
create_impersonation_token,
decode_access_token,
get_current_user,
require_role,
)
from database import get_session
from models import ImpersonationLog, User, UserRole
logger = logging.getLogger("chrysopedia.admin")
router = APIRouter(prefix="/admin", tags=["admin"])
_require_admin = require_role(UserRole.admin)
# ── Schemas ──────────────────────────────────────────────────────────────────
class UserListItem(BaseModel):
id: str
email: str
display_name: str
role: str
creator_id: str | None
is_active: bool
class Config:
from_attributes = True
class ImpersonateResponse(BaseModel):
access_token: str
token_type: str = "bearer"
target_user: UserListItem
class StopImpersonateResponse(BaseModel):
message: str
# ── Helpers ──────────────────────────────────────────────────────────────────
def _client_ip(request: Request) -> str | None:
"""Best-effort client IP from X-Forwarded-For or direct connection."""
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return None
# ── Endpoints ────────────────────────────────────────────────────────────────
@router.get("/users", response_model=list[UserListItem])
async def list_users(
_admin: Annotated[User, Depends(_require_admin)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""List all users. Admin only."""
result = await session.execute(
select(User).order_by(User.display_name)
)
users = result.scalars().all()
return [
UserListItem(
id=str(u.id),
email=u.email,
display_name=u.display_name,
role=u.role.value,
creator_id=str(u.creator_id) if u.creator_id else None,
is_active=u.is_active,
)
for u in users
]
@router.post("/impersonate/{user_id}", response_model=ImpersonateResponse)
async def start_impersonation(
user_id: UUID,
request: Request,
admin: Annotated[User, Depends(_require_admin)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Start impersonating a user. Admin only. Returns a scoped JWT."""
# Cannot impersonate yourself
if admin.id == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot impersonate yourself",
)
# Load target user
result = await session.execute(select(User).where(User.id == user_id))
target = result.scalar_one_or_none()
if target is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Target user not found",
)
# Create impersonation token
token = create_impersonation_token(
admin_user_id=admin.id,
target_user_id=target.id,
target_role=target.role.value,
)
# Audit log
session.add(ImpersonationLog(
admin_user_id=admin.id,
target_user_id=target.id,
action="start",
ip_address=_client_ip(request),
))
await session.commit()
logger.info(
"Impersonation started: admin=%s target=%s",
admin.id, target.id,
)
return ImpersonateResponse(
access_token=token,
target_user=UserListItem(
id=str(target.id),
email=target.email,
display_name=target.display_name,
role=target.role.value,
creator_id=str(target.creator_id) if target.creator_id else None,
is_active=target.is_active,
),
)
@router.post("/impersonate/stop", response_model=StopImpersonateResponse)
async def stop_impersonation(
request: Request,
current_user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Stop impersonation. Requires a valid impersonation token."""
admin_id = getattr(current_user, "_impersonating_admin_id", None)
if admin_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Not currently impersonating",
)
# Audit log
session.add(ImpersonationLog(
admin_user_id=admin_id,
target_user_id=current_user.id,
action="stop",
ip_address=_client_ip(request),
))
await session.commit()
logger.info(
"Impersonation stopped: admin=%s target=%s",
admin_id, current_user.id,
)
return StopImpersonateResponse(message="Impersonation ended")

View file

@ -14,6 +14,7 @@ from auth import (
create_access_token,
get_current_user,
hash_password,
reject_impersonation,
verify_password,
)
from database import get_session
@ -120,13 +121,17 @@ async def get_profile(
current_user: Annotated[User, Depends(get_current_user)],
):
"""Return the current user's profile."""
return current_user
resp = UserResponse.model_validate(current_user)
admin_id = getattr(current_user, "_impersonating_admin_id", None)
if admin_id is not None:
resp.impersonating = True
return resp
@router.put("/me", response_model=UserResponse)
async def update_profile(
body: UpdateProfileRequest,
current_user: Annotated[User, Depends(get_current_user)],
current_user: Annotated[User, Depends(reject_impersonation)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""Update the current user's display name and/or password."""

View file

@ -21,7 +21,7 @@ from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from auth import get_current_user, require_role
from auth import get_current_user, reject_impersonation, require_role
from database import get_session
from models import (
ConsentAuditLog,
@ -175,7 +175,7 @@ async def get_video_consent(
async def update_video_consent(
video_id: uuid.UUID,
body: VideoConsentUpdate,
current_user: Annotated[User, Depends(get_current_user)],
current_user: Annotated[User, Depends(reject_impersonation)],
session: Annotated[AsyncSession, Depends(get_session)],
request: Request,
):

View file

@ -566,6 +566,7 @@ class UserResponse(BaseModel):
creator_id: uuid.UUID | None = None
is_active: bool = True
created_at: datetime
impersonating: bool = False
class UpdateProfileRequest(BaseModel):

View file

@ -0,0 +1,547 @@
#!/usr/bin/env python3
"""A/B comparison of Chrysopedia's Qdrant search vs LightRAG retrieval.
Runs a set of queries against both backends and produces a scored comparison
report. Designed to run inside the chrysopedia-api container (has network
access to both services) or via tunneled URLs.
Usage:
# Dry run — show query set without executing
python3 /app/scripts/compare_search.py --dry-run
# Run first 5 queries
python3 /app/scripts/compare_search.py --limit 5
# Full comparison
python3 /app/scripts/compare_search.py
# Custom URLs
python3 /app/scripts/compare_search.py --api-url http://localhost:8000 --lightrag-url http://localhost:9621
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import httpx
logger = logging.getLogger("compare_search")
# ── Query set ────────────────────────────────────────────────────────────────
# Real user queries (from search_log)
USER_QUERIES = [
"squelch",
"keota snare",
"reverb",
"how does keota snare",
"bass",
"groove",
"drums",
"fx",
"textures",
"daw setup",
"synthesis",
"how does keota",
"over-leveling snare to control compression behavior",
]
# Curated domain queries — test different retrieval patterns
CURATED_QUERIES = [
# Broad topic queries
"bass design techniques",
"reverb chains and spatial effects",
"how to layer drums",
# Cross-entity synthesis (LightRAG strength)
"what plugins are commonly used for bass sounds",
"compare different approaches to snare layering",
"how do different producers approach sound design",
# Exact lookup (Qdrant strength)
"COPYCATT",
"Emperor arrangement",
# How-to / procedural
"how to create tension in a buildup",
"step by step resampling workflow",
# Concept queries
"frequency spectrum balance",
"signal chain for drums",
]
ALL_QUERIES = USER_QUERIES + CURATED_QUERIES
# ── Data structures ──────────────────────────────────────────────────────────
@dataclass
class SearchResult:
title: str
score: float
snippet: str
result_type: str = ""
creator: str = ""
slug: str = ""
@dataclass
class QdrantSearchResponse:
query: str
results: list[SearchResult] = field(default_factory=list)
partial_matches: list[SearchResult] = field(default_factory=list)
total: int = 0
latency_ms: float = 0.0
error: str = ""
@dataclass
class LightRAGResponse:
query: str
response_text: str = ""
references: list[dict[str, Any]] = field(default_factory=list)
latency_ms: float = 0.0
error: str = ""
@dataclass
class QueryComparison:
query: str
query_type: str # "user" or "curated"
qdrant: QdrantSearchResponse | None = None
lightrag: LightRAGResponse | None = None
# Scores (populated by scoring phase)
qdrant_relevance: float = 0.0
qdrant_coverage: int = 0
qdrant_diversity: int = 0
lightrag_relevance: float = 0.0
lightrag_coverage: int = 0
lightrag_answer_quality: float = 0.0
winner: str = "" # "qdrant", "lightrag", "tie"
# ── Qdrant search client ────────────────────────────────────────────────────
def query_qdrant_search(api_url: str, query: str, limit: int = 20) -> QdrantSearchResponse:
"""Query the Chrysopedia search API (Qdrant + keyword)."""
url = f"{api_url}/api/v1/search"
params = {"q": query, "scope": "all", "limit": limit}
start = time.monotonic()
try:
resp = httpx.get(url, params=params, timeout=15)
latency = (time.monotonic() - start) * 1000
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
latency = (time.monotonic() - start) * 1000
return QdrantSearchResponse(query=query, latency_ms=latency, error=str(e))
items = data.get("items", [])
partial = data.get("partial_matches", [])
results = [
SearchResult(
title=item.get("title", ""),
score=item.get("score", 0.0),
snippet=item.get("summary", "")[:200],
result_type=item.get("type", ""),
creator=item.get("creator_name", ""),
slug=item.get("slug", ""),
)
for item in items
]
partial_results = [
SearchResult(
title=item.get("title", ""),
score=item.get("score", 0.0),
snippet=item.get("summary", "")[:200],
result_type=item.get("type", ""),
creator=item.get("creator_name", ""),
slug=item.get("slug", ""),
)
for item in partial
]
return QdrantSearchResponse(
query=query,
results=results,
partial_matches=partial_results,
total=data.get("total", 0),
latency_ms=latency,
)
# ── LightRAG client ─────────────────────────────────────────────────────────
def query_lightrag(lightrag_url: str, query: str, mode: str = "hybrid") -> LightRAGResponse:
"""Query the LightRAG API."""
url = f"{lightrag_url}/query"
payload = {"query": query, "mode": mode}
start = time.monotonic()
try:
# LightRAG queries involve LLM inference — can take 2-4 minutes each
resp = httpx.post(url, json=payload, timeout=300)
latency = (time.monotonic() - start) * 1000
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
latency = (time.monotonic() - start) * 1000
return LightRAGResponse(query=query, latency_ms=latency, error=str(e))
return LightRAGResponse(
query=query,
response_text=data.get("response", ""),
references=[
{"id": ref.get("reference_id", ""), "file_path": ref.get("file_path", "")}
for ref in data.get("references", [])
],
latency_ms=latency,
)
# ── Scoring ──────────────────────────────────────────────────────────────────
def _token_overlap(query: str, text: str) -> float:
"""Fraction of query tokens found in text (case-insensitive)."""
if not text:
return 0.0
query_tokens = {t.lower() for t in query.split() if len(t) > 2}
if not query_tokens:
return 0.0
text_lower = text.lower()
matched = sum(1 for t in query_tokens if t in text_lower)
return matched / len(query_tokens)
def score_qdrant_results(comp: QueryComparison) -> None:
"""Score Qdrant results on relevance, coverage, and diversity."""
if not comp.qdrant or comp.qdrant.error:
return
results = comp.qdrant.results
if not results:
# Check partial matches
results = comp.qdrant.partial_matches
if not results:
comp.qdrant_relevance = 0.0
comp.qdrant_coverage = 0
comp.qdrant_diversity = 0
return
# Relevance: average token overlap across top-5 results
overlaps = []
for r in results[:5]:
combined = f"{r.title} {r.snippet} {r.creator}"
overlaps.append(_token_overlap(comp.query, combined))
comp.qdrant_relevance = round((sum(overlaps) / len(overlaps)) * 5, 2) if overlaps else 0.0
# Coverage: unique technique pages
slugs = {r.slug for r in results if r.slug}
comp.qdrant_coverage = len(slugs)
# Diversity: unique creators
creators = {r.creator for r in results if r.creator}
comp.qdrant_diversity = len(creators)
def score_lightrag_results(comp: QueryComparison) -> None:
"""Score LightRAG results on relevance, coverage, and answer quality."""
if not comp.lightrag or comp.lightrag.error:
return
text = comp.lightrag.response_text
refs = comp.lightrag.references
if not text:
comp.lightrag_relevance = 0.0
comp.lightrag_coverage = 0
comp.lightrag_answer_quality = 0.0
return
# Relevance: token overlap between query and response
comp.lightrag_relevance = round(_token_overlap(comp.query, text) * 5, 2)
# Coverage: unique technique pages referenced
unique_sources = {r["file_path"] for r in refs if r.get("file_path")}
comp.lightrag_coverage = len(unique_sources)
# Answer quality (0-5 composite):
quality = 0.0
# Length: longer synthesized answers are generally better (up to a point)
word_count = len(text.split())
if word_count > 20:
quality += 1.0
if word_count > 100:
quality += 0.5
if word_count > 200:
quality += 0.5
# References: more cross-page references = better synthesis
if len(unique_sources) >= 2:
quality += 1.0
if len(unique_sources) >= 4:
quality += 0.5
# Structure: has headings, bullet points, or numbered lists
if "**" in text or "##" in text:
quality += 0.5
if "- " in text or "* " in text:
quality += 0.5
# Doesn't say "no information available" or similar
negative_phrases = ["no information", "not mentioned", "no data", "cannot find"]
has_negative = any(phrase in text.lower() for phrase in negative_phrases)
if not has_negative:
quality += 0.5
else:
quality -= 1.0
comp.lightrag_answer_quality = round(min(quality, 5.0), 2)
def determine_winner(comp: QueryComparison) -> None:
"""Determine which backend wins for this query."""
# Composite score: relevance weight 0.4, coverage 0.3, quality/diversity 0.3
qdrant_score = (
comp.qdrant_relevance * 0.4
+ min(comp.qdrant_coverage, 5) * 0.3
+ min(comp.qdrant_diversity, 3) * 0.3
)
lightrag_score = (
comp.lightrag_relevance * 0.4
+ min(comp.lightrag_coverage, 5) * 0.3
+ comp.lightrag_answer_quality * 0.3
)
if abs(qdrant_score - lightrag_score) < 0.5:
comp.winner = "tie"
elif qdrant_score > lightrag_score:
comp.winner = "qdrant"
else:
comp.winner = "lightrag"
# ── Report generation ────────────────────────────────────────────────────────
def generate_markdown_report(comparisons: list[QueryComparison], output_dir: Path) -> Path:
"""Generate a human-readable markdown comparison report."""
lines: list[str] = []
lines.append("# Search A/B Comparison: Qdrant vs LightRAG")
lines.append(f"\n_Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}_")
lines.append(f"\n**Queries evaluated:** {len(comparisons)}")
# Aggregate stats
wins = {"qdrant": 0, "lightrag": 0, "tie": 0}
qdrant_latencies = []
lightrag_latencies = []
for c in comparisons:
wins[c.winner] += 1
if c.qdrant and not c.qdrant.error:
qdrant_latencies.append(c.qdrant.latency_ms)
if c.lightrag and not c.lightrag.error:
lightrag_latencies.append(c.lightrag.latency_ms)
lines.append("\n## Aggregate Results\n")
lines.append(f"| Metric | Qdrant Search | LightRAG |")
lines.append(f"|--------|:-------------:|:--------:|")
lines.append(f"| **Wins** | {wins['qdrant']} | {wins['lightrag']} |")
lines.append(f"| **Ties** | {wins['tie']} | {wins['tie']} |")
avg_q_str = f"{sum(qdrant_latencies) / len(qdrant_latencies):.0f}ms" if qdrant_latencies else "N/A"
avg_l_str = f"{sum(lightrag_latencies) / len(lightrag_latencies):.0f}ms" if lightrag_latencies else "N/A"
lines.append(f"| **Avg latency** | {avg_q_str} | {avg_l_str} |")
avg_qr = sum(c.qdrant_relevance for c in comparisons) / len(comparisons) if comparisons else 0
avg_lr = sum(c.lightrag_relevance for c in comparisons) / len(comparisons) if comparisons else 0
lines.append(f"| **Avg relevance** | {avg_qr:.2f}/5 | {avg_lr:.2f}/5 |")
avg_qc = sum(c.qdrant_coverage for c in comparisons) / len(comparisons) if comparisons else 0
avg_lc = sum(c.lightrag_coverage for c in comparisons) / len(comparisons) if comparisons else 0
lines.append(f"| **Avg coverage** | {avg_qc:.1f} pages | {avg_lc:.1f} refs |")
# Per-query detail
lines.append("\n## Per-Query Comparison\n")
lines.append("| # | Query | Type | Qdrant Rel | LR Rel | Qdrant Cov | LR Cov | LR Quality | Winner |")
lines.append("|---|-------|------|:----------:|:------:|:----------:|:------:|:----------:|:------:|")
for i, c in enumerate(comparisons, 1):
q_display = c.query[:45] + "" if len(c.query) > 45 else c.query
winner_emoji = {"qdrant": "🔵", "lightrag": "🟢", "tie": ""}[c.winner]
lines.append(
f"| {i} | {q_display} | {c.query_type} | {c.qdrant_relevance:.1f} | "
f"{c.lightrag_relevance:.1f} | {c.qdrant_coverage} | {c.lightrag_coverage} | "
f"{c.lightrag_answer_quality:.1f} | {winner_emoji} {c.winner} |"
)
# Detailed results for interesting queries
lines.append("\n## Notable Comparisons\n")
# Pick queries where there's a clear winner with interesting differences
notable = [c for c in comparisons if c.winner != "tie"][:5]
for c in notable:
lines.append(f"### Query: \"{c.query}\"\n")
lines.append(f"**Winner: {c.winner}**\n")
if c.qdrant and c.qdrant.results:
lines.append("**Qdrant results:**")
for r in c.qdrant.results[:3]:
lines.append(f"- {r.title} (by {r.creator}, score: {r.score:.2f})")
lines.append("")
if c.lightrag and c.lightrag.response_text:
# Show first 300 chars of LightRAG response
preview = c.lightrag.response_text[:300]
if len(c.lightrag.response_text) > 300:
preview += ""
lines.append(f"**LightRAG response preview:**")
lines.append(f"> {preview}\n")
if c.lightrag.references:
ref_slugs = [r["file_path"] for r in c.lightrag.references[:5]]
lines.append(f"References: {', '.join(ref_slugs)}\n")
# Data coverage note
lines.append("\n## Data Coverage Note\n")
lines.append(
"LightRAG has 18 of 93 technique pages indexed. "
"Results may improve significantly after full reindexing. "
"Qdrant has all 93 pages embedded."
)
report_path = output_dir / "comparison_report.md"
report_path.write_text("\n".join(lines), encoding="utf-8")
return report_path
def generate_json_report(comparisons: list[QueryComparison], output_dir: Path) -> Path:
"""Write full structured comparison data to JSON."""
def _serialize(obj):
if hasattr(obj, "__dict__"):
return {k: _serialize(v) for k, v in obj.__dict__.items()}
if isinstance(obj, list):
return [_serialize(i) for i in obj]
if isinstance(obj, dict):
return {k: _serialize(v) for k, v in obj.items()}
return obj
data = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"query_count": len(comparisons),
"comparisons": [_serialize(c) for c in comparisons],
}
report_path = output_dir / "comparison_report.json"
report_path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
return report_path
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="A/B compare Qdrant search vs LightRAG")
parser.add_argument(
"--api-url",
default=os.environ.get("API_URL", "http://127.0.0.1:8000"),
help="Chrysopedia API base URL (default: http://127.0.0.1:8000)",
)
parser.add_argument(
"--lightrag-url",
default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)",
)
parser.add_argument(
"--output-dir",
default=os.environ.get("OUTPUT_DIR", "/app/scripts/output"),
help="Output directory for reports",
)
parser.add_argument("--limit", type=int, default=None, help="Process only first N queries")
parser.add_argument("--dry-run", action="store_true", help="Show query set without executing")
parser.add_argument("--verbose", "-v", action="store_true", help="Debug logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
queries = ALL_QUERIES[:args.limit] if args.limit else ALL_QUERIES
if args.dry_run:
print(f"Query set ({len(queries)} queries):")
for i, q in enumerate(queries, 1):
qtype = "user" if q in USER_QUERIES else "curated"
print(f" {i:2d}. [{qtype:>7s}] {q}")
return
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
comparisons: list[QueryComparison] = []
for i, query in enumerate(queries, 1):
qtype = "user" if query in USER_QUERIES else "curated"
logger.info("[%d/%d] Query: %r (%s)", i, len(queries), query, qtype)
# Query both backends
qdrant_resp = query_qdrant_search(args.api_url, query)
lightrag_resp = query_lightrag(args.lightrag_url, query)
if qdrant_resp.error:
logger.warning(" Qdrant error: %s", qdrant_resp.error)
else:
logger.info(" Qdrant: %d results in %.0fms", qdrant_resp.total, qdrant_resp.latency_ms)
if lightrag_resp.error:
logger.warning(" LightRAG error: %s", lightrag_resp.error)
else:
ref_count = len(lightrag_resp.references)
word_count = len(lightrag_resp.response_text.split())
logger.info(" LightRAG: %d words, %d refs in %.0fms", word_count, ref_count, lightrag_resp.latency_ms)
comp = QueryComparison(query=query, query_type=qtype, qdrant=qdrant_resp, lightrag=lightrag_resp)
# Score
score_qdrant_results(comp)
score_lightrag_results(comp)
determine_winner(comp)
logger.info(
" Scores → Qdrant: rel=%.1f cov=%d div=%d | LightRAG: rel=%.1f cov=%d qual=%.1f | Winner: %s",
comp.qdrant_relevance, comp.qdrant_coverage, comp.qdrant_diversity,
comp.lightrag_relevance, comp.lightrag_coverage, comp.lightrag_answer_quality,
comp.winner,
)
comparisons.append(comp)
# Generate reports
logger.info("Generating reports...")
md_path = generate_markdown_report(comparisons, output_dir)
json_path = generate_json_report(comparisons, output_dir)
# Summary
wins = {"qdrant": 0, "lightrag": 0, "tie": 0}
for c in comparisons:
wins[c.winner] += 1
print(f"\n{'=' * 60}")
print(f"Comparison complete: {len(comparisons)} queries")
print(f" Qdrant wins: {wins['qdrant']}")
print(f" LightRAG wins: {wins['lightrag']}")
print(f" Ties: {wins['tie']}")
print(f"\nReports:")
print(f" {md_path}")
print(f" {json_path}")
print(f"{'=' * 60}")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""Query LightRAG with optional creator scoping.
A developer CLI for testing LightRAG queries, including creator-biased
retrieval using ll_keywords. Also serves as the foundation for
creator-scoped chat in M021.
Usage:
# Basic query
python3 /app/scripts/lightrag_query.py --query "snare design"
# Creator-scoped query
python3 /app/scripts/lightrag_query.py --query "snare design" --creator "COPYCATT"
# Different modes
python3 /app/scripts/lightrag_query.py --query "bass techniques" --mode local
# JSON output
python3 /app/scripts/lightrag_query.py --query "reverb" --json
# Context only (no LLM generation)
python3 /app/scripts/lightrag_query.py --query "reverb" --context-only
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from typing import Any
import httpx
def query_lightrag(
lightrag_url: str,
query: str,
mode: str = "hybrid",
creator: str | None = None,
context_only: bool = False,
top_k: int | None = None,
) -> dict[str, Any]:
"""Query LightRAG with optional creator scoping.
When a creator name is provided, it's passed as a low-level keyword
to bias retrieval toward documents mentioning that creator.
Parameters
----------
lightrag_url:
LightRAG API base URL.
query:
The search/question text.
mode:
Query mode: local, global, hybrid, naive, mix, bypass.
creator:
Optional creator name to bias retrieval toward.
context_only:
If True, returns retrieved context without LLM generation.
top_k:
Number of top items to retrieve (optional).
Returns
-------
Dict with keys: response, references, latency_ms, error.
"""
url = f"{lightrag_url}/query"
payload: dict[str, Any] = {
"query": query,
"mode": mode,
"include_references": True,
}
if creator:
# Use ll_keywords to bias retrieval toward the creator
payload["ll_keywords"] = [creator]
# Also prepend creator context to the query for better matching
payload["query"] = f"{query} (by {creator})"
if context_only:
payload["only_need_context"] = True
if top_k:
payload["top_k"] = top_k
start = time.monotonic()
try:
resp = httpx.post(url, json=payload, timeout=300)
latency_ms = (time.monotonic() - start) * 1000
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
latency_ms = (time.monotonic() - start) * 1000
return {"response": "", "references": [], "latency_ms": latency_ms, "error": str(e)}
return {
"response": data.get("response", ""),
"references": data.get("references", []),
"latency_ms": latency_ms,
"error": "",
}
def format_response(result: dict[str, Any], json_output: bool = False) -> str:
"""Format the query result for display."""
if json_output:
return json.dumps(result, indent=2, default=str)
lines = []
if result["error"]:
lines.append(f"ERROR: {result['error']}")
return "\n".join(lines)
lines.append(f"Latency: {result['latency_ms']:.0f}ms")
lines.append(f"Word count: {len(result['response'].split())}")
lines.append("")
# Response text
lines.append("" * 60)
lines.append(result["response"])
lines.append("" * 60)
# References
refs = result.get("references", [])
if refs:
lines.append(f"\nReferences ({len(refs)}):")
for ref in refs:
fp = ref.get("file_path", "?")
rid = ref.get("reference_id", "?")
lines.append(f" [{rid}] {fp}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Query LightRAG with optional creator scoping")
parser.add_argument("--query", "-q", required=True, help="Query text")
parser.add_argument("--creator", "-c", default=None, help="Creator name to bias retrieval")
parser.add_argument(
"--mode", "-m",
default="hybrid",
choices=["local", "global", "hybrid", "naive", "mix", "bypass"],
help="Query mode (default: hybrid)",
)
parser.add_argument("--context-only", action="store_true", help="Return context without LLM generation")
parser.add_argument("--top-k", type=int, default=None, help="Number of top items to retrieve")
parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
parser.add_argument(
"--lightrag-url",
default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
help="LightRAG API base URL",
)
args = parser.parse_args()
if args.creator:
print(f"Query: {args.query}")
print(f"Creator scope: {args.creator}")
print(f"Mode: {args.mode}")
else:
print(f"Query: {args.query}")
print(f"Mode: {args.mode}")
print("Querying LightRAG...")
result = query_lightrag(
lightrag_url=args.lightrag_url,
query=args.query,
mode=args.mode,
creator=args.creator,
context_only=args.context_only,
top_k=args.top_k,
)
print(format_response(result, json_output=args.json_output))
if __name__ == "__main__":
main()

View file

@ -34,7 +34,7 @@ _script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
_backend_dir = os.path.dirname(_script_dir)
sys.path.insert(0, _backend_dir)
from models import Creator, KeyMoment, TechniquePage # noqa: E402
from models import Creator, KeyMoment, SourceVideo, TechniquePage # noqa: E402
logger = logging.getLogger("reindex_lightrag")
@ -49,12 +49,12 @@ def get_sync_engine(db_url: str):
def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]:
"""Load all technique pages with creator and key moments eagerly."""
"""Load all technique pages with creator, key moments, and source videos eagerly."""
query = (
session.query(TechniquePage)
.options(
joinedload(TechniquePage.creator),
joinedload(TechniquePage.key_moments),
joinedload(TechniquePage.key_moments).joinedload(KeyMoment.source_video),
)
.order_by(TechniquePage.title)
)
@ -102,26 +102,42 @@ def _format_v2_sections(body_sections: list[dict]) -> str:
def format_technique_page(page: TechniquePage) -> str:
"""Convert a TechniquePage + relations into a rich text document for LightRAG."""
"""Convert a TechniquePage + relations into a rich text document for LightRAG.
Includes structured provenance metadata for entity extraction and
creator-scoped retrieval.
"""
lines = []
# Header metadata
# ── Structured provenance block ──────────────────────────────────────
lines.append(f"Technique: {page.title}")
if page.creator:
lines.append(f"Creator: {page.creator.name}")
lines.append(f"Creator ID: {page.creator_id}")
lines.append(f"Category: {page.topic_category or 'Uncategorized'}")
if page.topic_tags:
lines.append(f"Tags: {', '.join(page.topic_tags)}")
if page.plugins:
lines.append(f"Plugins: {', '.join(page.plugins)}")
# Source video provenance
if page.key_moments:
video_ids: dict[str, str] = {}
for km in page.key_moments:
sv = getattr(km, "source_video", None)
if sv and str(sv.id) not in video_ids:
video_ids[str(sv.id)] = sv.filename
if video_ids:
lines.append(f"Source Videos: {', '.join(video_ids.values())}")
lines.append(f"Source Video IDs: {', '.join(video_ids.keys())}")
lines.append("")
# Summary
# ── Summary ──────────────────────────────────────────────────────────
if page.summary:
lines.append(f"Summary: {page.summary}")
lines.append("")
# Body sections — handle both formats
# ── Body sections ────────────────────────────────────────────────────
if page.body_sections:
fmt = getattr(page, "body_sections_format", "v1") or "v1"
if fmt == "v2" and isinstance(page.body_sections, list):
@ -134,19 +150,26 @@ def format_technique_page(page: TechniquePage) -> str:
else:
lines.append(str(page.body_sections))
# Key moments from source videos
# ── Key moments with source attribution ──────────────────────────────
if page.key_moments:
lines.append("Key Moments from Source Videos:")
for km in page.key_moments:
lines.append(f"- {km.title}: {km.summary}")
sv = getattr(km, "source_video", None)
source_info = f" (Source: {sv.filename})" if sv else ""
lines.append(f"- {km.title}: {km.summary}{source_info}")
lines.append("")
return "\n".join(lines).strip()
def file_source_for_page(page: TechniquePage) -> str:
"""Deterministic file_source identifier for a technique page."""
return f"technique:{page.slug}"
"""Deterministic file_source identifier for a technique page.
Encodes creator_id for provenance tracking. Format:
technique:{slug}:creator:{creator_id}
"""
creator_id = str(page.creator_id) if page.creator_id else "unknown"
return f"technique:{page.slug}:creator:{creator_id}"
# ── LightRAG API ─────────────────────────────────────────────────────────────
@ -173,6 +196,47 @@ def get_processed_sources(lightrag_url: str) -> set[str]:
return sources
def clear_all_documents(lightrag_url: str) -> bool:
"""Delete all documents from LightRAG. Returns True on success.
Uses the /documents/delete_document endpoint with doc_ids (not file_path).
"""
# Get all document IDs
try:
resp = httpx.get(f"{lightrag_url}/documents", timeout=30)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
logger.error("Failed to fetch documents for clearing: %s", e)
return False
doc_ids = []
for status_group in data.get("statuses", {}).values():
for doc in status_group:
did = doc.get("id")
if did:
doc_ids.append(did)
if not doc_ids:
logger.info("No documents to clear.")
return True
logger.info("Clearing %d documents from LightRAG...", len(doc_ids))
try:
resp = httpx.request(
"DELETE",
f"{lightrag_url}/documents/delete_document",
json={"doc_ids": doc_ids, "delete_llm_cache": True},
timeout=120,
)
resp.raise_for_status()
logger.info("Cleared %d documents.", len(doc_ids))
return True
except httpx.HTTPError as e:
logger.error("Failed to delete documents: %s", e)
return False
def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None:
"""Submit a text document to LightRAG. Returns response dict or None on error."""
url = f"{lightrag_url}/documents/text"
@ -232,6 +296,16 @@ def main():
action="store_true",
help="Format and preview pages without submitting to LightRAG",
)
parser.add_argument(
"--force",
action="store_true",
help="Skip resume check — resubmit all pages even if already processed",
)
parser.add_argument(
"--clear-first",
action="store_true",
help="Delete all existing LightRAG documents before reindexing",
)
parser.add_argument(
"--limit",
type=int,
@ -271,9 +345,13 @@ def main():
session.close()
return
# Resume support — get already-processed sources
# Clear existing documents if requested
if args.clear_first and not args.dry_run:
clear_all_documents(args.lightrag_url)
# Resume support — get already-processed sources (skip if --force)
processed_sources: set[str] = set()
if not args.dry_run:
if not args.dry_run and not args.force:
logger.info("Checking LightRAG for already-processed documents...")
processed_sources = get_processed_sources(args.lightrag_url)
logger.info("Found %d existing document(s) in LightRAG", len(processed_sources))

View file

@ -10,9 +10,11 @@ from __future__ import annotations
import asyncio
import logging
import re
import time
from typing import Any
import httpx
import openai
from qdrant_client import AsyncQdrantClient
from qdrant_client.http import exceptions as qdrant_exceptions
@ -50,6 +52,13 @@ class SearchService:
self._qdrant = AsyncQdrantClient(url=settings.qdrant_url)
self._collection = settings.qdrant_collection
# LightRAG client
self._httpx = httpx.AsyncClient(
timeout=httpx.Timeout(settings.lightrag_search_timeout),
)
self._lightrag_url = settings.lightrag_url
self._lightrag_min_query_length = settings.lightrag_min_query_length
# ── Embedding ────────────────────────────────────────────────────────
async def embed_query(self, text: str) -> list[float] | None:
@ -392,6 +401,177 @@ class SearchService:
return partial
# ── LightRAG search ───────────────────────────────────────────────────
# Regex to parse file_source format: technique:{slug}:creator:{creator_id}
_FILE_SOURCE_RE = re.compile(r"^technique:(?P<slug>[^:]+):creator:(?P<creator_id>.+)$")
async def _lightrag_search(
self,
query: str,
limit: int,
db: AsyncSession,
) -> list[dict[str, Any]]:
"""Query LightRAG /query/data for entities, relationships, and chunks.
Maps results back to SearchResultItem dicts using file_source parsing
and DB batch lookup. Returns empty list on any failure (timeout,
connection, parse error) with a WARNING log caller falls back.
"""
start = time.monotonic()
try:
resp = await self._httpx.post(
f"{self._lightrag_url}/query/data",
json={"query": query, "mode": "hybrid", "top_k": limit},
)
resp.raise_for_status()
body = resp.json()
except httpx.TimeoutException:
elapsed_ms = (time.monotonic() - start) * 1000
logger.warning(
"lightrag_search_fallback reason=timeout query=%r latency_ms=%.1f",
query, elapsed_ms,
)
return []
except httpx.HTTPError as exc:
elapsed_ms = (time.monotonic() - start) * 1000
logger.warning(
"lightrag_search_fallback reason=http_error query=%r error=%s latency_ms=%.1f",
query, exc, elapsed_ms,
)
return []
except Exception as exc:
elapsed_ms = (time.monotonic() - start) * 1000
logger.warning(
"lightrag_search_fallback reason=unexpected query=%r error=%s latency_ms=%.1f",
query, exc, elapsed_ms,
)
return []
# Parse response
try:
data = body.get("data", {})
if not data:
elapsed_ms = (time.monotonic() - start) * 1000
logger.warning(
"lightrag_search_fallback reason=empty_data query=%r latency_ms=%.1f",
query, elapsed_ms,
)
return []
chunks = data.get("chunks", [])
entities = data.get("entities", [])
# relationships = data.get("relationships", []) # available for future use
# Extract technique slugs from chunk file_path/file_source fields
slug_set: set[str] = set()
slug_order: list[str] = [] # preserve retrieval rank
for chunk in chunks:
file_path = chunk.get("file_path", "")
m = self._FILE_SOURCE_RE.match(file_path)
if m and m.group("slug") not in slug_set:
slug = m.group("slug")
slug_set.add(slug)
slug_order.append(slug)
# Also try to extract slugs from entity names by matching DB later
entity_names: list[str] = []
for ent in entities:
name = ent.get("entity_name", "")
if name:
entity_names.append(name)
if not slug_set and not entity_names:
elapsed_ms = (time.monotonic() - start) * 1000
logger.warning(
"lightrag_search_fallback reason=no_parseable_results query=%r "
"chunks=%d entities=%d latency_ms=%.1f",
query, len(chunks), len(entities), elapsed_ms,
)
return []
# Batch-lookup technique pages by slug
tp_map: dict[str, tuple] = {} # slug → (TechniquePage, Creator)
if slug_set:
tp_stmt = (
select(TechniquePage, Creator)
.join(Creator, TechniquePage.creator_id == Creator.id)
.where(TechniquePage.slug.in_(list(slug_set)))
)
tp_rows = await db.execute(tp_stmt)
for tp, cr in tp_rows.all():
tp_map[tp.slug] = (tp, cr)
# If we have entity names but no chunk matches, try matching
# entity names against technique page titles or creator names
if entity_names and not tp_map:
entity_name_pats = [f"%{name}%" for name in entity_names[:20]]
tp_stmt2 = (
select(TechniquePage, Creator)
.join(Creator, TechniquePage.creator_id == Creator.id)
.where(
or_(
TechniquePage.title.ilike(func.any_(entity_name_pats)),
Creator.name.ilike(func.any_(entity_name_pats)),
)
)
.limit(limit)
)
tp_rows2 = await db.execute(tp_stmt2)
for tp, cr in tp_rows2.all():
if tp.slug not in tp_map:
tp_map[tp.slug] = (tp, cr)
if tp.slug not in slug_set:
slug_order.append(tp.slug)
slug_set.add(tp.slug)
# Build result items in retrieval-rank order
results: list[dict[str, Any]] = []
seen_slugs: set[str] = set()
for idx, slug in enumerate(slug_order):
if slug in seen_slugs:
continue
seen_slugs.add(slug)
pair = tp_map.get(slug)
if not pair:
continue
tp, cr = pair
# Score: higher rank → higher score (1.0 down to ~0.5)
score = max(1.0 - (idx * 0.05), 0.5)
results.append({
"type": "technique_page",
"title": tp.title,
"slug": tp.slug,
"technique_page_slug": tp.slug,
"summary": tp.summary or "",
"topic_category": tp.topic_category,
"topic_tags": tp.topic_tags or [],
"creator_id": str(tp.creator_id),
"creator_name": cr.name,
"creator_slug": cr.slug,
"created_at": tp.created_at.isoformat() if tp.created_at else "",
"score": score,
"match_context": "LightRAG graph match",
})
if len(results) >= limit:
break
elapsed_ms = (time.monotonic() - start) * 1000
logger.info(
"lightrag_search query=%r latency_ms=%.1f result_count=%d chunks=%d entities=%d",
query, elapsed_ms, len(results), len(chunks), len(entities),
)
return results
except (KeyError, ValueError, TypeError) as exc:
elapsed_ms = (time.monotonic() - start) * 1000
body_snippet = str(body)[:200] if body else "<empty>"
logger.warning(
"lightrag_search_fallback reason=parse_error query=%r error=%s body=%.200s latency_ms=%.1f",
query, exc, body_snippet, elapsed_ms,
)
return []
# ── Orchestrator ─────────────────────────────────────────────────────
async def search(
@ -418,17 +598,23 @@ class SearchService:
if scope not in ("all", "topics", "creators"):
scope = "all"
# Map scope to Qdrant type filter
# topics scope: no filter — both technique_page and technique_section
# should appear in semantic results
type_filter_map = {
"all": None,
"topics": None,
"creators": None,
}
qdrant_type_filter = type_filter_map.get(scope)
# ── Primary: try LightRAG for queries ≥ min length ─────────────
lightrag_results: list[dict[str, Any]] = []
fallback_used = True # assume fallback until LightRAG succeeds
# Run both searches in parallel
use_lightrag = len(query) >= self._lightrag_min_query_length
if use_lightrag:
lightrag_results = await self._lightrag_search(query, limit, db)
if lightrag_results:
fallback_used = False
# ── Keyword search always runs (for merge/dedup) ─────────────
async def _keyword():
return await self.keyword_search(query, scope, limit, db, sort=sort)
# ── Fallback: Qdrant semantic (only when LightRAG didn't deliver) ──
qdrant_type_filter = None # no type filter — all result types welcome
async def _semantic():
vector = await self.embed_query(query)
if vector is None:
@ -444,14 +630,17 @@ class SearchService:
filtered.append(item)
return filtered
async def _keyword():
return await self.keyword_search(query, scope, limit, db, sort=sort)
semantic_results, kw_result = await asyncio.gather(
_semantic(),
_keyword(),
return_exceptions=True,
)
if fallback_used:
# LightRAG returned nothing — run Qdrant semantic + keyword in parallel
semantic_results, kw_result = await asyncio.gather(
_semantic(),
_keyword(),
return_exceptions=True,
)
else:
# LightRAG succeeded — only need keyword for supplementary merge
semantic_results = []
kw_result = await _keyword()
# Handle exceptions gracefully
if isinstance(semantic_results, Exception):
@ -464,8 +653,7 @@ class SearchService:
kw_items = kw_result["items"]
partial_matches = kw_result.get("partial_matches", [])
# Merge: keyword results first (they have explicit match_context),
# then semantic results that aren't already present
# Merge: LightRAG results first (primary), then keyword, then Qdrant semantic
seen_keys: set[str] = set()
merged: list[dict[str, Any]] = []
@ -475,6 +663,12 @@ class SearchService:
title = item.get("title", "")
return f"{t}:{s}:{title}"
for item in lightrag_results:
key = _dedup_key(item)
if key not in seen_keys:
seen_keys.add(key)
merged.append(item)
for item in kw_items:
key = _dedup_key(item)
if key not in seen_keys:
@ -490,13 +684,13 @@ class SearchService:
# Apply sort
merged = self._apply_sort(merged, sort)
fallback_used = len(kw_items) > 0 and len(semantic_results) == 0
# fallback_used is already set above
elapsed_ms = (time.monotonic() - start) * 1000
logger.info(
"Search query=%r scope=%s keyword=%d semantic=%d merged=%d partial=%d latency_ms=%.1f",
query, scope, len(kw_items), len(semantic_results),
len(merged), len(partial_matches), elapsed_ms,
"Search query=%r scope=%s lightrag=%d keyword=%d semantic=%d merged=%d partial=%d fallback=%s latency_ms=%.1f",
query, scope, len(lightrag_results), len(kw_items), len(semantic_results),
len(merged), len(partial_matches), fallback_used, elapsed_ms,
)
return {

View file

@ -19,7 +19,9 @@ const CreatorDashboard = React.lazy(() => import("./pages/CreatorDashboard"));
const CreatorSettings = React.lazy(() => import("./pages/CreatorSettings"));
const ConsentDashboard = React.lazy(() => import("./pages/ConsentDashboard"));
const WatchPage = React.lazy(() => import("./pages/WatchPage"));
const AdminUsers = React.lazy(() => import("./pages/AdminUsers"));
import AdminDropdown from "./components/AdminDropdown";
import ImpersonationBanner from "./components/ImpersonationBanner";
import AppFooter from "./components/AppFooter";
import SearchAutocomplete from "./components/SearchAutocomplete";
import ProtectedRoute from "./components/ProtectedRoute";
@ -100,6 +102,7 @@ function AppShell() {
return (
<div className="app">
<ImpersonationBanner />
<a href="#main-content" className="skip-link">Skip to content</a>
<header className="app-header" ref={headerRef}>
<Link to="/" className="app-header__brand">
@ -179,6 +182,7 @@ function AppShell() {
<Route path="/admin/reports" element={<Suspense fallback={<LoadingFallback />}><AdminReports /></Suspense>} />
<Route path="/admin/pipeline" element={<Suspense fallback={<LoadingFallback />}><AdminPipeline /></Suspense>} />
<Route path="/admin/techniques" element={<Suspense fallback={<LoadingFallback />}><AdminTechniquePages /></Suspense>} />
<Route path="/admin/users" element={<Suspense fallback={<LoadingFallback />}><AdminUsers /></Suspense>} />
{/* Info routes */}
<Route path="/about" element={<Suspense fallback={<LoadingFallback />}><About /></Suspense>} />

View file

@ -28,6 +28,22 @@ export interface UserResponse {
creator_id: string | null;
is_active: boolean;
created_at: string;
impersonating?: boolean;
}
export interface UserListItem {
id: string;
email: string;
display_name: string;
role: string;
creator_id: string | null;
is_active: boolean;
}
export interface ImpersonateResponse {
access_token: string;
token_type: string;
target_user: UserListItem;
}
export interface UpdateProfileRequest {
@ -68,3 +84,33 @@ export async function authUpdateProfile(
body: JSON.stringify(data),
});
}
// ── Admin: Impersonation ─────────────────────────────────────────────────────
export async function fetchUsers(token: string): Promise<UserListItem[]> {
return request<UserListItem[]>(`${BASE}/admin/users`, {
headers: { Authorization: `Bearer ${token}` },
});
}
export async function impersonateUser(
token: string,
userId: string,
): Promise<ImpersonateResponse> {
return request<ImpersonateResponse>(
`${BASE}/admin/impersonate/${userId}`,
{
method: "POST",
headers: { Authorization: `Bearer ${token}` },
},
);
}
export async function stopImpersonation(
token: string,
): Promise<{ message: string }> {
return request<{ message: string }>(`${BASE}/admin/impersonate/stop`, {
method: "POST",
headers: { Authorization: `Bearer ${token}` },
});
}

View file

@ -110,6 +110,14 @@ export default function AdminDropdown() {
>
Techniques
</Link>
<Link
to="/admin/users"
className="admin-dropdown__item"
role="menuitem"
onClick={() => setOpen(false)}
>
Users
</Link>
</div>
)}
</div>

View file

@ -0,0 +1,49 @@
.banner {
position: fixed;
top: 0;
left: 0;
right: 0;
z-index: 9999;
display: flex;
align-items: center;
justify-content: center;
gap: 1rem;
padding: 0.5rem 1rem;
background: #b45309;
color: #fff;
font-size: 0.85rem;
font-weight: 600;
letter-spacing: 0.02em;
}
.text {
display: flex;
align-items: center;
gap: 0.5rem;
}
.icon {
font-size: 1rem;
}
.exitBtn {
padding: 0.25rem 0.75rem;
border: 1px solid rgba(255, 255, 255, 0.5);
border-radius: 4px;
background: transparent;
color: #fff;
font-size: 0.8rem;
font-weight: 600;
cursor: pointer;
transition: background 150ms, border-color 150ms;
}
.exitBtn:hover {
background: rgba(255, 255, 255, 0.15);
border-color: #fff;
}
/* Push page content down when banner is showing */
:global(body.impersonating) {
padding-top: 40px;
}

View file

@ -0,0 +1,36 @@
import { useEffect } from "react";
import { useAuth } from "../context/AuthContext";
import styles from "./ImpersonationBanner.module.css";
/**
* Fixed amber banner shown when an admin is impersonating a creator.
* Adds body.impersonating class to push page content down.
*/
export default function ImpersonationBanner() {
const { isImpersonating, user, exitImpersonation } = useAuth();
useEffect(() => {
if (isImpersonating) {
document.body.classList.add("impersonating");
} else {
document.body.classList.remove("impersonating");
}
return () => {
document.body.classList.remove("impersonating");
};
}, [isImpersonating]);
if (!isImpersonating) return null;
return (
<div className={styles.banner} role="alert">
<span className={styles.text}>
<span className={styles.icon} aria-hidden="true">👁</span>
Viewing as <strong>{user?.display_name ?? "Unknown"}</strong>
</span>
<button className={styles.exitBtn} onClick={exitImpersonation}>
Exit
</button>
</div>
);
}

View file

@ -11,19 +11,26 @@ import {
authLogin,
authGetMe,
authRegister,
impersonateUser,
stopImpersonation as apiStopImpersonation,
ApiError,
type UserResponse,
type RegisterRequest,
} from "../api";
const ADMIN_TOKEN_KEY = "chrysopedia_admin_token";
interface AuthContextValue {
user: UserResponse | null;
token: string | null;
isAuthenticated: boolean;
isImpersonating: boolean;
loading: boolean;
login: (email: string, password: string) => Promise<void>;
register: (data: RegisterRequest) => Promise<UserResponse>;
logout: () => void;
startImpersonation: (userId: string) => Promise<void>;
exitImpersonation: () => Promise<void>;
}
const AuthContext = createContext<AuthContextValue | null>(null);
@ -77,20 +84,58 @@ export function AuthProvider({ children }: { children: ReactNode }) {
const logout = useCallback(() => {
localStorage.removeItem(AUTH_TOKEN_KEY);
sessionStorage.removeItem(ADMIN_TOKEN_KEY);
setToken(null);
setUser(null);
}, []);
const startImpersonation = useCallback(async (userId: string) => {
if (!token) return;
// Save admin token so we can restore it later
sessionStorage.setItem(ADMIN_TOKEN_KEY, token);
const resp = await impersonateUser(token, userId);
localStorage.setItem(AUTH_TOKEN_KEY, resp.access_token);
setToken(resp.access_token);
const me = await authGetMe(resp.access_token);
setUser(me);
}, [token]);
const exitImpersonation = useCallback(async () => {
// Try to call stop endpoint for audit log
if (token) {
try {
await apiStopImpersonation(token);
} catch {
// Best effort — still restore admin session
}
}
// Restore admin token
const adminToken = sessionStorage.getItem(ADMIN_TOKEN_KEY);
sessionStorage.removeItem(ADMIN_TOKEN_KEY);
if (adminToken) {
localStorage.setItem(AUTH_TOKEN_KEY, adminToken);
setToken(adminToken);
const me = await authGetMe(adminToken);
setUser(me);
} else {
// Fallback: just logout
logout();
}
}, [token, logout]);
return (
<AuthContext.Provider
value={{
user,
token,
isAuthenticated: !!user,
isImpersonating: !!user?.impersonating,
loading,
login,
register,
logout,
startImpersonation,
exitImpersonation,
}}
>
{children}

View file

@ -0,0 +1,96 @@
.page {
max-width: 900px;
margin: 0 auto;
padding: 2rem 1rem;
}
.title {
font-size: 1.5rem;
font-weight: 700;
margin-bottom: 1.5rem;
color: var(--text-primary, #e2e8f0);
}
.table {
width: 100%;
border-collapse: collapse;
font-size: 0.9rem;
}
.table th {
text-align: left;
padding: 0.6rem 0.75rem;
border-bottom: 2px solid var(--color-border, #2d2d3d);
color: var(--text-secondary, #828291);
font-weight: 600;
font-size: 0.8rem;
text-transform: uppercase;
letter-spacing: 0.05em;
}
.table td {
padding: 0.6rem 0.75rem;
border-bottom: 1px solid var(--color-border, #2d2d3d);
color: var(--text-primary, #e2e8f0);
}
.roleBadge {
display: inline-block;
padding: 0.15rem 0.5rem;
border-radius: 4px;
font-size: 0.75rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.03em;
}
.roleBadge[data-role="admin"] {
background: rgba(239, 68, 68, 0.15);
color: #ef4444;
}
.roleBadge[data-role="creator"] {
background: rgba(34, 211, 238, 0.15);
color: #22d3ee;
}
.viewAsBtn {
padding: 0.3rem 0.6rem;
border: 1px solid var(--color-accent, #22d3ee);
border-radius: 4px;
background: transparent;
color: var(--color-accent, #22d3ee);
font-size: 0.8rem;
font-weight: 600;
cursor: pointer;
transition: background 150ms, color 150ms;
}
.viewAsBtn:hover {
background: var(--color-accent, #22d3ee);
color: var(--color-bg, #0f0f1a);
}
.viewAsBtn:disabled {
opacity: 0.4;
cursor: not-allowed;
}
.loading,
.error,
.empty {
text-align: center;
padding: 3rem 1rem;
color: var(--text-secondary, #828291);
}
.error {
color: #ef4444;
}
@media (max-width: 600px) {
.table th:nth-child(2),
.table td:nth-child(2) {
display: none;
}
}

View file

@ -0,0 +1,96 @@
import { useEffect, useState } from "react";
import { useAuth } from "../context/AuthContext";
import { fetchUsers, type UserListItem } from "../api";
import { useDocumentTitle } from "../hooks/useDocumentTitle";
import styles from "./AdminUsers.module.css";
export default function AdminUsers() {
useDocumentTitle("Users — Admin");
const { token, startImpersonation, user: currentUser } = useAuth();
const [users, setUsers] = useState<UserListItem[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [impersonating, setImpersonating] = useState<string | null>(null);
useEffect(() => {
if (!token) return;
setLoading(true);
fetchUsers(token)
.then(setUsers)
.catch((e) => setError(e.message || "Failed to load users"))
.finally(() => setLoading(false));
}, [token]);
async function handleViewAs(userId: string) {
setImpersonating(userId);
try {
await startImpersonation(userId);
// Navigation will happen via auth context update
} catch (e: any) {
setError(e.message || "Failed to start impersonation");
setImpersonating(null);
}
}
if (loading) {
return (
<div className={styles.page}>
<h1 className={styles.title}>Users</h1>
<p className={styles.loading}>Loading users</p>
</div>
);
}
if (error) {
return (
<div className={styles.page}>
<h1 className={styles.title}>Users</h1>
<p className={styles.error}>{error}</p>
</div>
);
}
return (
<div className={styles.page}>
<h1 className={styles.title}>Users</h1>
{users.length === 0 ? (
<p className={styles.empty}>No users found.</p>
) : (
<table className={styles.table}>
<thead>
<tr>
<th>Name</th>
<th>Email</th>
<th>Role</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{users.map((u) => (
<tr key={u.id}>
<td>{u.display_name}</td>
<td>{u.email}</td>
<td>
<span className={styles.roleBadge} data-role={u.role}>
{u.role}
</span>
</td>
<td>
{u.role === "creator" && u.id !== currentUser?.id && (
<button
className={styles.viewAsBtn}
disabled={impersonating === u.id}
onClick={() => handleViewAs(u.id)}
>
{impersonating === u.id ? "Switching…" : "View As"}
</button>
)}
</td>
</tr>
))}
</tbody>
</table>
)}
</div>
);
}

View file

@ -1 +1 @@
{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/api/admin-pipeline.ts","./src/api/admin-techniques.ts","./src/api/auth.ts","./src/api/client.ts","./src/api/consent.ts","./src/api/creator-dashboard.ts","./src/api/creators.ts","./src/api/index.ts","./src/api/reports.ts","./src/api/search.ts","./src/api/stats.ts","./src/api/techniques.ts","./src/api/topics.ts","./src/api/videos.ts","./src/components/AdminDropdown.tsx","./src/components/AppFooter.tsx","./src/components/CategoryIcons.tsx","./src/components/CopyLinkButton.tsx","./src/components/CreatorAvatar.tsx","./src/components/PlayerControls.tsx","./src/components/ProtectedRoute.tsx","./src/components/ReportIssueModal.tsx","./src/components/SearchAutocomplete.tsx","./src/components/SocialIcons.tsx","./src/components/SortDropdown.tsx","./src/components/TableOfContents.tsx","./src/components/TagList.tsx","./src/components/ToggleSwitch.tsx","./src/components/TranscriptSidebar.tsx","./src/components/VideoPlayer.tsx","./src/context/AuthContext.tsx","./src/hooks/useCountUp.ts","./src/hooks/useDocumentTitle.ts","./src/hooks/useMediaSync.ts","./src/hooks/useSortPreference.ts","./src/pages/About.tsx","./src/pages/AdminPipeline.tsx","./src/pages/AdminReports.tsx","./src/pages/AdminTechniquePages.tsx","./src/pages/ConsentDashboard.tsx","./src/pages/CreatorDashboard.tsx","./src/pages/CreatorDetail.tsx","./src/pages/CreatorSettings.tsx","./src/pages/CreatorsBrowse.tsx","./src/pages/Home.tsx","./src/pages/Login.tsx","./src/pages/Register.tsx","./src/pages/SearchResults.tsx","./src/pages/SubTopicPage.tsx","./src/pages/TechniquePage.tsx","./src/pages/TopicsBrowse.tsx","./src/pages/WatchPage.tsx","./src/utils/catSlug.ts","./src/utils/citations.tsx"],"version":"5.6.3"}
{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/api/admin-pipeline.ts","./src/api/admin-techniques.ts","./src/api/auth.ts","./src/api/client.ts","./src/api/consent.ts","./src/api/creator-dashboard.ts","./src/api/creators.ts","./src/api/index.ts","./src/api/reports.ts","./src/api/search.ts","./src/api/stats.ts","./src/api/techniques.ts","./src/api/topics.ts","./src/api/videos.ts","./src/components/AdminDropdown.tsx","./src/components/AppFooter.tsx","./src/components/CategoryIcons.tsx","./src/components/CopyLinkButton.tsx","./src/components/CreatorAvatar.tsx","./src/components/ImpersonationBanner.tsx","./src/components/PlayerControls.tsx","./src/components/ProtectedRoute.tsx","./src/components/ReportIssueModal.tsx","./src/components/SearchAutocomplete.tsx","./src/components/SocialIcons.tsx","./src/components/SortDropdown.tsx","./src/components/TableOfContents.tsx","./src/components/TagList.tsx","./src/components/ToggleSwitch.tsx","./src/components/TranscriptSidebar.tsx","./src/components/VideoPlayer.tsx","./src/context/AuthContext.tsx","./src/hooks/useCountUp.ts","./src/hooks/useDocumentTitle.ts","./src/hooks/useMediaSync.ts","./src/hooks/useSortPreference.ts","./src/pages/About.tsx","./src/pages/AdminPipeline.tsx","./src/pages/AdminReports.tsx","./src/pages/AdminTechniquePages.tsx","./src/pages/AdminUsers.tsx","./src/pages/ConsentDashboard.tsx","./src/pages/CreatorDashboard.tsx","./src/pages/CreatorDetail.tsx","./src/pages/CreatorSettings.tsx","./src/pages/CreatorsBrowse.tsx","./src/pages/Home.tsx","./src/pages/Login.tsx","./src/pages/Register.tsx","./src/pages/SearchResults.tsx","./src/pages/SubTopicPage.tsx","./src/pages/TechniquePage.tsx","./src/pages/TopicsBrowse.tsx","./src/pages/WatchPage.tsx","./src/utils/catSlug.ts","./src/utils/citations.tsx"],"version":"5.6.3"}