#!/usr/bin/env python3
"""Reindex all technique pages into LightRAG for entity/relationship extraction.

Connects to PostgreSQL, extracts formatted text from each technique page
(with creator and key moment data), and submits to the LightRAG API.
Supports resume (skips already-processed file_sources) and dry-run mode.

Usage:
    # Dry run — format and preview without submitting
    python3 /app/scripts/reindex_lightrag.py --dry-run --limit 3

    # Submit first 2 pages
    python3 /app/scripts/reindex_lightrag.py --limit 2

    # Full reindex
    python3 /app/scripts/reindex_lightrag.py
"""
import argparse
import json
import logging
import os
import sys
import time
from typing import Any

import httpx
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, joinedload, sessionmaker

# Resolve imports whether run from /app/ (Docker) or backend/ (local)
_script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
_backend_dir = os.path.dirname(_script_dir)
sys.path.insert(0, _backend_dir)

from models import Creator, KeyMoment, SourceVideo, TechniquePage  # noqa: E402

logger = logging.getLogger("reindex_lightrag")

# Exceptions that can escape an httpx request + resp.json() round trip.
# Response.json() raises json.JSONDecodeError (a ValueError), which is NOT
# an httpx.HTTPError — catching only the latter crashes on malformed bodies.
_API_ERRORS = (httpx.HTTPError, json.JSONDecodeError)


# ── Database ─────────────────────────────────────────────────────────────────


def get_sync_engine(db_url: str):
    """Create a sync SQLAlchemy engine, converting an async URL if needed.

    Args:
        db_url: Database URL in either asyncpg or psycopg2 form.
    """
    url = db_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
    return create_engine(url, pool_pre_ping=True)


def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]:
    """Load all technique pages with creator, key moments, and source videos eagerly.

    Args:
        session: Open sync SQLAlchemy session.
        limit: Optional cap on the number of pages returned. ``0`` is honored
            (returns zero pages); only ``None`` means "no limit".

    Returns:
        Pages ordered by title, with all relations eagerly populated so they
        remain readable after the session is closed.
    """
    query = (
        session.query(TechniquePage)
        .options(
            joinedload(TechniquePage.creator),
            joinedload(TechniquePage.key_moments).joinedload(KeyMoment.source_video),
        )
        .order_by(TechniquePage.title)
    )
    # `is not None` so --limit 0 means "zero pages", not "unlimited".
    if limit is not None:
        query = query.limit(limit)
    return query.all()


# ── Text formatting ──────────────────────────────────────────────────────────


def _format_v1_sections(body_sections: dict) -> str:
    """Format v1 body_sections (flat dict: heading → content)."""
    parts = []
    for heading, content in body_sections.items():
        parts.append(f"## {heading}")
        if isinstance(content, str):
            parts.append(content)
        elif isinstance(content, list):
            parts.append("\n".join(str(item) for item in content))
        parts.append("")
    return "\n".join(parts)


def _format_v2_sections(body_sections: list[dict]) -> str:
    """Format v2 body_sections (list of {heading, content, subsections})."""
    parts = []
    for section in body_sections:
        heading = section.get("heading", "")
        content = section.get("content", "")
        if heading:
            parts.append(f"## {heading}")
        if content:
            parts.append(content)
        # Flatten subsections one level down as H3.
        for sub in section.get("subsections", []):
            sub_heading = sub.get("heading", "")
            sub_content = sub.get("content", "")
            if sub_heading:
                parts.append(f"### {sub_heading}")
            if sub_content:
                parts.append(sub_content)
        parts.append("")
    return "\n".join(parts)


