feat: Created reindex_lightrag.py that extracts technique pages from Po…
- "backend/scripts/reindex_lightrag.py" GSD-Task: S04/T01
This commit is contained in:
parent
ea8ddf74f0
commit
bd2be703a5
1 changed files with 354 additions and 0 deletions
354
backend/scripts/reindex_lightrag.py
Normal file
354
backend/scripts/reindex_lightrag.py
Normal file
|
|
@ -0,0 +1,354 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Reindex all technique pages into LightRAG for entity/relationship extraction.
|
||||
|
||||
Connects to PostgreSQL, extracts formatted text from each technique page
|
||||
(with creator and key moment data), and submits to the LightRAG API.
|
||||
|
||||
Supports resume (skips already-processed file_sources) and dry-run mode.
|
||||
|
||||
Usage:
|
||||
# Dry run — format and preview without submitting
|
||||
python3 /app/scripts/reindex_lightrag.py --dry-run --limit 3
|
||||
|
||||
# Submit first 2 pages
|
||||
python3 /app/scripts/reindex_lightrag.py --limit 2
|
||||
|
||||
# Full reindex
|
||||
python3 /app/scripts/reindex_lightrag.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session, joinedload, sessionmaker
|
||||
|
||||
# Resolve imports whether run from /app/ (Docker) or backend/ (local)
|
||||
_script_dir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
|
||||
_backend_dir = os.path.dirname(_script_dir)
|
||||
sys.path.insert(0, _backend_dir)
|
||||
|
||||
from models import Creator, KeyMoment, TechniquePage # noqa: E402
|
||||
|
||||
logger = logging.getLogger("reindex_lightrag")
|
||||
|
||||
|
||||
# ── Database ─────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def get_sync_engine(db_url: str):
    """Build a synchronous SQLAlchemy engine for ``db_url``.

    An asyncpg-style URL prefix is rewritten to the psycopg2 dialect so the
    same setting can be used by this script; any other URL is passed through
    unchanged. The engine is created with ``pool_pre_ping`` enabled.
    """
    async_prefix = "postgresql+asyncpg://"
    sync_prefix = "postgresql+psycopg2://"
    sync_url = db_url.replace(async_prefix, sync_prefix)
    engine = create_engine(sync_url, pool_pre_ping=True)
    return engine
|
||||
|
||||
|
||||
def load_technique_pages(session: Session, limit: int | None = None) -> list[TechniquePage]:
    """Load technique pages, ordered by title, with relations eagerly joined.

    Args:
        session: Open SQLAlchemy session used to run the query.
        limit: Maximum number of pages to return. ``None`` (the default) means
            no limit; ``0`` returns an empty list.

    Returns:
        The matching pages, each with ``creator`` and ``key_moments`` loaded
        through joined eager loading.
    """
    query = (
        session.query(TechniquePage)
        .options(
            joinedload(TechniquePage.creator),
            joinedload(TechniquePage.key_moments),
        )
        .order_by(TechniquePage.title)
    )
    # Compare against None explicitly: a plain truthiness test would treat an
    # explicit limit of 0 as "no limit" and load every page.
    if limit is not None:
        query = query.limit(limit)
    return query.all()
|
||||
|
||||
|
||||
# ── Text formatting ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _format_v1_sections(body_sections: dict) -> str:
|
||||
"""Format v1 body_sections (flat dict: heading → content)."""
|
||||
parts = []
|
||||
for heading, content in body_sections.items():
|
||||
parts.append(f"## {heading}")
|
||||
if isinstance(content, str):
|
||||
parts.append(content)
|
||||
elif isinstance(content, list):
|
||||
parts.append("\n".join(str(item) for item in content))
|
||||
parts.append("")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def _format_v2_sections(body_sections: list[dict]) -> str:
|
||||
"""Format v2 body_sections (list of {heading, content, subsections})."""
|
||||
parts = []
|
||||
for section in body_sections:
|
||||
heading = section.get("heading", "")
|
||||
content = section.get("content", "")
|
||||
if heading:
|
||||
parts.append(f"## {heading}")
|
||||
if content:
|
||||
parts.append(content)
|
||||
# Flatten subsections
|
||||
for sub in section.get("subsections", []):
|
||||
sub_heading = sub.get("heading", "")
|
||||
sub_content = sub.get("content", "")
|
||||
if sub_heading:
|
||||
parts.append(f"### {sub_heading}")
|
||||
if sub_content:
|
||||
parts.append(sub_content)
|
||||
parts.append("")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def format_technique_page(page: TechniquePage) -> str:
    """Convert a TechniquePage and its relations into a text document for LightRAG.

    The document consists of, in order: a metadata header (title, creator,
    category, tags, plugins), the summary, the body sections, and a bulleted
    list of key moments. Parts whose source value is empty are left out, and
    the result is stripped of leading/trailing whitespace.

    Args:
        page: Page to format; ``creator`` and ``key_moments`` are read from it.

    Returns:
        The formatted document text.
    """
    # Header metadata
    lines = [f"Technique: {page.title}"]
    if page.creator:
        lines.append(f"Creator: {page.creator.name}")
    lines.append(f"Category: {page.topic_category or 'Uncategorized'}")
    if page.topic_tags:
        lines.append(f"Tags: {', '.join(page.topic_tags)}")
    if page.plugins:
        lines.append(f"Plugins: {', '.join(page.plugins)}")
    lines.append("")

    # Summary
    if page.summary:
        lines.append(f"Summary: {page.summary}")
        lines.append("")

    # Body sections: dispatch on the data's shape rather than on the
    # body_sections_format tag. List data is always rendered as v2 and dict
    # data as v1, whatever the tag says; anything else is stringified.
    sections = page.body_sections
    if sections:
        if isinstance(sections, list):
            lines.append(_format_v2_sections(sections))
        elif isinstance(sections, dict):
            lines.append(_format_v1_sections(sections))
        else:
            lines.append(str(sections))

    # Key moments from source videos
    if page.key_moments:
        lines.append("Key Moments from Source Videos:")
        for km in page.key_moments:
            # A missing summary must not leak the literal text "None" into the
            # document that is submitted for entity extraction.
            if km.summary:
                lines.append(f"- {km.title}: {km.summary}")
            else:
                lines.append(f"- {km.title}")
        lines.append("")

    return "\n".join(lines).strip()
