chrysopedia/backend/scripts/reindex_lightrag.py
jlightner 17b43d9778 feat: Added LightRAG /query/data as primary search engine with file_sou…
- "backend/config.py"
- "backend/search_service.py"

GSD-Task: S01/T01
2026-04-04 04:44:24 +00:00

432 lines
15 KiB
Python

#!/usr/bin/env python3
"""Reindex all technique pages into LightRAG for entity/relationship extraction.
Connects to PostgreSQL, extracts formatted text from each technique page
(with creator and key moment data), and submits to the LightRAG API.
Supports resume (skips already-processed file_sources) and dry-run mode.
Usage:
# Dry run — format and preview without submitting
python3 /app/scripts/reindex_lightrag.py --dry-run --limit 3
# Submit first 2 pages
python3 /app/scripts/reindex_lightrag.py --limit 2
# Full reindex
python3 /app/scripts/reindex_lightrag.py
"""
import argparse
import json
import logging
import os
import sys
import time
from typing import Any
import httpx
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, joinedload, sessionmaker
# Resolve imports whether run from /app/ (Docker) or backend/ (local)
# realpath first so a symlinked script still resolves to the real backend dir.
_script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
_backend_dir = os.path.dirname(_script_dir)
# Prepend (not append) so the project's models module wins over any
# similarly-named package elsewhere on sys.path.
sys.path.insert(0, _backend_dir)
from models import Creator, KeyMoment, SourceVideo, TechniquePage # noqa: E402
# Named logger for this script; level/format are configured in main().
logger = logging.getLogger("reindex_lightrag")
# ── Database ─────────────────────────────────────────────────────────────────
def get_sync_engine(db_url: str):
    """Build a synchronous SQLAlchemy engine for this script.

    Accepts either a sync or an asyncpg-style URL; the async driver prefix
    is rewritten to psycopg2 so the script can run without an event loop.
    pool_pre_ping guards against stale connections on long reindex runs.
    """
    sync_url = db_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
    return create_engine(sync_url, pool_pre_ping=True)
def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]:
    """Fetch technique pages ordered by title, eagerly loading relations.

    Creator, key moments, and each key moment's source video are joined up
    front so later formatting never triggers lazy loads.
    """
    stmt = (
        session.query(TechniquePage)
        .options(
            joinedload(TechniquePage.creator),
            joinedload(TechniquePage.key_moments).joinedload(KeyMoment.source_video),
        )
        .order_by(TechniquePage.title)
    )
    # A falsy limit (None or 0) means "no limit".
    if limit:
        stmt = stmt.limit(limit)
    return stmt.all()
# ── Text formatting ──────────────────────────────────────────────────────────
def _format_v1_sections(body_sections: dict) -> str:
"""Format v1 body_sections (flat dict: heading → content)."""
parts = []
for heading, content in body_sections.items():
parts.append(f"## {heading}")
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
parts.append("\n".join(str(item) for item in content))
parts.append("")
return "\n".join(parts)
def _format_v2_sections(body_sections: list[dict]) -> str:
"""Format v2 body_sections (list of {heading, content, subsections})."""
parts = []
for section in body_sections:
heading = section.get("heading", "")
content = section.get("content", "")
if heading:
parts.append(f"## {heading}")
if content:
parts.append(content)
# Flatten subsections
for sub in section.get("subsections", []):
sub_heading = sub.get("heading", "")
sub_content = sub.get("content", "")
if sub_heading:
parts.append(f"### {sub_heading}")
if sub_content:
parts.append(sub_content)
parts.append("")
return "\n".join(parts)
def format_technique_page(page: TechniquePage) -> str:
    """Convert a TechniquePage + relations into a rich text document for LightRAG.
    Includes structured provenance metadata for entity extraction and
    creator-scoped retrieval.

    Document layout (order matters — LightRAG indexes this text verbatim):
    provenance header, blank line, summary, body sections, key moments.
    Returns the assembled text with surrounding whitespace stripped.
    """
    lines = []
    # ── Structured provenance block ──────────────────────────────────────
    lines.append(f"Technique: {page.title}")
    if page.creator:
        lines.append(f"Creator: {page.creator.name}")
        lines.append(f"Creator ID: {page.creator_id}")
    lines.append(f"Category: {page.topic_category or 'Uncategorized'}")
    if page.topic_tags:
        lines.append(f"Tags: {', '.join(page.topic_tags)}")
    if page.plugins:
        lines.append(f"Plugins: {', '.join(page.plugins)}")
    # Source video provenance
    if page.key_moments:
        # De-duplicate source videos across key moments; a dict keeps
        # first-seen insertion order so output is stable run-to-run.
        video_ids: dict[str, str] = {}
        for km in page.key_moments:
            # source_video may be absent/None on a key moment — tolerate it.
            sv = getattr(km, "source_video", None)
            if sv and str(sv.id) not in video_ids:
                video_ids[str(sv.id)] = sv.filename
        if video_ids:
            lines.append(f"Source Videos: {', '.join(video_ids.values())}")
            lines.append(f"Source Video IDs: {', '.join(video_ids.keys())}")
    lines.append("")
    # ── Summary ──────────────────────────────────────────────────────────
    if page.summary:
        lines.append(f"Summary: {page.summary}")
        lines.append("")
    # ── Body sections ────────────────────────────────────────────────────
    if page.body_sections:
        # Dispatch on the stored format tag, but trust the actual runtime
        # type when the tag and the data disagree.
        fmt = getattr(page, "body_sections_format", "v1") or "v1"
        if fmt == "v2" and isinstance(page.body_sections, list):
            lines.append(_format_v2_sections(page.body_sections))
        elif isinstance(page.body_sections, dict):
            lines.append(_format_v1_sections(page.body_sections))
        elif isinstance(page.body_sections, list):
            # v1 tag but list data — treat as v2
            lines.append(_format_v2_sections(page.body_sections))
        else:
            # Last-resort: unknown shape, stringify rather than crash.
            lines.append(str(page.body_sections))
    # ── Key moments with source attribution ──────────────────────────────
    if page.key_moments:
        lines.append("Key Moments from Source Videos:")
        for km in page.key_moments:
            sv = getattr(km, "source_video", None)
            source_info = f" (Source: {sv.filename})" if sv else ""
            lines.append(f"- {km.title}: {km.summary}{source_info}")
        lines.append("")
    return "\n".join(lines).strip()
