#!/usr/bin/env python3
"""Reindex all technique pages into LightRAG for entity/relationship extraction.

Connects to PostgreSQL, extracts formatted text from each technique page
(with creator and key moment data), and submits to the LightRAG API.
Supports resume (skips already-processed file_sources) and dry-run mode.

Usage:
    # Dry run — format and preview without submitting
    python3 /app/scripts/reindex_lightrag.py --dry-run --limit 3

    # Submit first 2 pages
    python3 /app/scripts/reindex_lightrag.py --limit 2

    # Full reindex
    python3 /app/scripts/reindex_lightrag.py
"""

import argparse
import json
import logging
import os
import sys
import time
from typing import Any

import httpx
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, joinedload, sessionmaker

# Resolve imports whether run from /app/ (Docker) or backend/ (local)
_script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
_backend_dir = os.path.dirname(_script_dir)
sys.path.insert(0, _backend_dir)

from models import Creator, KeyMoment, TechniquePage  # noqa: E402

logger = logging.getLogger("reindex_lightrag")


# ── Database ─────────────────────────────────────────────────────────────────


def get_sync_engine(db_url: str):
    """Create a sync SQLAlchemy engine, converting async URL if needed.

    The app's DATABASE_URL uses the asyncpg driver; this script runs
    synchronously, so swap in psycopg2. pool_pre_ping guards against
    stale connections on long runs.
    """
    url = db_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
    return create_engine(url, pool_pre_ping=True)


def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]:
    """Load all technique pages with creator and key moments eagerly.

    Eager-loads the relations used by format_technique_page() so no lazy
    loads fire after the session is closed. Pages are ordered by title
    for a deterministic processing order across runs (resume support).

    Args:
        session: open sync SQLAlchemy session.
        limit: if given, return only the first N pages (0 means none).
    """
    query = (
        session.query(TechniquePage)
        .options(
            joinedload(TechniquePage.creator),
            joinedload(TechniquePage.key_moments),
        )
        .order_by(TechniquePage.title)
    )
    # `is not None` (not truthiness) so an explicit --limit 0 means
    # "zero pages", matching the CLI help, instead of "no limit".
    if limit is not None:
        query = query.limit(limit)
    return query.all()


# ── Text formatting ──────────────────────────────────────────────────────────


def _format_v1_sections(body_sections: dict) -> str:
    """Format v1 body_sections (flat dict: heading → content)."""
    parts = []
    for heading, content in body_sections.items():
        parts.append(f"## {heading}")
        if isinstance(content, str):
            parts.append(content)
        elif isinstance(content, list):
            parts.append("\n".join(str(item) for item in content))
        else:
            # Unknown content type — stringify rather than silently drop it.
            parts.append(str(content))
        parts.append("")
    return "\n".join(parts)


def _format_v2_sections(body_sections: list[dict]) -> str:
    """Format v2 body_sections (list of {heading, content, subsections})."""
    parts = []
    for section in body_sections:
        heading = section.get("heading", "")
        content = section.get("content", "")
        if heading:
            parts.append(f"## {heading}")
        if content:
            parts.append(content)
        # Flatten subsections
        for sub in section.get("subsections", []):
            sub_heading = sub.get("heading", "")
            sub_content = sub.get("content", "")
            if sub_heading:
                parts.append(f"### {sub_heading}")
            if sub_content:
                parts.append(sub_content)
        parts.append("")
    return "\n".join(parts)


def format_technique_page(page: TechniquePage) -> str:
    """Convert a TechniquePage + relations into a rich text document for LightRAG.

    The document leads with explicit "Key: value" metadata lines so the
    entity extractor can pick up creator/category/tag relationships, then
    the summary, body sections, and key moments from source videos.
    """
    lines = []

    # Header metadata
    lines.append(f"Technique: {page.title}")
    if page.creator:
        lines.append(f"Creator: {page.creator.name}")
    lines.append(f"Category: {page.topic_category or 'Uncategorized'}")
    if page.topic_tags:
        lines.append(f"Tags: {', '.join(page.topic_tags)}")
    if page.plugins:
        lines.append(f"Plugins: {', '.join(page.plugins)}")
    lines.append("")

    # Summary
    if page.summary:
        lines.append(f"Summary: {page.summary}")
        lines.append("")

    # Body sections — handle both formats. The stored format tag may be
    # missing/None (older rows) or disagree with the actual data shape,
    # so trust the data's type when the tag and shape conflict.
    if page.body_sections:
        fmt = getattr(page, "body_sections_format", "v1") or "v1"
        if fmt == "v2" and isinstance(page.body_sections, list):
            lines.append(_format_v2_sections(page.body_sections))
        elif isinstance(page.body_sections, dict):
            lines.append(_format_v1_sections(page.body_sections))
        elif isinstance(page.body_sections, list):
            # v1 tag but list data — treat as v2
            lines.append(_format_v2_sections(page.body_sections))
        else:
            lines.append(str(page.body_sections))

    # Key moments from source videos
    if page.key_moments:
        lines.append("Key Moments from Source Videos:")
        for km in page.key_moments:
            lines.append(f"- {km.title}: {km.summary}")
        lines.append("")

    return "\n".join(lines).strip()


def file_source_for_page(page: TechniquePage) -> str:
    """Deterministic file_source identifier for a technique page.

    Stable across runs (derived from the slug), which is what makes the
    resume logic in main() able to skip already-indexed pages.
    """
    return f"technique:{page.slug}"


# ── LightRAG API ─────────────────────────────────────────────────────────────


