chrysopedia/backend/pipeline/export_fixture.py
jlightner f0d0b8ac1a feat: add pipeline iteration tooling — offline test harness, stage re-runs, chunking inspector
Drops prompt iteration cycles from 20-30 min to under 5 min by enabling
stage-isolated re-runs and offline prompt testing against exported fixtures.

Phase 1: Offline prompt test harness
- export_fixture.py: export stage 5 inputs from DB to reusable JSON fixtures
- test_harness.py: run synthesis offline with any prompt, no Docker needed
- promote subcommand: deploy winning prompts with backup and optional git commit

Phase 2: Classification data persistence
- Dual-write classification to PostgreSQL + Redis (fixes 24hr TTL data loss)
- Clean retrigger now clears Redis cache keys (fixes stale data bug)
- Alembic migration 011: classification_data JSONB column + stage_rerun enum

Phase 3: Stage-isolated re-run
- run_single_stage Celery task with prerequisite validation and prompt overrides
- _load_prompt supports per-video Redis overrides for testing custom prompts
- POST /admin/pipeline/rerun-stage/{video_id}/{stage_name} endpoint
- Frontend: Re-run Stage modal with stage selector and prompt override textarea

Phase 4: Chunking inspector
- GET /admin/pipeline/chunking/{video_id} returns topic boundaries,
  classifications, and synthesis group breakdowns
- Frontend: collapsible Chunking Inspector panel per video

Phase 5: Prompt deployment & stale data cleanup
- GET /admin/pipeline/stale-pages detects pages from older prompts
- POST /admin/pipeline/bulk-resynthesize re-runs a stage on all completed videos
- Frontend: stale pages indicator badge with one-click bulk re-synth

Phase 6: Automated iteration foundation
- Quality CLI --video-id flag auto-exports fixture from DB
- POST /admin/pipeline/optimize-prompt/{stage} dispatches optimization as Celery task

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:47:46 +00:00

306 lines
11 KiB
Python

