#!/usr/bin/env python3
|
|
"""Reindex all technique pages into LightRAG for entity/relationship extraction.
|
|
|
|
Connects to PostgreSQL, extracts formatted text from each technique page
|
|
(with creator and key moment data), and submits to the LightRAG API.
|
|
|
|
Supports resume (skips already-processed file_sources) and dry-run mode.
|
|
|
|
Usage:
|
|
# Dry run — format and preview without submitting
|
|
python3 /app/scripts/reindex_lightrag.py --dry-run --limit 3
|
|
|
|
# Submit first 2 pages
|
|
python3 /app/scripts/reindex_lightrag.py --limit 2
|
|
|
|
# Full reindex
|
|
python3 /app/scripts/reindex_lightrag.py
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import Session, joinedload, sessionmaker
|
|
|
|
# Resolve imports whether run from /app/ (Docker) or backend/ (local)
|
|
_script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
|
|
_backend_dir = os.path.dirname(_script_dir)
|
|
sys.path.insert(0, _backend_dir)
|
|
|
|
from models import Creator, KeyMoment, SourceVideo, TechniquePage # noqa: E402
|
|
|
|
logger = logging.getLogger("reindex_lightrag")
|
|
|
|
|
|
# ── Database ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_sync_engine(db_url: str):
    """Create a sync SQLAlchemy engine, converting async URL if needed."""
    # The app's DATABASE_URL uses the asyncpg driver; this script runs
    # synchronously, so swap in the psycopg2 dialect before connecting.
    sync_url = db_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
    # pool_pre_ping revalidates pooled connections — useful for long reindex runs.
    return create_engine(sync_url, pool_pre_ping=True)
|
|
|
|
|
|
def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]:
    """Load all technique pages with creator, key moments, and source videos eagerly."""
    # Eager-load the full relation graph up front so formatting never triggers
    # lazy loads (the session may be closed by the time pages are rendered).
    eager = (
        joinedload(TechniquePage.creator),
        joinedload(TechniquePage.key_moments).joinedload(KeyMoment.source_video),
    )
    query = session.query(TechniquePage).options(*eager).order_by(TechniquePage.title)
    if limit:
        query = query.limit(limit)
    return query.all()
|
|
|
|
|
|
# ── Text formatting ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def _format_v1_sections(body_sections: dict) -> str:
|
|
"""Format v1 body_sections (flat dict: heading → content)."""
|
|
parts = []
|
|
for heading, content in body_sections.items():
|
|
parts.append(f"## {heading}")
|
|
if isinstance(content, str):
|
|
parts.append(content)
|
|
elif isinstance(content, list):
|
|
parts.append("\n".join(str(item) for item in content))
|
|
parts.append("")
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _format_v2_sections(body_sections: list[dict]) -> str:
|
|
"""Format v2 body_sections (list of {heading, content, subsections})."""
|
|
parts = []
|
|
for section in body_sections:
|
|
heading = section.get("heading", "")
|
|
content = section.get("content", "")
|
|
if heading:
|
|
parts.append(f"## {heading}")
|
|
if content:
|
|
parts.append(content)
|
|
# Flatten subsections
|
|
for sub in section.get("subsections", []):
|
|
sub_heading = sub.get("heading", "")
|
|
sub_content = sub.get("content", "")
|
|
if sub_heading:
|
|
parts.append(f"### {sub_heading}")
|
|
if sub_content:
|
|
parts.append(sub_content)
|
|
parts.append("")
|
|
return "\n".join(parts)
|
|
|
|
|
|
def format_technique_page(page: TechniquePage) -> str:
    """Convert a TechniquePage + relations into a rich text document for LightRAG.

    Includes structured provenance metadata for entity extraction and
    creator-scoped retrieval.
    """
    doc: list[str] = []

    # ── Structured provenance block ──────────────────────────────────────
    # Plain "Key: value" lines that LightRAG's entity extraction can pick up.
    doc.append(f"Technique: {page.title}")
    if page.creator:
        doc.append(f"Creator: {page.creator.name}")
        doc.append(f"Creator ID: {page.creator_id}")
    doc.append(f"Category: {page.topic_category or 'Uncategorized'}")
    if page.topic_tags:
        doc.append(f"Tags: {', '.join(page.topic_tags)}")
    if page.plugins:
        doc.append(f"Plugins: {', '.join(page.plugins)}")

    # Source video provenance: de-duplicate videos referenced by key moments,
    # keeping first-seen order (setdefault preserves the first filename per id).
    if page.key_moments:
        videos: dict[str, str] = {}
        for moment in page.key_moments:
            video = getattr(moment, "source_video", None)
            if video:
                videos.setdefault(str(video.id), video.filename)
        if videos:
            doc.append(f"Source Videos: {', '.join(videos.values())}")
            doc.append(f"Source Video IDs: {', '.join(videos.keys())}")
    doc.append("")

    # ── Summary ──────────────────────────────────────────────────────────
    if page.summary:
        doc.append(f"Summary: {page.summary}")
        doc.append("")

    # ── Body sections ────────────────────────────────────────────────────
    # Dispatch on the declared format tag, but fall back on the actual runtime
    # type when the tag and the data shape disagree.
    if page.body_sections:
        declared = getattr(page, "body_sections_format", "v1") or "v1"
        if declared == "v2" and isinstance(page.body_sections, list):
            doc.append(_format_v2_sections(page.body_sections))
        elif isinstance(page.body_sections, dict):
            doc.append(_format_v1_sections(page.body_sections))
        elif isinstance(page.body_sections, list):
            # v1 tag but list data — treat as v2
            doc.append(_format_v2_sections(page.body_sections))
        else:
            doc.append(str(page.body_sections))

    # ── Key moments with source attribution ──────────────────────────────
    if page.key_moments:
        doc.append("Key Moments from Source Videos:")
        for moment in page.key_moments:
            video = getattr(moment, "source_video", None)
            attribution = f" (Source: {video.filename})" if video else ""
            doc.append(f"- {moment.title}: {moment.summary}{attribution}")
        doc.append("")

    return "\n".join(doc).strip()
|
|
|
|
|
|
def file_source_for_page(page: TechniquePage) -> str:
    """Deterministic file_source identifier for a technique page.

    Encodes creator_id for provenance tracking. Format:
        technique:{slug}:creator:{creator_id}
    """
    # Pages may exist without a creator; use a stable placeholder so the
    # identifier shape never varies.
    owner = str(page.creator_id) if page.creator_id else "unknown"
    return f"technique:{page.slug}:creator:{owner}"
|
|
|
|
|
|
# ── LightRAG API ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_processed_sources(lightrag_url: str) -> set[str]:
    """Fetch all file_paths from LightRAG documents for resume support."""
    try:
        resp = httpx.get(f"{lightrag_url}/documents", timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except httpx.HTTPError as e:
        # Resume is best-effort: if the API is unreachable we simply skip nothing.
        logger.warning("Failed to fetch existing documents for resume: %s", e)
        return set()

    # Collect every non-empty file_path across all pipeline status buckets.
    return {
        fp
        for status_group in data.get("statuses", {}).values()
        for doc in status_group
        if (fp := doc.get("file_path"))
    }
|
|
|
|
|
|
def clear_all_documents(lightrag_url: str) -> bool:
    """Delete all documents from LightRAG. Returns True on success.

    Uses the /documents/delete_document endpoint with doc_ids (not file_path).
    """
    # Enumerate every document ID across all pipeline status buckets.
    try:
        resp = httpx.get(f"{lightrag_url}/documents", timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except httpx.HTTPError as e:
        logger.error("Failed to fetch documents for clearing: %s", e)
        return False

    doc_ids = [
        did
        for status_group in data.get("statuses", {}).values()
        for doc in status_group
        if (did := doc.get("id"))
    ]

    if not doc_ids:
        logger.info("No documents to clear.")
        return True

    logger.info("Clearing %d documents from LightRAG...", len(doc_ids))
    # httpx.delete() does not accept a json body, so use the generic request API.
    try:
        resp = httpx.request(
            "DELETE",
            f"{lightrag_url}/documents/delete_document",
            json={"doc_ids": doc_ids, "delete_llm_cache": True},
            timeout=120,
        )
        resp.raise_for_status()
    except httpx.HTTPError as e:
        logger.error("Failed to delete documents: %s", e)
        return False
    logger.info("Cleared %d documents.", len(doc_ids))
    return True
|
|
|
|
|
|
def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None:
    """Submit a text document to LightRAG. Returns response dict or None on error."""
    try:
        resp = httpx.post(
            f"{lightrag_url}/documents/text",
            json={"text": text, "file_source": file_source},
            timeout=60,
        )
        resp.raise_for_status()
    except httpx.HTTPError as e:
        logger.error("Failed to submit document %s: %s", file_source, e)
        return None
    return resp.json()
|
|
|
|
|
|
def wait_for_pipeline(lightrag_url: str, timeout: int = 600) -> bool:
    """Poll pipeline_status until busy=false. Returns True if finished, False on timeout."""
    status_url = f"{lightrag_url}/documents/pipeline_status"
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            resp = httpx.get(status_url, timeout=10)
            resp.raise_for_status()
            payload = resp.json()
        except httpx.HTTPError as e:
            # Transient status-check failures are non-fatal; retry after the sleep.
            logger.warning(" Pipeline status check failed: %s", e)
        else:
            if not payload.get("busy", False):
                return True
            if message := payload.get("latest_message", ""):
                logger.debug(" Pipeline: %s", message)
        time.sleep(10)
    logger.warning("Pipeline did not finish within %ds timeout", timeout)
    return False
|
|
|
|
|
|
# ── Main ─────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse args, load pages from PostgreSQL, push each to LightRAG.

    Submissions are serialized — after each successful submit we block on
    wait_for_pipeline() so LightRAG processes one document at a time.
    """
    parser = argparse.ArgumentParser(
        description="Reindex technique pages into LightRAG"
    )
    parser.add_argument(
        "--lightrag-url",
        default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
        help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)",
    )
    parser.add_argument(
        "--db-url",
        default=os.environ.get(
            "DATABASE_URL",
            "postgresql+asyncpg://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
        ),
        help="Database URL (async or sync format accepted)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Format and preview pages without submitting to LightRAG",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Skip resume check — resubmit all pages even if already processed",
    )
    parser.add_argument(
        "--clear-first",
        action="store_true",
        help="Delete all existing LightRAG documents before reindexing",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process only the first N pages (for testing)",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable debug logging",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # Connect to PostgreSQL
    logger.info("Connecting to PostgreSQL...")
    try:
        engine = get_sync_engine(args.db_url)
        SessionLocal = sessionmaker(bind=engine)
        session = SessionLocal()
    except Exception as e:
        # Broad catch is deliberate at this top-level boundary: any driver/DSN
        # failure is fatal for the script.
        logger.error("Failed to connect to PostgreSQL: %s", e)
        sys.exit(1)

    # Load technique pages
    pages = load_technique_pages(session, limit=args.limit)
    total = len(pages)
    logger.info("Loaded %d technique page(s)", total)

    if total == 0:
        logger.info("No pages to process.")
        session.close()
        return

    # Clear existing documents if requested
    # (skipped on --dry-run so a preview never mutates LightRAG state)
    if args.clear_first and not args.dry_run:
        clear_all_documents(args.lightrag_url)

    # Resume support — get already-processed sources (skip if --force)
    processed_sources: set[str] = set()
    if not args.dry_run and not args.force:
        logger.info("Checking LightRAG for already-processed documents...")
        processed_sources = get_processed_sources(args.lightrag_url)
        logger.info("Found %d existing document(s) in LightRAG", len(processed_sources))

    # Process pages
    submitted = 0
    skipped = 0
    errors = 0
    total_chars = 0

    for i, page in enumerate(pages, 1):
        slug = page.slug
        source = file_source_for_page(page)
        text = format_technique_page(page)
        total_chars += len(text)

        if args.dry_run:
            # Print one full formatted document (the first) as a preview.
            if i == 1:
                print("=" * 80)
                print(f"PREVIEW: {page.title} ({source})")
                print("=" * 80)
                print(text)
                print("=" * 80)
            logger.info("[%d/%d] Formatted: %s (%d chars)", i, total, slug, len(text))
            continue

        # Resume — skip already-processed
        if source in processed_sources:
            logger.info("[%d/%d] Skipped (already processed): %s", i, total, slug)
            skipped += 1
            continue

        # Submit
        logger.info("[%d/%d] Submitting: %s (%d chars)", i, total, slug, len(text))
        result = submit_document(args.lightrag_url, text, source)
        if result is None:
            errors += 1
            continue

        # LightRAG reports "duplicated" when the same content hash already exists.
        status = result.get("status", "unknown")
        if status == "duplicated":
            logger.info("[%d/%d] Duplicated (already in LightRAG): %s", i, total, slug)
            skipped += 1
            continue

        if status not in ("success", "partial_success"):
            logger.error("[%d/%d] Unexpected status '%s' for %s: %s",
                         i, total, status, slug, result.get("message", ""))
            errors += 1
            continue

        submitted += 1
        logger.info("[%d/%d] Submitted: %s (track_id=%s)", i, total, slug,
                    result.get("track_id", "?"))

        # Wait for pipeline to finish before submitting next
        logger.info("[%d/%d] Waiting for pipeline to process %s...", i, total, slug)
        finished = wait_for_pipeline(args.lightrag_url)
        if finished:
            logger.info("[%d/%d] Processed: %s", i, total, slug)
        else:
            # Timeout is treated as soft failure: the document stays queued in
            # LightRAG; we move on rather than stall the whole reindex.
            logger.warning("[%d/%d] Pipeline timeout for %s — continuing anyway", i, total, slug)

    session.close()

    # Summary
    print()
    print(f"{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f" Total pages: {total}")
    print(f" Total chars: {total_chars:,}")
    if not args.dry_run:
        print(f" Submitted: {submitted}")
        print(f" Skipped: {skipped}")
        print(f" Errors: {errors}")
|
|
|
|
|
|
# Script entry point — guarded so the module can be imported without side effects.
if __name__ == "__main__":
    main()
|