"""Export pipeline stage inputs for a video as a reusable JSON fixture. Connects to the live database, queries KeyMoments and classification data, and writes a fixture file that the test harness can consume offline. Usage: python -m pipeline.export_fixture --video-id --output fixtures/video.json python -m pipeline.export_fixture --video-id # prints to stdout python -m pipeline.export_fixture --list # list available videos Requires: DATABASE_URL, REDIS_URL environment variables (or .env file). """ from __future__ import annotations import argparse import json import sys import time from collections import Counter from pathlib import Path from sqlalchemy import create_engine, select from sqlalchemy.orm import Session, sessionmaker def _log(tag: str, msg: str, level: str = "INFO") -> None: """Write structured log line to stderr.""" print(f"[EXPORT] [{level}] {tag}: {msg}", file=sys.stderr) def _get_sync_session(database_url: str) -> Session: """Create a sync SQLAlchemy session from the database URL.""" url = database_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://") engine = create_engine(url, pool_pre_ping=True) factory = sessionmaker(bind=engine) return factory() def _list_videos(database_url: str) -> int: """List all videos with their processing status and moment counts.""" from models import Creator, KeyMoment, SourceVideo session = _get_sync_session(database_url) try: videos = ( session.execute( select(SourceVideo).order_by(SourceVideo.created_at.desc()) ) .scalars() .all() ) if not videos: _log("LIST", "No videos found in database") return 0 print(f"\n{'ID':<38s} {'Status':<14s} {'Moments':>7s} {'Creator':<20s} {'Filename'}", file=sys.stderr) print(f"{'─'*38} {'─'*14} {'─'*7} {'─'*20} {'─'*40}", file=sys.stderr) for video in videos: moment_count = ( session.execute( select(KeyMoment.id).where(KeyMoment.source_video_id == video.id) ) .scalars() .all() ) creator = session.execute( select(Creator).where(Creator.id == video.creator_id) ).scalar_one_or_none() creator_name = creator.name if creator else "?" print( f"{str(video.id):<38s} {video.processing_status.value:<14s} " f"{len(moment_count):>7d} {creator_name:<20s} {video.filename}", file=sys.stderr, ) print(f"\nTotal: {len(videos)} videos\n", file=sys.stderr) return 0 finally: session.close() def export_fixture( database_url: str, redis_url: str, video_id: str, output_path: str | None = None, ) -> int: """Export stage 5 inputs for a video as a JSON fixture. Returns exit code: 0 = success, 1 = error. """ from models import Creator, KeyMoment, SourceVideo start = time.monotonic() _log("CONNECT", "Connecting to database...") session = _get_sync_session(database_url) try: # ── Load video ────────────────────────────────────────────────── video = session.execute( select(SourceVideo).where(SourceVideo.id == video_id) ).scalar_one_or_none() if video is None: _log("ERROR", f"Video not found: {video_id}", level="ERROR") return 1 creator = session.execute( select(Creator).where(Creator.id == video.creator_id) ).scalar_one_or_none() creator_name = creator.name if creator else "Unknown" _log( "VIDEO", f"Found: {video.filename} by {creator_name} " f"({video.duration_seconds or '?'}s, {video.content_type.value}, " f"status={video.processing_status.value})", ) # ── Load key moments ──────────────────────────────────────────── moments = ( session.execute( select(KeyMoment) .where(KeyMoment.source_video_id == video_id) .order_by(KeyMoment.start_time) ) .scalars() .all() ) if not moments: _log("ERROR", f"No key moments found for video_id={video_id}", level="ERROR") _log("HINT", "Pipeline stages 2-3 must complete before export is possible", level="ERROR") return 1 time_min = min(m.start_time for m in moments) time_max = max(m.end_time for m in moments) _log("MOMENTS", f"Loaded {len(moments)} key moments (time range: {time_min:.1f}s - {time_max:.1f}s)") # ── Load classification data ──────────────────────────────────── classification_data: list[dict] = [] cls_source = "missing" # Try Redis first try: import redis as redis_lib r = redis_lib.Redis.from_url(redis_url) key = f"chrysopedia:classification:{video_id}" raw = r.get(key) if raw is not None: classification_data = json.loads(raw) cls_source = "redis" ttl = r.ttl(key) _log("CLASSIFY", f"Source: redis ({len(classification_data)} entries, TTL={ttl}s)") except Exception as exc: _log("CLASSIFY", f"Redis unavailable: {exc}", level="WARN") # Fallback: check SourceVideo.classification_data column (Phase 2 addition) if not classification_data: video_cls = getattr(video, "classification_data", None) if video_cls: classification_data = video_cls cls_source = "postgresql" _log("CLASSIFY", f"Source: postgresql ({len(classification_data)} entries)") if not classification_data: _log("CLASSIFY", "No classification data found in Redis or PostgreSQL", level="WARN") _log("HINT", "Pipeline stage 4 must complete before classification data is available", level="WARN") cls_source = "missing" # Build classification lookup by moment_id cls_by_moment_id = {c["moment_id"]: c for c in classification_data} # Count moments without classification unclassified = sum(1 for m in moments if str(m.id) not in cls_by_moment_id) if unclassified > 0: _log("CLASSIFY", f"WARNING: {unclassified}/{len(moments)} moments have no classification data", level="WARN") # ── Build fixture ─────────────────────────────────────────────── fixture_moments = [] category_counts: Counter[str] = Counter() for m in moments: cls_info = cls_by_moment_id.get(str(m.id), {}) topic_category = cls_info.get("topic_category", "Uncategorized") topic_tags = cls_info.get("topic_tags", []) category_counts[topic_category] += 1 fixture_moments.append({ "moment_id": str(m.id), "title": m.title, "summary": m.summary, "content_type": m.content_type.value, "start_time": m.start_time, "end_time": m.end_time, "plugins": m.plugins or [], "raw_transcript": m.raw_transcript or "", # Classification data (stage 4 output) "classification": { "topic_category": topic_category, "topic_tags": topic_tags, }, # Compatibility fields for existing quality/scorer format "transcript_excerpt": (m.raw_transcript or "")[:500], "topic_tags": topic_tags, "topic_category": topic_category, }) fixture = { "video_id": str(video.id), "creator_name": creator_name, "content_type": video.content_type.value, "filename": video.filename, "duration_seconds": video.duration_seconds, "classification_source": cls_source, "export_timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "moments": fixture_moments, } fixture_json = json.dumps(fixture, indent=2, ensure_ascii=False) fixture_size_kb = len(fixture_json.encode("utf-8")) / 1024 # ── Output ────────────────────────────────────────────────────── if output_path: Path(output_path).parent.mkdir(parents=True, exist_ok=True) Path(output_path).write_text(fixture_json, encoding="utf-8") _log( "OUTPUT", f"Wrote fixture: {output_path} ({fixture_size_kb:.1f} KB, " f"{len(fixture_moments)} moments, {len(category_counts)} categories)", ) else: # Print fixture JSON to stdout print(fixture_json) _log( "OUTPUT", f"Printed fixture to stdout ({fixture_size_kb:.1f} KB, " f"{len(fixture_moments)} moments, {len(category_counts)} categories)", ) # Category breakdown for cat, count in category_counts.most_common(): _log("CATEGORY", f" {cat}: {count} moments") elapsed = time.monotonic() - start _log("DONE", f"Export completed in {elapsed:.1f}s") return 0 except Exception as exc: _log("ERROR", f"Export failed: {exc}", level="ERROR") import traceback traceback.print_exc(file=sys.stderr) return 1 finally: session.close() def main() -> int: parser = argparse.ArgumentParser( prog="pipeline.export_fixture", description="Export pipeline stage inputs for a video as a reusable JSON fixture", ) parser.add_argument( "--video-id", type=str, help="UUID of the video to export", ) parser.add_argument( "--output", "-o", type=str, default=None, help="Output file path (default: print to stdout)", ) parser.add_argument( "--list", action="store_true", default=False, help="List all videos with status and moment counts", ) args = parser.parse_args() # Load settings from config import get_settings settings = get_settings() if args.list: return _list_videos(settings.database_url) if not args.video_id: parser.error("--video-id is required (or use --list to see available videos)") return export_fixture( database_url=settings.database_url, redis_url=settings.redis_url, video_id=args.video_id, output_path=args.output, ) if __name__ == "__main__": sys.exit(main())