def get_processed_sources(lightrag_url: str) -> set[str]:
    """Fetch all file_paths from LightRAG documents for resume support.

    Returns an empty set on any HTTP failure (resume then degrades to
    "submit everything"; true duplicates are still caught server-side
    via the 'duplicated' status).
    """
    url = f"{lightrag_url}/documents"
    try:
        resp = httpx.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except httpx.HTTPError as e:
        logger.warning("Failed to fetch existing documents for resume: %s", e)
        return set()

    sources = set()
    # Response groups documents by processing status; collect file_path
    # from every group regardless of status.
    statuses = data.get("statuses", {})
    for status_group in statuses.values():
        for doc in status_group:
            fp = doc.get("file_path")
            if fp:
                sources.add(fp)
    return sources


def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None:
    """Submit a text document to LightRAG. Returns response dict or None on error."""
    url = f"{lightrag_url}/documents/text"
    payload = {"text": text, "file_source": file_source}
    try:
        resp = httpx.post(url, json=payload, timeout=60)
        resp.raise_for_status()
        return resp.json()
    except httpx.HTTPError as e:
        logger.error("Failed to submit document %s: %s", file_source, e)
        return None


def wait_for_pipeline(lightrag_url: str, timeout: int = 600) -> bool:
    """Poll pipeline_status until busy=false.

    Returns True if finished, False on timeout. Transient status-check
    failures are logged and retried until the deadline; polls every 10s.
    """
    url = f"{lightrag_url}/documents/pipeline_status"
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        try:
            resp = httpx.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            if not data.get("busy", False):
                return True
            msg = data.get("latest_message", "")
            if msg:
                logger.debug(" Pipeline: %s", msg)
        except httpx.HTTPError as e:
            logger.warning(" Pipeline status check failed: %s", e)
        time.sleep(10)
    logger.warning("Pipeline did not finish within %ds timeout", timeout)
    return False


# ── Main ─────────────────────────────────────────────────────────────────────


def main():
    """CLI entry point: load pages from PostgreSQL and index them in LightRAG."""
    parser = argparse.ArgumentParser(
        description="Reindex technique pages into LightRAG"
    )
    parser.add_argument(
        "--lightrag-url",
        default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
        help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)",
    )
    parser.add_argument(
        "--db-url",
        default=os.environ.get(
            "DATABASE_URL",
            "postgresql+asyncpg://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
        ),
        help="Database URL (async or sync format accepted)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Format and preview pages without submitting to LightRAG",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process only the first N pages (for testing)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable debug logging",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # Connect to PostgreSQL
    logger.info("Connecting to PostgreSQL...")
    try:
        engine = get_sync_engine(args.db_url)
        SessionLocal = sessionmaker(bind=engine)
        session = SessionLocal()
    except Exception as e:
        logger.error("Failed to connect to PostgreSQL: %s", e)
        sys.exit(1)

    # try/finally so the session is closed even if an exception escapes
    # the processing loop (previously it leaked on mid-loop errors).
    try:
        # Load technique pages
        pages = load_technique_pages(session, limit=args.limit)
        total = len(pages)
        logger.info("Loaded %d technique page(s)", total)
        if total == 0:
            logger.info("No pages to process.")
            return

        # Resume support — get already-processed sources
        processed_sources: set[str] = set()
        if not args.dry_run:
            logger.info("Checking LightRAG for already-processed documents...")
            processed_sources = get_processed_sources(args.lightrag_url)
            logger.info("Found %d existing document(s) in LightRAG", len(processed_sources))

        # Process pages
        submitted = 0
        skipped = 0
        errors = 0
        total_chars = 0

        for i, page in enumerate(pages, 1):
            slug = page.slug
            source = file_source_for_page(page)
            text = format_technique_page(page)
            total_chars += len(text)

            if args.dry_run:
                # Print the first page in full as a preview; just log the rest.
                if i == 1:
                    print("=" * 80)
                    print(f"PREVIEW: {page.title} ({source})")
                    print("=" * 80)
                    print(text)
                    print("=" * 80)
                logger.info("[%d/%d] Formatted: %s (%d chars)", i, total, slug, len(text))
                continue

            # Resume — skip already-processed
            if source in processed_sources:
                logger.info("[%d/%d] Skipped (already processed): %s", i, total, slug)
                skipped += 1
                continue

            # Submit
            logger.info("[%d/%d] Submitting: %s (%d chars)", i, total, slug, len(text))
            result = submit_document(args.lightrag_url, text, source)
            if result is None:
                errors += 1
                continue

            status = result.get("status", "unknown")
            if status == "duplicated":
                logger.info("[%d/%d] Duplicated (already in LightRAG): %s", i, total, slug)
                skipped += 1
                continue
            if status not in ("success", "partial_success"):
                logger.error("[%d/%d] Unexpected status '%s' for %s: %s",
                             i, total, status, slug, result.get("message", ""))
                errors += 1
                continue

            submitted += 1
            logger.info("[%d/%d] Submitted: %s (track_id=%s)",
                        i, total, slug, result.get("track_id", "?"))

            # Wait for pipeline to finish before submitting next
            logger.info("[%d/%d] Waiting for pipeline to process %s...", i, total, slug)
            finished = wait_for_pipeline(args.lightrag_url)
            if finished:
                logger.info("[%d/%d] Processed: %s", i, total, slug)
            else:
                logger.warning("[%d/%d] Pipeline timeout for %s — continuing anyway",
                               i, total, slug)
    finally:
        session.close()

    # Summary
    print()
    print(f"{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f" Total pages: {total}")
    print(f" Total chars: {total_chars:,}")
    if not args.dry_run:
        print(f" Submitted: {submitted}")
        print(f" Skipped: {skipped}")
        print(f" Errors: {errors}")


if __name__ == "__main__":
    main()