|
||||
|
||||
|
||||
def file_source_for_page(page: TechniquePage) -> str:
    """Return the stable ``technique:<slug>`` identifier for a technique page."""
    return "technique:{}".format(page.slug)
|
||||
|
||||
|
||||
# ── LightRAG API ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def get_processed_sources(lightrag_url: str) -> set[str]:
    """Fetch the file_path of every document LightRAG reports, for resume support.

    Args:
        lightrag_url: LightRAG API base URL; a trailing slash is tolerated.

    Returns:
        The set of non-empty ``file_path`` values found under the response's
        ``statuses`` groups. An empty set is returned (after logging a warning)
        when the request fails, the body is not valid JSON, or the payload does
        not have the expected shape, so the caller simply resubmits everything.
    """
    url = f"{lightrag_url.rstrip('/')}/documents"
    try:
        resp = httpx.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a body that is not valid JSON, which is not an
        # httpx.HTTPError and would otherwise abort the run.
        logger.warning("Failed to fetch existing documents for resume: %s", e)
        return set()

    statuses = data.get("statuses", {}) if isinstance(data, dict) else None
    if not isinstance(statuses, dict):
        logger.warning("Failed to fetch existing documents for resume: %s",
                       "unexpected response shape")
        return set()

    sources: set[str] = set()
    for status_group in statuses.values():
        if not isinstance(status_group, list):
            continue
        for doc in status_group:
            fp = doc.get("file_path") if isinstance(doc, dict) else None
            if fp:
                sources.add(fp)
    return sources
|
||||
|
||||
|
||||
def submit_document(lightrag_url: str, text: str, file_source: str) -> dict[str, Any] | None:
    """Submit a text document to LightRAG.

    Args:
        lightrag_url: LightRAG API base URL; a trailing slash is tolerated.
        text: Document body to index.
        file_source: Identifier sent alongside the text.

    Returns:
        The decoded JSON response object, or ``None`` (after logging an error)
        when the request fails, the server answers with an error status, or
        the response body is not a JSON object.
    """
    url = f"{lightrag_url.rstrip('/')}/documents/text"
    payload = {"text": text, "file_source": file_source}
    try:
        resp = httpx.post(url, json=payload, timeout=60)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        logger.error("Failed to submit document %s: %s", file_source, e)
        return None

    # Decoding is handled separately: a malformed body raises ValueError, which
    # is not an httpx.HTTPError and would otherwise abort the whole run.
    try:
        body = resp.json()
    except ValueError as e:
        logger.error("Failed to submit document %s: invalid JSON response: %s", file_source, e)
        return None
    if not isinstance(body, dict):
        logger.error("Failed to submit document %s: unexpected response type %s",
                     file_source, type(body).__name__)
        return None
    return body
|
||||
|
||||
|
||||
def wait_for_pipeline(lightrag_url: str, timeout: int = 600, poll_interval: float = 10) -> bool:
    """Poll pipeline_status until it reports ``busy`` as false.

    Args:
        lightrag_url: LightRAG API base URL; a trailing slash is tolerated.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        ``True`` as soon as a status response reports the pipeline is not
        busy, ``False`` if ``timeout`` elapses first. Failed or malformed
        status checks are logged and retried until the deadline.
    """
    # NOTE(review): a status read taken before LightRAG marks the pipeline busy
    # would return True immediately — confirm the server sets busy before the
    # submit call returns.
    url = f"{lightrag_url.rstrip('/')}/documents/pipeline_status"
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            resp = httpx.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            logger.warning(" Pipeline status check failed: %s", e)
        else:
            if not isinstance(data, dict):
                logger.warning(" Pipeline status check failed: %s", "unexpected response shape")
            elif not data.get("busy", False):
                return True
            else:
                msg = data.get("latest_message", "")
                if msg:
                    logger.debug(" Pipeline: %s", msg)

        # Never sleep past the deadline, so the timeout is honoured closely.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))

    logger.warning("Pipeline did not finish within %ds timeout", timeout)
    return False