def file_source_for_page(page: TechniquePage) -> str:
    """Build the deterministic file_source identifier for a technique page.

    The creator_id is embedded for provenance tracking; pages without a
    creator fall back to the literal "unknown". Format:
        technique:{slug}:creator:{creator_id}
    """
    owner = "unknown" if not page.creator_id else str(page.creator_id)
    return f"technique:{page.slug}:creator:{owner}"
# ── LightRAG API ─────────────────────────────────────────────────────────────
def get_processed_sources(lightrag_url: str) -> set[str]:
    """Return the file_path of every document LightRAG already holds.

    Used for resume support. On any HTTP failure an empty set is returned
    (with a warning) so the caller simply reprocesses everything instead of
    aborting the run.
    """
    try:
        response = httpx.get(f"{lightrag_url}/documents", timeout=30)
        response.raise_for_status()
        payload = response.json()
    except httpx.HTTPError as exc:
        logger.warning("Failed to fetch existing documents for resume: %s", exc)
        return set()
    # Documents are grouped by processing status; collect every non-empty
    # file_path across all status buckets.
    return {
        doc["file_path"]
        for group in payload.get("statuses", {}).values()
        for doc in group
        if doc.get("file_path")
    }
def clear_all_documents(lightrag_url: str) -> bool:
    """Delete every document currently stored in LightRAG.

    Fetches all document IDs first, then issues a single DELETE against the
    /documents/delete_document endpoint (which takes doc_ids, not file_path).
    Returns True on success or when there is nothing to delete.
    """
    # Enumerate existing documents so we can delete by ID.
    try:
        listing = httpx.get(f"{lightrag_url}/documents", timeout=30)
        listing.raise_for_status()
        payload = listing.json()
    except httpx.HTTPError as exc:
        logger.error("Failed to fetch documents for clearing: %s", exc)
        return False
    ids = [
        doc["id"]
        for group in payload.get("statuses", {}).values()
        for doc in group
        if doc.get("id")
    ]
    if not ids:
        logger.info("No documents to clear.")
        return True
    logger.info("Clearing %d documents from LightRAG...", len(ids))
    try:
        # httpx.delete() does not accept a json body, so use request().
        result = httpx.request(
            "DELETE",
            f"{lightrag_url}/documents/delete_document",
            json={"doc_ids": ids, "delete_llm_cache": True},
            timeout=120,
        )
        result.raise_for_status()
    except httpx.HTTPError as exc:
        logger.error("Failed to delete documents: %s", exc)
        return False
    logger.info("Cleared %d documents.", len(ids))
    return True
def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None:
    """POST one text document to LightRAG's /documents/text endpoint.

    Returns the parsed JSON response on success; on HTTP failure, logs the
    error and returns None so the caller can count it and continue.
    """
    try:
        response = httpx.post(
            f"{lightrag_url}/documents/text",
            json={"text": text, "file_source": file_source},
            timeout=60,
        )
        response.raise_for_status()
    except httpx.HTTPError as exc:
        logger.error("Failed to submit document %s: %s", file_source, exc)
        return None
    return response.json()
