diff --git a/backend/scripts/reindex_lightrag.py b/backend/scripts/reindex_lightrag.py new file mode 100644 index 0000000..fc90fb2 --- /dev/null +++ b/backend/scripts/reindex_lightrag.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3 +"""Reindex all technique pages into LightRAG for entity/relationship extraction. + +Connects to PostgreSQL, extracts formatted text from each technique page +(with creator and key moment data), and submits to the LightRAG API. + +Supports resume (skips already-processed file_sources) and dry-run mode. + +Usage: + # Dry run — format and preview without submitting + python3 /app/scripts/reindex_lightrag.py --dry-run --limit 3 + + # Submit first 2 pages + python3 /app/scripts/reindex_lightrag.py --limit 2 + + # Full reindex + python3 /app/scripts/reindex_lightrag.py +""" + +import argparse +import json +import logging +import os +import sys +import time +from typing import Any + +import httpx +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, joinedload, sessionmaker + +# Resolve imports whether run from /app/ (Docker) or backend/ (local) +_script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__))) +_backend_dir = os.path.dirname(_script_dir) +sys.path.insert(0, _backend_dir) + +from models import Creator, KeyMoment, TechniquePage # noqa: E402 + +logger = logging.getLogger("reindex_lightrag") + + +# ── Database ───────────────────────────────────────────────────────────────── + + +def get_sync_engine(db_url: str): + """Create a sync SQLAlchemy engine, converting async URL if needed.""" + url = db_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://") + return create_engine(url, pool_pre_ping=True) + + +def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]: + """Load all technique pages with creator and key moments eagerly.""" + query = ( + session.query(TechniquePage) + .options( + joinedload(TechniquePage.creator), + joinedload(TechniquePage.key_moments), + ) + .order_by(TechniquePage.title) + ) + if limit: + query = query.limit(limit) + return query.all() + + +# ── Text formatting ────────────────────────────────────────────────────────── + + +def _format_v1_sections(body_sections: dict) -> str: + """Format v1 body_sections (flat dict: heading → content).""" + parts = [] + for heading, content in body_sections.items(): + parts.append(f"## {heading}") + if isinstance(content, str): + parts.append(content) + elif isinstance(content, list): + parts.append("\n".join(str(item) for item in content)) + parts.append("") + return "\n".join(parts) + + +def _format_v2_sections(body_sections: list[dict]) -> str: + """Format v2 body_sections (list of {heading, content, subsections}).""" + parts = [] + for section in body_sections: + heading = section.get("heading", "") + content = section.get("content", "") + if heading: + parts.append(f"## {heading}") + if content: + parts.append(content) + # Flatten subsections + for sub in section.get("subsections", []): + sub_heading = sub.get("heading", "") + sub_content = sub.get("content", "") + if sub_heading: + parts.append(f"### {sub_heading}") + if sub_content: + parts.append(sub_content) + parts.append("") + return "\n".join(parts) + + +def format_technique_page(page: TechniquePage) -> str: + """Convert a TechniquePage + relations into a rich text document for LightRAG.""" + lines = [] + + # Header metadata + lines.append(f"Technique: {page.title}") + if page.creator: + lines.append(f"Creator: {page.creator.name}") + lines.append(f"Category: {page.topic_category or 'Uncategorized'}") + if page.topic_tags: + lines.append(f"Tags: {', '.join(page.topic_tags)}") + if page.plugins: + lines.append(f"Plugins: {', '.join(page.plugins)}") + lines.append("") + + # Summary + if page.summary: + lines.append(f"Summary: {page.summary}") + lines.append("") + + # Body sections — handle both formats + if page.body_sections: + fmt = getattr(page, "body_sections_format", "v1") or "v1" + if fmt == "v2" and isinstance(page.body_sections, list): + lines.append(_format_v2_sections(page.body_sections)) + elif isinstance(page.body_sections, dict): + lines.append(_format_v1_sections(page.body_sections)) + elif isinstance(page.body_sections, list): + # v1 tag but list data — treat as v2 + lines.append(_format_v2_sections(page.body_sections)) + else: + lines.append(str(page.body_sections)) + + # Key moments from source videos + if page.key_moments: + lines.append("Key Moments from Source Videos:") + for km in page.key_moments: + lines.append(f"- {km.title}: {km.summary}") + lines.append("") + + return "\n".join(lines).strip() + + +def file_source_for_page(page: TechniquePage) -> str: + """Deterministic file_source identifier for a technique page.""" + return f"technique:{page.slug}" + + +# ── LightRAG API ───────────────────────────────────────────────────────────── + + +def get_processed_sources(lightrag_url: str) -> set[str]: + """Fetch all file_paths from LightRAG documents for resume support.""" + url = f"{lightrag_url}/documents" + try: + resp = httpx.get(url, timeout=30) + resp.raise_for_status() + data = resp.json() + except httpx.HTTPError as e: + logger.warning("Failed to fetch existing documents for resume: %s", e) + return set() + + sources = set() + statuses = data.get("statuses", {}) + for status_group in statuses.values(): + for doc in status_group: + fp = doc.get("file_path") + if fp: + sources.add(fp) + return sources + + +def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None: + """Submit a text document to LightRAG. Returns response dict or None on error.""" + url = f"{lightrag_url}/documents/text" + payload = {"text": text, "file_source": file_source} + try: + resp = httpx.post(url, json=payload, timeout=60) + resp.raise_for_status() + return resp.json() + except httpx.HTTPError as e: + logger.error("Failed to submit document %s: %s", file_source, e) + return None + + +def wait_for_pipeline(lightrag_url: str, timeout: int = 600) -> bool: + """Poll pipeline_status until busy=false. Returns True if finished, False on timeout.""" + url = f"{lightrag_url}/documents/pipeline_status" + start = time.monotonic() + while time.monotonic() - start < timeout: + try: + resp = httpx.get(url, timeout=10) + resp.raise_for_status() + data = resp.json() + if not data.get("busy", False): + return True + msg = data.get("latest_message", "") + if msg: + logger.debug(" Pipeline: %s", msg) + except httpx.HTTPError as e: + logger.warning(" Pipeline status check failed: %s", e) + time.sleep(10) + logger.warning("Pipeline did not finish within %ds timeout", timeout) + return False + + +# ── Main ───────────────────────────────────────────────────────────────────── + + +def main(): + parser = argparse.ArgumentParser( + description="Reindex technique pages into LightRAG" + ) + parser.add_argument( + "--lightrag-url", + default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"), + help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)", + ) + parser.add_argument( + "--db-url", + default=os.environ.get( + "DATABASE_URL", + "postgresql+asyncpg://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia", + ), + help="Database URL (async or sync format accepted)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Format and preview pages without submitting to LightRAG", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Process only the first N pages (for testing)", + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable debug logging", + ) + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", + ) + + # Connect to PostgreSQL + logger.info("Connecting to PostgreSQL...") + try: + engine = get_sync_engine(args.db_url) + SessionLocal = sessionmaker(bind=engine) + session = SessionLocal() + except Exception as e: + logger.error("Failed to connect to PostgreSQL: %s", e) + sys.exit(1) + + # Load technique pages + pages = load_technique_pages(session, limit=args.limit) + total = len(pages) + logger.info("Loaded %d technique page(s)", total) + + if total == 0: + logger.info("No pages to process.") + session.close() + return + + # Resume support — get already-processed sources + processed_sources: set[str] = set() + if not args.dry_run: + logger.info("Checking LightRAG for already-processed documents...") + processed_sources = get_processed_sources(args.lightrag_url) + logger.info("Found %d existing document(s) in LightRAG", len(processed_sources)) + + # Process pages + submitted = 0 + skipped = 0 + errors = 0 + total_chars = 0 + + for i, page in enumerate(pages, 1): + slug = page.slug + source = file_source_for_page(page) + text = format_technique_page(page) + total_chars += len(text) + + if args.dry_run: + if i == 1: + print("=" * 80) + print(f"PREVIEW: {page.title} ({source})") + print("=" * 80) + print(text) + print("=" * 80) + logger.info("[%d/%d] Formatted: %s (%d chars)", i, total, slug, len(text)) + continue + + # Resume — skip already-processed + if source in processed_sources: + logger.info("[%d/%d] Skipped (already processed): %s", i, total, slug) + skipped += 1 + continue + + # Submit + logger.info("[%d/%d] Submitting: %s (%d chars)", i, total, slug, len(text)) + result = submit_document(args.lightrag_url, text, source) + if result is None: + errors += 1 + continue + + status = result.get("status", "unknown") + if status == "duplicated": + logger.info("[%d/%d] Duplicated (already in LightRAG): %s", i, total, slug) + skipped += 1 + continue + + if status not in ("success", "partial_success"): + logger.error("[%d/%d] Unexpected status '%s' for %s: %s", + i, total, status, slug, result.get("message", "")) + errors += 1 + continue + + submitted += 1 + logger.info("[%d/%d] Submitted: %s (track_id=%s)", i, total, slug, + result.get("track_id", "?")) + + # Wait for pipeline to finish before submitting next + logger.info("[%d/%d] Waiting for pipeline to process %s...", i, total, slug) + finished = wait_for_pipeline(args.lightrag_url) + if finished: + logger.info("[%d/%d] Processed: %s", i, total, slug) + else: + logger.warning("[%d/%d] Pipeline timeout for %s — continuing anyway", i, total, slug) + + session.close() + + # Summary + print() + print(f"{'DRY RUN ' if args.dry_run else ''}Summary:") + print(f" Total pages: {total}") + print(f" Total chars: {total_chars:,}") + if not args.dry_run: + print(f" Submitted: {submitted}") + print(f" Skipped: {skipped}") + print(f" Errors: {errors}") + + +if __name__ == "__main__": + main()