|
||||
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the reindex script."""
    parser = argparse.ArgumentParser(
        description="Reindex technique pages into LightRAG"
    )
    parser.add_argument(
        "--lightrag-url",
        default=os.environ.get("LIGHTRAG_URL", "http://chrysopedia-lightrag:9621"),
        help="LightRAG API base URL (default: http://chrysopedia-lightrag:9621)",
    )
    parser.add_argument(
        "--db-url",
        default=os.environ.get(
            "DATABASE_URL",
            "postgresql+asyncpg://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia",
        ),
        help="Database URL (async or sync format accepted)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Format and preview pages without submitting to LightRAG",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process only the first N pages (for testing)",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable debug logging",
    )
    return parser


def main():
    """Run the reindex: load pages, format them, and submit them to LightRAG.

    In dry-run mode pages are only formatted (the first one is printed as a
    preview). Otherwise each page that LightRAG does not already list is
    submitted, and the script waits for the pipeline to go idle before
    submitting the next one. A summary is printed at the end.

    Exits with status 1 if the database session cannot be created or the
    technique pages cannot be loaded.
    """
    args = _build_arg_parser().parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # Connect to PostgreSQL
    logger.info("Connecting to PostgreSQL...")
    try:
        engine = get_sync_engine(args.db_url)
        SessionLocal = sessionmaker(bind=engine)
        session = SessionLocal()
    except Exception as e:
        logger.error("Failed to connect to PostgreSQL: %s", e)
        sys.exit(1)

    submitted = 0
    skipped = 0
    errors = 0
    total_chars = 0

    # try/finally guarantees the session is released on every exit path,
    # including sys.exit() and unexpected exceptions during processing.
    try:
        # Creating the engine and session does not open a connection; the
        # first query does, so connection failures surface here.
        try:
            pages = load_technique_pages(session, limit=args.limit)
        except Exception as e:
            logger.error("Failed to load technique pages from PostgreSQL: %s", e)
            sys.exit(1)

        total = len(pages)
        logger.info("Loaded %d technique page(s)", total)

        if total == 0:
            logger.info("No pages to process.")
            return

        # Resume support — get already-processed sources
        processed_sources: set[str] = set()
        if not args.dry_run:
            logger.info("Checking LightRAG for already-processed documents...")
            processed_sources = get_processed_sources(args.lightrag_url)
            logger.info("Found %d existing document(s) in LightRAG", len(processed_sources))

        # Process pages
        for i, page in enumerate(pages, 1):
            slug = page.slug
            source = file_source_for_page(page)
            text = format_technique_page(page)
            total_chars += len(text)

            if args.dry_run:
                if i == 1:
                    print("=" * 80)
                    print(f"PREVIEW: {page.title} ({source})")
                    print("=" * 80)
                    print(text)
                    print("=" * 80)
                logger.info("[%d/%d] Formatted: %s (%d chars)", i, total, slug, len(text))
                continue

            # Resume — skip already-processed
            if source in processed_sources:
                logger.info("[%d/%d] Skipped (already processed): %s", i, total, slug)
                skipped += 1
                continue

            # Submit
            logger.info("[%d/%d] Submitting: %s (%d chars)", i, total, slug, len(text))
            result = submit_document(args.lightrag_url, text, source)
            if result is None:
                errors += 1
                continue

            status = result.get("status", "unknown")
            if status == "duplicated":
                logger.info("[%d/%d] Duplicated (already in LightRAG): %s", i, total, slug)
                skipped += 1
                continue

            if status not in ("success", "partial_success"):
                logger.error("[%d/%d] Unexpected status '%s' for %s: %s",
                             i, total, status, slug, result.get("message", ""))
                errors += 1
                continue

            submitted += 1
            logger.info("[%d/%d] Submitted: %s (track_id=%s)", i, total, slug,
                        result.get("track_id", "?"))

            # Wait for pipeline to finish before submitting next
            logger.info("[%d/%d] Waiting for pipeline to process %s...", i, total, slug)
            finished = wait_for_pipeline(args.lightrag_url)
            if finished:
                logger.info("[%d/%d] Processed: %s", i, total, slug)
            else:
                logger.warning("[%d/%d] Pipeline timeout for %s — continuing anyway", i, total, slug)
    finally:
        session.close()

    # Summary
    print()
    print(f"{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f" Total pages: {total}")
    print(f" Total chars: {total_chars:,}")
    if not args.dry_run:
        print(f" Submitted: {submitted}")
        print(f" Skipped: {skipped}")
        print(f" Errors: {errors}")


# Run the reindex only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
Loading…
Add table
Reference in a new issue