Drops prompt iteration cycles from 20-30 min to under 5 min by enabling
stage-isolated re-runs and offline prompt testing against exported fixtures.
Phase 1: Offline prompt test harness
- export_fixture.py: export stage 5 inputs from DB to reusable JSON fixtures
- test_harness.py: run synthesis offline with any prompt, no Docker needed
- promote subcommand: deploy winning prompts with backup and optional git commit
Phase 2: Classification data persistence
- Dual-write classification to PostgreSQL + Redis (fixes 24hr TTL data loss)
- Clean retrigger now clears Redis cache keys (fixes stale data bug)
- Alembic migration 011: classification_data JSONB column + stage_rerun enum
Phase 3: Stage-isolated re-run
- run_single_stage Celery task with prerequisite validation and prompt overrides
- _load_prompt supports per-video Redis overrides for testing custom prompts
- POST /admin/pipeline/rerun-stage/{video_id}/{stage_name} endpoint
- Frontend: Re-run Stage modal with stage selector and prompt override textarea
Phase 4: Chunking inspector
- GET /admin/pipeline/chunking/{video_id} returns topic boundaries,
classifications, and synthesis group breakdowns
- Frontend: collapsible Chunking Inspector panel per video
Phase 5: Prompt deployment & stale data cleanup
- GET /admin/pipeline/stale-pages detects pages from older prompts
- POST /admin/pipeline/bulk-resynthesize re-runs a stage on all completed videos
- Frontend: stale pages indicator badge with one-click bulk re-synth
Phase 6: Automated iteration foundation
- Quality CLI --video-id flag auto-exports fixture from DB
- POST /admin/pipeline/optimize-prompt/{stage} dispatches optimization as Celery task
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
306 lines
11 KiB
Python
306 lines
11 KiB
Python
"""Export pipeline stage inputs for a video as a reusable JSON fixture.
|
|
|
|
Connects to the live database, queries KeyMoments and classification data,
|
|
and writes a fixture file that the test harness can consume offline.
|
|
|
|
Usage:
|
|
python -m pipeline.export_fixture --video-id <uuid> --output fixtures/video.json
|
|
python -m pipeline.export_fixture --video-id <uuid> # prints to stdout
|
|
python -m pipeline.export_fixture --list # list available videos
|
|
|
|
Requires: DATABASE_URL, REDIS_URL environment variables (or .env file).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import create_engine, select
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
|
|
|
|
def _log(tag: str, msg: str, level: str = "INFO") -> None:
|
|
"""Write structured log line to stderr."""
|
|
print(f"[EXPORT] [{level}] {tag}: {msg}", file=sys.stderr)
|
|
|
|
|
|
def _get_sync_session(database_url: str) -> Session:
    """Build a synchronous SQLAlchemy session for *database_url*.

    The pipeline's URL normally targets the asyncpg driver; swap it for
    psycopg2 so this CLI can issue plain blocking queries.
    """
    sync_url = database_url.replace(
        "postgresql+asyncpg://", "postgresql+psycopg2://"
    )
    engine = create_engine(sync_url, pool_pre_ping=True)
    return sessionmaker(bind=engine)()
|
|
|
|
|
|
def _list_videos(database_url: str) -> int:
    """List all videos with their processing status and moment counts.

    Prints a human-readable table to stderr (stdout is reserved for fixture
    JSON elsewhere in this module).

    Args:
        database_url: SQLAlchemy database URL (async driver is rewritten to
            a sync one by ``_get_sync_session``).

    Returns:
        Process exit code (always 0; an empty database is not an error).
    """
    from sqlalchemy import func

    from models import Creator, KeyMoment, SourceVideo

    session = _get_sync_session(database_url)
    try:
        videos = (
            session.execute(
                select(SourceVideo).order_by(SourceVideo.created_at.desc())
            )
            .scalars()
            .all()
        )
        if not videos:
            _log("LIST", "No videos found in database")
            return 0

        print(f"\n{'ID':<38s} {'Status':<14s} {'Moments':>7s} {'Creator':<20s} {'Filename'}", file=sys.stderr)
        print(f"{'─'*38} {'─'*14} {'─'*7} {'─'*20} {'─'*40}", file=sys.stderr)

        for video in videos:
            # COUNT(*) in the database instead of fetching every KeyMoment id
            # into memory just to len() it (the original did the latter).
            moment_count = session.execute(
                select(func.count())
                .select_from(KeyMoment)
                .where(KeyMoment.source_video_id == video.id)
            ).scalar_one()
            creator = session.execute(
                select(Creator).where(Creator.id == video.creator_id)
            ).scalar_one_or_none()
            creator_name = creator.name if creator else "?"

            print(
                f"{str(video.id):<38s} {video.processing_status.value:<14s} "
                f"{moment_count:>7d} {creator_name:<20s} {video.filename}",
                file=sys.stderr,
            )

        print(f"\nTotal: {len(videos)} videos\n", file=sys.stderr)
        return 0
    finally:
        session.close()
|
|
|
|
|
|
def export_fixture(
    database_url: str,
    redis_url: str,
    video_id: str,
    output_path: str | None = None,
) -> int:
    """Export stage 5 inputs for a video as a JSON fixture.

    Loads the video row, its key moments (ordered by start time), and the
    stage-4 classification data (Redis first, then the PostgreSQL
    ``classification_data`` column as fallback), then serializes everything
    into one JSON document the offline test harness can consume.

    Args:
        database_url: SQLAlchemy database URL; rewritten to a sync driver
            by ``_get_sync_session``.
        redis_url: Redis connection URL used for the classification cache.
        video_id: UUID (string form) of the video to export.
        output_path: Destination file; if None the fixture JSON is printed
            to stdout (all logging goes to stderr, so stdout stays clean).

    Returns exit code: 0 = success, 1 = error.
    """
    from models import Creator, KeyMoment, SourceVideo

    start = time.monotonic()
    _log("CONNECT", "Connecting to database...")

    session = _get_sync_session(database_url)
    try:
        # ── Load video ──────────────────────────────────────────────────
        video = session.execute(
            select(SourceVideo).where(SourceVideo.id == video_id)
        ).scalar_one_or_none()

        if video is None:
            _log("ERROR", f"Video not found: {video_id}", level="ERROR")
            return 1

        creator = session.execute(
            select(Creator).where(Creator.id == video.creator_id)
        ).scalar_one_or_none()
        # Missing creator is tolerated; the fixture just records "Unknown".
        creator_name = creator.name if creator else "Unknown"

        _log(
            "VIDEO",
            f"Found: {video.filename} by {creator_name} "
            f"({video.duration_seconds or '?'}s, {video.content_type.value}, "
            f"status={video.processing_status.value})",
        )

        # ── Load key moments ────────────────────────────────────────────
        moments = (
            session.execute(
                select(KeyMoment)
                .where(KeyMoment.source_video_id == video_id)
                .order_by(KeyMoment.start_time)
            )
            .scalars()
            .all()
        )

        # No moments means stages 2-3 never ran for this video; the fixture
        # would be empty and useless, so bail out with a hint.
        if not moments:
            _log("ERROR", f"No key moments found for video_id={video_id}", level="ERROR")
            _log("HINT", "Pipeline stages 2-3 must complete before export is possible", level="ERROR")
            return 1

        time_min = min(m.start_time for m in moments)
        time_max = max(m.end_time for m in moments)
        _log("MOMENTS", f"Loaded {len(moments)} key moments (time range: {time_min:.1f}s - {time_max:.1f}s)")

        # ── Load classification data ────────────────────────────────────
        # Expected shape: list of dicts keyed by "moment_id" with
        # "topic_category"/"topic_tags" entries (see lookup below).
        classification_data: list[dict] = []
        cls_source = "missing"

        # Try Redis first
        # Best-effort: any Redis failure (import, connect, decode) degrades
        # to the PostgreSQL fallback instead of aborting the export.
        try:
            import redis as redis_lib

            r = redis_lib.Redis.from_url(redis_url)
            key = f"chrysopedia:classification:{video_id}"
            raw = r.get(key)
            if raw is not None:
                classification_data = json.loads(raw)
                cls_source = "redis"
                ttl = r.ttl(key)
                _log("CLASSIFY", f"Source: redis ({len(classification_data)} entries, TTL={ttl}s)")
        except Exception as exc:
            _log("CLASSIFY", f"Redis unavailable: {exc}", level="WARN")

        # Fallback: check SourceVideo.classification_data column (Phase 2 addition)
        # getattr() guards against running before migration 011 added it.
        if not classification_data:
            video_cls = getattr(video, "classification_data", None)
            if video_cls:
                classification_data = video_cls
                cls_source = "postgresql"
                _log("CLASSIFY", f"Source: postgresql ({len(classification_data)} entries)")

        # Missing classification data is non-fatal: the fixture still exports
        # with every moment "Uncategorized".
        if not classification_data:
            _log("CLASSIFY", "No classification data found in Redis or PostgreSQL", level="WARN")
            _log("HINT", "Pipeline stage 4 must complete before classification data is available", level="WARN")
            cls_source = "missing"

        # Build classification lookup by moment_id
        # NOTE(review): assumes every entry carries a "moment_id" key matching
        # str(KeyMoment.id) — a malformed entry would raise KeyError here.
        cls_by_moment_id = {c["moment_id"]: c for c in classification_data}

        # Count moments without classification
        unclassified = sum(1 for m in moments if str(m.id) not in cls_by_moment_id)
        if unclassified > 0:
            _log("CLASSIFY", f"WARNING: {unclassified}/{len(moments)} moments have no classification data", level="WARN")

        # ── Build fixture ───────────────────────────────────────────────
        fixture_moments = []
        category_counts: Counter[str] = Counter()

        for m in moments:
            cls_info = cls_by_moment_id.get(str(m.id), {})
            topic_category = cls_info.get("topic_category", "Uncategorized")
            topic_tags = cls_info.get("topic_tags", [])
            category_counts[topic_category] += 1

            fixture_moments.append({
                "moment_id": str(m.id),
                "title": m.title,
                "summary": m.summary,
                "content_type": m.content_type.value,
                "start_time": m.start_time,
                "end_time": m.end_time,
                "plugins": m.plugins or [],
                "raw_transcript": m.raw_transcript or "",
                # Classification data (stage 4 output)
                "classification": {
                    "topic_category": topic_category,
                    "topic_tags": topic_tags,
                },
                # Compatibility fields for existing quality/scorer format
                "transcript_excerpt": (m.raw_transcript or "")[:500],
                "topic_tags": topic_tags,
                "topic_category": topic_category,
            })

        fixture = {
            "video_id": str(video.id),
            "creator_name": creator_name,
            "content_type": video.content_type.value,
            "filename": video.filename,
            "duration_seconds": video.duration_seconds,
            # Provenance of the classification data: "redis", "postgresql",
            # or "missing".
            "classification_source": cls_source,
            "export_timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "moments": fixture_moments,
        }

        fixture_json = json.dumps(fixture, indent=2, ensure_ascii=False)
        fixture_size_kb = len(fixture_json.encode("utf-8")) / 1024

        # ── Output ──────────────────────────────────────────────────────
        if output_path:
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            Path(output_path).write_text(fixture_json, encoding="utf-8")
            _log(
                "OUTPUT",
                f"Wrote fixture: {output_path} ({fixture_size_kb:.1f} KB, "
                f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
            )
        else:
            # Print fixture JSON to stdout
            print(fixture_json)
            _log(
                "OUTPUT",
                f"Printed fixture to stdout ({fixture_size_kb:.1f} KB, "
                f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
            )

        # Category breakdown
        for cat, count in category_counts.most_common():
            _log("CATEGORY", f"  {cat}: {count} moments")

        elapsed = time.monotonic() - start
        _log("DONE", f"Export completed in {elapsed:.1f}s")
        return 0

    except Exception as exc:
        # Broad catch is deliberate for a CLI boundary: log, print the
        # traceback to stderr, and convert to a non-zero exit code.
        _log("ERROR", f"Export failed: {exc}", level="ERROR")
        import traceback
        traceback.print_exc(file=sys.stderr)
        return 1
    finally:
        session.close()
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: parse arguments, load settings, and dispatch.

    Returns:
        Process exit code from the selected subcommand (0 on success).
    """
    parser = argparse.ArgumentParser(
        prog="pipeline.export_fixture",
        description="Export pipeline stage inputs for a video as a reusable JSON fixture",
    )
    parser.add_argument(
        "--video-id",
        type=str,
        help="UUID of the video to export",
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        default=None,
        help="Output file path (default: print to stdout)",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        default=False,
        help="List all videos with status and moment counts",
    )
    args = parser.parse_args()

    # Settings are loaded lazily so --help works without env configuration.
    from config import get_settings

    settings = get_settings()

    if args.list:
        return _list_videos(settings.database_url)

    # Export mode requires a target video; parser.error() exits the process.
    if not args.video_id:
        parser.error("--video-id is required (or use --list to see available videos)")

    return export_fixture(
        database_url=settings.database_url,
        redis_url=settings.redis_url,
        video_id=args.video_id,
        output_path=args.output,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Script/module entry point: propagate main()'s return value as the
    # process exit code.
    sys.exit(main())
|