def format_technique_page(page: TechniquePage) -> str:
    """Convert a TechniquePage + relations into a rich text document for LightRAG.

    Includes structured provenance metadata for entity extraction and
    creator-scoped retrieval.
    """
    lines = []

    # ── Structured provenance block ──────────────────────────────────────
    lines.append(f"Technique: {page.title}")
    if page.creator:
        lines.append(f"Creator: {page.creator.name}")
        lines.append(f"Creator ID: {page.creator_id}")
    lines.append(f"Category: {page.topic_category or 'Uncategorized'}")
    if page.topic_tags:
        lines.append(f"Tags: {', '.join(page.topic_tags)}")
    if page.plugins:
        lines.append(f"Plugins: {', '.join(page.plugins)}")

    # Source video provenance — dedupe videos across key moments, preserving
    # first-seen order (dict keys keep insertion order).
    if page.key_moments:
        video_ids: dict[str, str] = {}
        for km in page.key_moments:
            sv = getattr(km, "source_video", None)
            if sv and str(sv.id) not in video_ids:
                video_ids[str(sv.id)] = sv.filename
        if video_ids:
            lines.append(f"Source Videos: {', '.join(video_ids.values())}")
            lines.append(f"Source Video IDs: {', '.join(video_ids.keys())}")
    lines.append("")

    # ── Summary ──────────────────────────────────────────────────────────
    if page.summary:
        lines.append(f"Summary: {page.summary}")
        lines.append("")

    # ── Body sections ────────────────────────────────────────────────────
    if page.body_sections:
        fmt = getattr(page, "body_sections_format", "v1") or "v1"
        if fmt == "v2" and isinstance(page.body_sections, list):
            lines.append(_format_v2_sections(page.body_sections))
        elif isinstance(page.body_sections, dict):
            lines.append(_format_v1_sections(page.body_sections))
        elif isinstance(page.body_sections, list):
            # v1 tag but list data — treat as v2
            lines.append(_format_v2_sections(page.body_sections))
        else:
            lines.append(str(page.body_sections))

    # ── Key moments with source attribution ──────────────────────────────
    if page.key_moments:
        lines.append("Key Moments from Source Videos:")
        for km in page.key_moments:
            sv = getattr(km, "source_video", None)
            source_info = f" (Source: {sv.filename})" if sv else ""
            lines.append(f"- {km.title}: {km.summary}{source_info}")
        lines.append("")

    return "\n".join(lines).strip()


def file_source_for_page(page: TechniquePage) -> str:
    """Deterministic file_source identifier for a technique page.

    Encodes creator_id for provenance tracking.
    Format: technique:{slug}:creator:{creator_id}
    """
    creator_id = str(page.creator_id) if page.creator_id else "unknown"
    return f"technique:{page.slug}:creator:{creator_id}"


# ── LightRAG API ─────────────────────────────────────────────────────────────


def get_processed_sources(lightrag_url: str) -> set[str]:
    """Fetch all file_paths from LightRAG documents for resume support.

    Returns an empty set on any API or decode error (resume degrades to
    resubmitting everything; LightRAG dedupes by content).
    """
    url = f"{lightrag_url}/documents"
    try:
        resp = httpx.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except _API_ERRORS as e:
        logger.warning("Failed to fetch existing documents for resume: %s", e)
        return set()

    sources = set()
    statuses = data.get("statuses", {})
    for status_group in statuses.values():
        for doc in status_group:
            fp = doc.get("file_path")
            if fp:
                sources.add(fp)
    return sources