"""Export pipeline stage inputs for a video as a reusable JSON fixture.
Connects to the live database, queries KeyMoments and classification data,
and writes a fixture file that the test harness can consume offline.
Usage:
python -m pipeline.export_fixture --video-id <uuid> --output fixtures/video.json
python -m pipeline.export_fixture --video-id <uuid> # prints to stdout
python -m pipeline.export_fixture --list # list available videos
Requires: DATABASE_URL, REDIS_URL environment variables (or .env file).
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter
from pathlib import Path
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
def _log(tag: str, msg: str, level: str = "INFO") -> None:
"""Write structured log line to stderr."""
print(f"[EXPORT] [{level}] {tag}: {msg}", file=sys.stderr)
def _get_sync_session(database_url: str) -> Session:
    """Open a synchronous SQLAlchemy session for *database_url*.

    Async driver URLs (``postgresql+asyncpg://``) are rewritten to the
    sync psycopg2 driver so a blocking engine can connect.
    """
    sync_url = database_url.replace(
        "postgresql+asyncpg://", "postgresql+psycopg2://"
    )
    # pool_pre_ping validates connections before use, avoiding stale-socket
    # errors on long-idle pools.
    engine = create_engine(sync_url, pool_pre_ping=True)
    return sessionmaker(bind=engine)()
def _list_videos(database_url: str) -> int:
    """List all videos with their processing status and moment counts.

    Prints a table to stderr (stdout is reserved for fixture JSON) in
    newest-first order and returns a process exit code (always 0).

    Args:
        database_url: SQLAlchemy database URL; async driver URLs are
            handled by ``_get_sync_session``.
    """
    from models import Creator, KeyMoment, SourceVideo

    session = _get_sync_session(database_url)
    try:
        videos = (
            session.execute(
                select(SourceVideo).order_by(SourceVideo.created_at.desc())
            )
            .scalars()
            .all()
        )
        if not videos:
            _log("LIST", "No videos found in database")
            return 0
        print(f"\n{'ID':<38s} {'Status':<14s} {'Moments':>7s} {'Creator':<20s} {'Filename'}", file=sys.stderr)
        # BUG FIX: the separator row was built with ''*N (empty string
        # repeated is still empty), so only spaces were printed. Use '-'*N
        # to print an actual rule under each header column.
        print(f"{'-'*38} {'-'*14} {'-'*7} {'-'*20} {'-'*40}", file=sys.stderr)
        for video in videos:
            # Fetch the moment ids for this video; the row shows the count.
            # NOTE(review): this is an N+1 query per video — acceptable for
            # an admin listing, but a COUNT aggregate would scale better.
            moment_ids = (
                session.execute(
                    select(KeyMoment.id).where(KeyMoment.source_video_id == video.id)
                )
                .scalars()
                .all()
            )
            creator = session.execute(
                select(Creator).where(Creator.id == video.creator_id)
            ).scalar_one_or_none()
            # Creator may be missing (e.g. deleted); show a placeholder.
            creator_name = creator.name if creator else "?"
            print(
                f"{str(video.id):<38s} {video.processing_status.value:<14s} "
                f"{len(moment_ids):>7d} {creator_name:<20s} {video.filename}",
                file=sys.stderr,
            )
        print(f"\nTotal: {len(videos)} videos\n", file=sys.stderr)
        return 0
    finally:
        session.close()
def export_fixture(
    database_url: str,
    redis_url: str,
    video_id: str,
    output_path: str | None = None,
) -> int:
    """Export stage 5 inputs for a video as a JSON fixture.

    Loads the video row and its KeyMoments from the database, attaches
    stage 4 classification data (Redis cache first, then the
    ``SourceVideo.classification_data`` column as fallback), and writes
    the combined fixture either to *output_path* or to stdout.

    Args:
        database_url: SQLAlchemy database URL (async driver URLs are
            rewritten to sync by ``_get_sync_session``).
        redis_url: Redis connection URL for the classification cache.
        video_id: UUID string of the video to export.
        output_path: Destination file path; if None the fixture JSON is
            printed to stdout (logs always go to stderr).

    Returns exit code: 0 = success, 1 = error.
    """
    from models import Creator, KeyMoment, SourceVideo
    start = time.monotonic()
    _log("CONNECT", "Connecting to database...")
    session = _get_sync_session(database_url)
    try:
        # ── Load video ──────────────────────────────────────────────────
        video = session.execute(
            select(SourceVideo).where(SourceVideo.id == video_id)
        ).scalar_one_or_none()
        if video is None:
            _log("ERROR", f"Video not found: {video_id}", level="ERROR")
            return 1
        creator = session.execute(
            select(Creator).where(Creator.id == video.creator_id)
        ).scalar_one_or_none()
        # Creator row may be missing; fall back to a placeholder name.
        creator_name = creator.name if creator else "Unknown"
        _log(
            "VIDEO",
            f"Found: {video.filename} by {creator_name} "
            f"({video.duration_seconds or '?'}s, {video.content_type.value}, "
            f"status={video.processing_status.value})",
        )
        # ── Load key moments ────────────────────────────────────────────
        moments = (
            session.execute(
                select(KeyMoment)
                .where(KeyMoment.source_video_id == video_id)
                .order_by(KeyMoment.start_time)
            )
            .scalars()
            .all()
        )
        if not moments:
            # No moments means there is nothing to export; earlier
            # pipeline stages have not produced output for this video.
            _log("ERROR", f"No key moments found for video_id={video_id}", level="ERROR")
            _log("HINT", "Pipeline stages 2-3 must complete before export is possible", level="ERROR")
            return 1
        time_min = min(m.start_time for m in moments)
        time_max = max(m.end_time for m in moments)
        _log("MOMENTS", f"Loaded {len(moments)} key moments (time range: {time_min:.1f}s - {time_max:.1f}s)")
        # ── Load classification data ────────────────────────────────────
        classification_data: list[dict] = []
        cls_source = "missing"
        # Try Redis first (the pipeline's classification cache).
        try:
            import redis as redis_lib
            r = redis_lib.Redis.from_url(redis_url)
            key = f"chrysopedia:classification:{video_id}"
            raw = r.get(key)
            if raw is not None:
                classification_data = json.loads(raw)
                cls_source = "redis"
                # TTL is logged so the operator can see how close the
                # cached data is to expiry.
                ttl = r.ttl(key)
                _log("CLASSIFY", f"Source: redis ({len(classification_data)} entries, TTL={ttl}s)")
        except Exception as exc:
            # Redis being unreachable is non-fatal: the PostgreSQL
            # fallback below may still have the data.
            _log("CLASSIFY", f"Redis unavailable: {exc}", level="WARN")
        # Fallback: check SourceVideo.classification_data column (Phase 2 addition)
        if not classification_data:
            # getattr guards against schemas predating the column addition.
            video_cls = getattr(video, "classification_data", None)
            if video_cls:
                classification_data = video_cls
                cls_source = "postgresql"
                _log("CLASSIFY", f"Source: postgresql ({len(classification_data)} entries)")
        if not classification_data:
            # Export still proceeds — moments fall back to "Uncategorized".
            _log("CLASSIFY", "No classification data found in Redis or PostgreSQL", level="WARN")
            _log("HINT", "Pipeline stage 4 must complete before classification data is available", level="WARN")
            cls_source = "missing"
        # Build classification lookup by moment_id
        # (entries are assumed to carry a "moment_id" key — a malformed
        # entry would raise KeyError, caught by the outer handler).
        cls_by_moment_id = {c["moment_id"]: c for c in classification_data}
        # Count moments without classification
        unclassified = sum(1 for m in moments if str(m.id) not in cls_by_moment_id)
        if unclassified > 0:
            _log("CLASSIFY", f"WARNING: {unclassified}/{len(moments)} moments have no classification data", level="WARN")
        # ── Build fixture ───────────────────────────────────────────────
        fixture_moments = []
        category_counts: Counter[str] = Counter()
        for m in moments:
            cls_info = cls_by_moment_id.get(str(m.id), {})
            topic_category = cls_info.get("topic_category", "Uncategorized")
            topic_tags = cls_info.get("topic_tags", [])
            category_counts[topic_category] += 1
            fixture_moments.append({
                "moment_id": str(m.id),
                "title": m.title,
                "summary": m.summary,
                "content_type": m.content_type.value,
                "start_time": m.start_time,
                "end_time": m.end_time,
                "plugins": m.plugins or [],
                "raw_transcript": m.raw_transcript or "",
                # Classification data (stage 4 output)
                "classification": {
                    "topic_category": topic_category,
                    "topic_tags": topic_tags,
                },
                # Compatibility fields for existing quality/scorer format
                "transcript_excerpt": (m.raw_transcript or "")[:500],
                "topic_tags": topic_tags,
                "topic_category": topic_category,
            })
        fixture = {
            "video_id": str(video.id),
            "creator_name": creator_name,
            "content_type": video.content_type.value,
            "filename": video.filename,
            "duration_seconds": video.duration_seconds,
            "classification_source": cls_source,
            # UTC timestamp so fixtures are comparable across machines.
            "export_timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "moments": fixture_moments,
        }
        fixture_json = json.dumps(fixture, indent=2, ensure_ascii=False)
        fixture_size_kb = len(fixture_json.encode("utf-8")) / 1024
        # ── Output ──────────────────────────────────────────────────────
        if output_path:
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            Path(output_path).write_text(fixture_json, encoding="utf-8")
            _log(
                "OUTPUT",
                f"Wrote fixture: {output_path} ({fixture_size_kb:.1f} KB, "
                f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
            )
        else:
            # Print fixture JSON to stdout
            print(fixture_json)
            _log(
                "OUTPUT",
                f"Printed fixture to stdout ({fixture_size_kb:.1f} KB, "
                f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
            )
        # Category breakdown
        for cat, count in category_counts.most_common():
            _log("CATEGORY", f"  {cat}: {count} moments")
        elapsed = time.monotonic() - start
        _log("DONE", f"Export completed in {elapsed:.1f}s")
        return 0
    except Exception as exc:
        # Broad catch at the CLI boundary: log, dump the traceback to
        # stderr, and convert any failure into a nonzero exit code.
        _log("ERROR", f"Export failed: {exc}", level="ERROR")
        import traceback
        traceback.print_exc(file=sys.stderr)
        return 1
    finally:
        session.close()
def main() -> int:
    """Parse CLI arguments, load settings, and dispatch to the right command.

    Returns the process exit code from the dispatched command.
    """
    parser = argparse.ArgumentParser(
        prog="pipeline.export_fixture",
        description="Export pipeline stage inputs for a video as a reusable JSON fixture",
    )
    parser.add_argument("--video-id", type=str, help="UUID of the video to export")
    parser.add_argument(
        "--output",
        "-o",
        type=str,
        default=None,
        help="Output file path (default: print to stdout)",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        default=False,
        help="List all videos with status and moment counts",
    )
    args = parser.parse_args()

    # Deferred import: settings are resolved from env vars / .env at call time.
    from config import get_settings

    settings = get_settings()

    if args.list:
        return _list_videos(settings.database_url)

    if not args.video_id:
        # argparse's error() prints usage and exits with status 2.
        parser.error("--video-id is required (or use --list to see available videos)")

    return export_fixture(
        database_url=settings.database_url,
        redis_url=settings.redis_url,
        video_id=args.video_id,
        output_path=args.output,
    )


if __name__ == "__main__":
    sys.exit(main())