def wait_for_pipeline(lightrag_url: str, timeout: int = 600) -> bool:
    """Poll LightRAG's pipeline_status endpoint until it reports idle.

    Checks every 10 seconds. Returns True once busy=false; returns False if
    the pipeline is still busy after `timeout` seconds. HTTP errors during
    a poll are logged and retried rather than aborting the wait.
    """
    status_url = f"{lightrag_url}/documents/pipeline_status"
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            response = httpx.get(status_url, timeout=10)
            response.raise_for_status()
            payload = response.json()
        except httpx.HTTPError as exc:
            logger.warning(" Pipeline status check failed: %s", exc)
        else:
            if not payload.get("busy", False):
                return True
            if message := payload.get("latest_message", ""):
                logger.debug(" Pipeline: %s", message)
        time.sleep(10)
    logger.warning("Pipeline did not finish within %ds timeout", timeout)
    return False
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Script entry point.

    Flow: parse CLI args → connect to Postgres → load technique pages →
    optionally clear LightRAG → submit each page (skipping already-processed
    sources unless --force) → print a summary. Exits with status 1 only on
    DB connection failure; per-page submit errors are counted and reported.
    """
    parser = argparse.ArgumentParser(
        description="Reindex technique pages into LightRAG"
    )
    parser.add_argument(
        "--lightrag-url",
        default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
        help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)",
    )
    parser.add_argument(
        "--db-url",
        default=os.environ.get(
            "DATABASE_URL",
            "postgresql+asyncpg://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
        ),
        help="Database URL (async or sync format accepted)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Format and preview pages without submitting to LightRAG",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Skip resume check — resubmit all pages even if already processed",
    )
    parser.add_argument(
        "--clear-first",
        action="store_true",
        help="Delete all existing LightRAG documents before reindexing",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process only the first N pages (for testing)",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable debug logging",
    )
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )
    # Connect to PostgreSQL
    logger.info("Connecting to PostgreSQL...")
    try:
        engine = get_sync_engine(args.db_url)
        SessionLocal = sessionmaker(bind=engine)
        session = SessionLocal()
    except Exception as e:
        logger.error("Failed to connect to PostgreSQL: %s", e)
        sys.exit(1)
    # Load technique pages
    pages = load_technique_pages(session, limit=args.limit)
    total = len(pages)
    logger.info("Loaded %d technique page(s)", total)
    if total == 0:
        logger.info("No pages to process.")
        session.close()
        return
    # Clear existing documents if requested
    # NOTE(review): a failed clear is ignored and reindexing proceeds anyway —
    # confirm that best-effort behavior is intended.
    if args.clear_first and not args.dry_run:
        clear_all_documents(args.lightrag_url)
    # Resume support — get already-processed sources (skip if --force)
    processed_sources: set[str] = set()
    if not args.dry_run and not args.force:
        logger.info("Checking LightRAG for already-processed documents...")
        processed_sources = get_processed_sources(args.lightrag_url)
        logger.info("Found %d existing document(s) in LightRAG", len(processed_sources))
    # Process pages
    submitted = 0
    skipped = 0
    errors = 0
    total_chars = 0
    for i, page in enumerate(pages, 1):
        slug = page.slug
        source = file_source_for_page(page)
        text = format_technique_page(page)
        total_chars += len(text)
        if args.dry_run:
            # Full-text preview is printed for the first page only; the rest
            # just log their formatted size.
            if i == 1:
                print("=" * 80)
                print(f"PREVIEW: {page.title} ({source})")
                print("=" * 80)
                print(text)
                print("=" * 80)
            logger.info("[%d/%d] Formatted: %s (%d chars)", i, total, slug, len(text))
            continue
        # Resume — skip already-processed
        if source in processed_sources:
            logger.info("[%d/%d] Skipped (already processed): %s", i, total, slug)
            skipped += 1
            continue
        # Submit
        logger.info("[%d/%d] Submitting: %s (%d chars)", i, total, slug, len(text))
        result = submit_document(args.lightrag_url, text, source)
        if result is None:
            errors += 1
            continue
        status = result.get("status", "unknown")
        if status == "duplicated":
            # Server-side dedup (same content already indexed) counts as a skip.
            logger.info("[%d/%d] Duplicated (already in LightRAG): %s", i, total, slug)
            skipped += 1
            continue
        if status not in ("success", "partial_success"):
            logger.error("[%d/%d] Unexpected status '%s' for %s: %s",
                i, total, status, slug, result.get("message", ""))
            errors += 1
            continue
        submitted += 1
        logger.info("[%d/%d] Submitted: %s (track_id=%s)", i, total, slug,
            result.get("track_id", "?"))
        # Wait for pipeline to finish before submitting next
        # (serializes submissions so only one document is being processed at a time)
        logger.info("[%d/%d] Waiting for pipeline to process %s...", i, total, slug)
        finished = wait_for_pipeline(args.lightrag_url)
        if finished:
            logger.info("[%d/%d] Processed: %s", i, total, slug)
        else:
            logger.warning("[%d/%d] Pipeline timeout for %s — continuing anyway", i, total, slug)
    session.close()
    # Summary
    print()
    print(f"{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f" Total pages: {total}")
    print(f" Total chars: {total_chars:,}")
    if not args.dry_run:
        print(f" Submitted: {submitted}")
        print(f" Skipped: {skipped}")
        print(f" Errors: {errors}")
if __name__ == "__main__":
    main()