def clear_all_documents(lightrag_url: str) -> bool:
    """Delete all documents from LightRAG. Returns True on success.

    Uses the /documents/delete_document endpoint with doc_ids (not file_path).
    """
    # Get all document IDs
    try:
        resp = httpx.get(f"{lightrag_url}/documents", timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except _API_ERRORS as e:
        logger.error("Failed to fetch documents for clearing: %s", e)
        return False

    doc_ids = []
    for status_group in data.get("statuses", {}).values():
        for doc in status_group:
            did = doc.get("id")
            if did:
                doc_ids.append(did)

    if not doc_ids:
        logger.info("No documents to clear.")
        return True

    logger.info("Clearing %d documents from LightRAG...", len(doc_ids))
    try:
        # httpx.delete() does not accept a json= body, hence httpx.request().
        resp = httpx.request(
            "DELETE",
            f"{lightrag_url}/documents/delete_document",
            json={"doc_ids": doc_ids, "delete_llm_cache": True},
            timeout=120,
        )
        resp.raise_for_status()
        logger.info("Cleared %d documents.", len(doc_ids))
        return True
    except httpx.HTTPError as e:
        logger.error("Failed to delete documents: %s", e)
        return False


def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None:
    """Submit a text document to LightRAG.

    Returns response dict or None on error.
    """
    url = f"{lightrag_url}/documents/text"
    payload = {"text": text, "file_source": file_source}
    try:
        resp = httpx.post(url, json=payload, timeout=60)
        resp.raise_for_status()
        return resp.json()
    except _API_ERRORS as e:
        logger.error("Failed to submit document %s: %s", file_source, e)
        return None


def wait_for_pipeline(lightrag_url: str, timeout: int = 600) -> bool:
    """Poll pipeline_status until busy=false.

    Returns True if finished, False on timeout. Transient poll errors are
    logged and retried until the overall timeout elapses.
    """
    url = f"{lightrag_url}/documents/pipeline_status"
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        try:
            resp = httpx.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            if not data.get("busy", False):
                return True
            msg = data.get("latest_message", "")
            if msg:
                logger.debug(" Pipeline: %s", msg)
        except _API_ERRORS as e:
            logger.warning(" Pipeline status check failed: %s", e)
        time.sleep(10)
    logger.warning("Pipeline did not finish within %ds timeout", timeout)
    return False


# ── Main ─────────────────────────────────────────────────────────────────────


def main():
    """CLI entry point: parse args, load pages, format, and (re)index them.

    Exits 1 on DB connection failure, on a failed --clear-first, or if any
    page submission errored — so cron/CI callers can detect partial runs.
    """
    parser = argparse.ArgumentParser(
        description="Reindex technique pages into LightRAG"
    )
    parser.add_argument(
        "--lightrag-url",
        default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
        help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)",
    )
    parser.add_argument(
        "--db-url",
        default=os.environ.get(
            "DATABASE_URL",
            "postgresql+asyncpg://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
        ),
        help="Database URL (async or sync format accepted)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Format and preview pages without submitting to LightRAG",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Skip resume check — resubmit all pages even if already processed",
    )
    parser.add_argument(
        "--clear-first",
        action="store_true",
        help="Delete all existing LightRAG documents before reindexing",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process only the first N pages (for testing)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable debug logging",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # Connect to PostgreSQL
    logger.info("Connecting to PostgreSQL...")
    try:
        engine = get_sync_engine(args.db_url)
        SessionLocal = sessionmaker(bind=engine)
        session = SessionLocal()
    except Exception as e:
        logger.error("Failed to connect to PostgreSQL: %s", e)
        sys.exit(1)

    # Load technique pages. All relations are eagerly loaded (joinedload),
    # so the session can be closed immediately — and the try/finally ensures
    # it is closed even if loading raises.
    try:
        pages = load_technique_pages(session, limit=args.limit)
    finally:
        session.close()

    total = len(pages)
    logger.info("Loaded %d technique page(s)", total)
    if total == 0:
        logger.info("No pages to process.")
        return

    # Clear existing documents if requested. Proceeding after a failed clear
    # would produce duplicates, so abort instead.
    if args.clear_first and not args.dry_run:
        if not clear_all_documents(args.lightrag_url):
            logger.error("Aborting: --clear-first failed to clear documents.")
            sys.exit(1)

    # Resume support — get already-processed sources (skip if --force)
    processed_sources: set[str] = set()
    if not args.dry_run and not args.force:
        logger.info("Checking LightRAG for already-processed documents...")
        processed_sources = get_processed_sources(args.lightrag_url)
        logger.info("Found %d existing document(s) in LightRAG", len(processed_sources))

    # Process pages
    submitted = 0
    skipped = 0
    errors = 0
    total_chars = 0

    for i, page in enumerate(pages, 1):
        slug = page.slug
        source = file_source_for_page(page)
        text = format_technique_page(page)
        total_chars += len(text)

        if args.dry_run:
            # Full preview for the first page only; counts for the rest.
            if i == 1:
                print("=" * 80)
                print(f"PREVIEW: {page.title} ({source})")
                print("=" * 80)
                print(text)
                print("=" * 80)
            logger.info("[%d/%d] Formatted: %s (%d chars)", i, total, slug, len(text))
            continue

        # Resume — skip already-processed
        if source in processed_sources:
            logger.info("[%d/%d] Skipped (already processed): %s", i, total, slug)
            skipped += 1
            continue

        # Submit
        logger.info("[%d/%d] Submitting: %s (%d chars)", i, total, slug, len(text))
        result = submit_document(args.lightrag_url, text, source)
        if result is None:
            errors += 1
            continue

        status = result.get("status", "unknown")
        if status == "duplicated":
            logger.info("[%d/%d] Duplicated (already in LightRAG): %s", i, total, slug)
            skipped += 1
            continue
        if status not in ("success", "partial_success"):
            logger.error("[%d/%d] Unexpected status '%s' for %s: %s",
                         i, total, status, slug, result.get("message", ""))
            errors += 1
            continue

        submitted += 1
        logger.info("[%d/%d] Submitted: %s (track_id=%s)",
                    i, total, slug, result.get("track_id", "?"))

        # Wait for pipeline to finish before submitting next
        logger.info("[%d/%d] Waiting for pipeline to process %s...", i, total, slug)
        finished = wait_for_pipeline(args.lightrag_url)
        if finished:
            logger.info("[%d/%d] Processed: %s", i, total, slug)
        else:
            logger.warning("[%d/%d] Pipeline timeout for %s — continuing anyway",
                           i, total, slug)

    # Summary
    print()
    print(f"{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f"  Total pages: {total}")
    print(f"  Total chars: {total_chars:,}")
    if not args.dry_run:
        print(f"  Submitted:   {submitted}")
        print(f"  Skipped:     {skipped}")
        print(f"  Errors:      {errors}")

    # Nonzero exit so automation can detect a partially failed run.
    if errors:
        sys.exit(1)


if __name__ == "__main__":
    main()