diff --git a/alembic/versions/011_classification_cache_and_stage_rerun.py b/alembic/versions/011_classification_cache_and_stage_rerun.py new file mode 100644 index 0000000..4cf4cb8 --- /dev/null +++ b/alembic/versions/011_classification_cache_and_stage_rerun.py @@ -0,0 +1,35 @@ +"""Add classification_data JSONB column to source_videos and stage_rerun trigger. + +Persists stage 4 classification data in PostgreSQL alongside Redis cache, +eliminating the 24-hour TTL data loss risk. Also adds the 'stage_rerun' +trigger value for single-stage re-run support. + +Revision ID: 011_classification_cache_and_stage_rerun +Revises: 010_add_pipeline_runs +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + +revision = "011_classification_cache_and_stage_rerun" +down_revision = "010_add_pipeline_runs" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Add classification_data column to source_videos + op.add_column( + "source_videos", + sa.Column("classification_data", JSONB, nullable=True), + ) + + # Add 'stage_rerun' to the pipeline_run_trigger enum + # PostgreSQL enums require ALTER TYPE to add values + op.execute("ALTER TYPE pipeline_run_trigger ADD VALUE IF NOT EXISTS 'stage_rerun'") + + +def downgrade() -> None: + op.drop_column("source_videos", "classification_data") + # Note: PostgreSQL does not support removing values from enums. + # The 'stage_rerun' value will remain but be unused after downgrade. 
diff --git a/backend/models.py b/backend/models.py index f2e0fac..5a0e3ed 100644 --- a/backend/models.py +++ b/backend/models.py @@ -138,6 +138,7 @@ class SourceVideo(Base): default=ProcessingStatus.not_started, server_default="not_started", ) + classification_data: Mapped[list | None] = mapped_column(JSONB, nullable=True) created_at: Mapped[datetime] = mapped_column( default=_now, server_default=func.now() ) @@ -377,6 +378,7 @@ class PipelineRunTrigger(str, enum.Enum): clean_reprocess = "clean_reprocess" auto_ingest = "auto_ingest" bulk = "bulk" + stage_rerun = "stage_rerun" class PipelineRun(Base): diff --git a/backend/pipeline/export_fixture.py b/backend/pipeline/export_fixture.py new file mode 100644 index 0000000..25b7856 --- /dev/null +++ b/backend/pipeline/export_fixture.py @@ -0,0 +1,306 @@ +"""Export pipeline stage inputs for a video as a reusable JSON fixture. + +Connects to the live database, queries KeyMoments and classification data, +and writes a fixture file that the test harness can consume offline. + +Usage: + python -m pipeline.export_fixture --video-id <uuid> --output fixtures/video.json + python -m pipeline.export_fixture --video-id <uuid> # prints to stdout + python -m pipeline.export_fixture --list # list available videos + +Requires: DATABASE_URL, REDIS_URL environment variables (or .env file). 
+""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +from collections import Counter +from pathlib import Path + +from sqlalchemy import create_engine, select +from sqlalchemy.orm import Session, sessionmaker + + +def _log(tag: str, msg: str, level: str = "INFO") -> None: + """Write structured log line to stderr.""" + print(f"[EXPORT] [{level}] {tag}: {msg}", file=sys.stderr) + + +def _get_sync_session(database_url: str) -> Session: + """Create a sync SQLAlchemy session from the database URL.""" + url = database_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://") + engine = create_engine(url, pool_pre_ping=True) + factory = sessionmaker(bind=engine) + return factory() + + +def _list_videos(database_url: str) -> int: + """List all videos with their processing status and moment counts.""" + from models import Creator, KeyMoment, SourceVideo + + session = _get_sync_session(database_url) + try: + videos = ( + session.execute( + select(SourceVideo).order_by(SourceVideo.created_at.desc()) + ) + .scalars() + .all() + ) + if not videos: + _log("LIST", "No videos found in database") + return 0 + + print(f"\n{'ID':<38s} {'Status':<14s} {'Moments':>7s} {'Creator':<20s} {'Filename'}", file=sys.stderr) + print(f"{'─'*38} {'─'*14} {'─'*7} {'─'*20} {'─'*40}", file=sys.stderr) + + for video in videos: + moment_count = ( + session.execute( + select(KeyMoment.id).where(KeyMoment.source_video_id == video.id) + ) + .scalars() + .all() + ) + creator = session.execute( + select(Creator).where(Creator.id == video.creator_id) + ).scalar_one_or_none() + creator_name = creator.name if creator else "?" 
+ + print( + f"{str(video.id):<38s} {video.processing_status.value:<14s} " + f"{len(moment_count):>7d} {creator_name:<20s} {video.filename}", + file=sys.stderr, + ) + + print(f"\nTotal: {len(videos)} videos\n", file=sys.stderr) + return 0 + finally: + session.close() + + +def export_fixture( + database_url: str, + redis_url: str, + video_id: str, + output_path: str | None = None, +) -> int: + """Export stage 5 inputs for a video as a JSON fixture. + + Returns exit code: 0 = success, 1 = error. + """ + from models import Creator, KeyMoment, SourceVideo + + start = time.monotonic() + _log("CONNECT", "Connecting to database...") + + session = _get_sync_session(database_url) + try: + # ── Load video ────────────────────────────────────────────────── + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one_or_none() + + if video is None: + _log("ERROR", f"Video not found: {video_id}", level="ERROR") + return 1 + + creator = session.execute( + select(Creator).where(Creator.id == video.creator_id) + ).scalar_one_or_none() + creator_name = creator.name if creator else "Unknown" + + _log( + "VIDEO", + f"Found: {video.filename} by {creator_name} " + f"({video.duration_seconds or '?'}s, {video.content_type.value}, " + f"status={video.processing_status.value})", + ) + + # ── Load key moments ──────────────────────────────────────────── + moments = ( + session.execute( + select(KeyMoment) + .where(KeyMoment.source_video_id == video_id) + .order_by(KeyMoment.start_time) + ) + .scalars() + .all() + ) + + if not moments: + _log("ERROR", f"No key moments found for video_id={video_id}", level="ERROR") + _log("HINT", "Pipeline stages 2-3 must complete before export is possible", level="ERROR") + return 1 + + time_min = min(m.start_time for m in moments) + time_max = max(m.end_time for m in moments) + _log("MOMENTS", f"Loaded {len(moments)} key moments (time range: {time_min:.1f}s - {time_max:.1f}s)") + + # ── Load classification data 
──────────────────────────────────── + classification_data: list[dict] = [] + cls_source = "missing" + + # Try Redis first + try: + import redis as redis_lib + + r = redis_lib.Redis.from_url(redis_url) + key = f"chrysopedia:classification:{video_id}" + raw = r.get(key) + if raw is not None: + classification_data = json.loads(raw) + cls_source = "redis" + ttl = r.ttl(key) + _log("CLASSIFY", f"Source: redis ({len(classification_data)} entries, TTL={ttl}s)") + except Exception as exc: + _log("CLASSIFY", f"Redis unavailable: {exc}", level="WARN") + + # Fallback: check SourceVideo.classification_data column (Phase 2 addition) + if not classification_data: + video_cls = getattr(video, "classification_data", None) + if video_cls: + classification_data = video_cls + cls_source = "postgresql" + _log("CLASSIFY", f"Source: postgresql ({len(classification_data)} entries)") + + if not classification_data: + _log("CLASSIFY", "No classification data found in Redis or PostgreSQL", level="WARN") + _log("HINT", "Pipeline stage 4 must complete before classification data is available", level="WARN") + cls_source = "missing" + + # Build classification lookup by moment_id + cls_by_moment_id = {c["moment_id"]: c for c in classification_data} + + # Count moments without classification + unclassified = sum(1 for m in moments if str(m.id) not in cls_by_moment_id) + if unclassified > 0: + _log("CLASSIFY", f"WARNING: {unclassified}/{len(moments)} moments have no classification data", level="WARN") + + # ── Build fixture ─────────────────────────────────────────────── + fixture_moments = [] + category_counts: Counter[str] = Counter() + + for m in moments: + cls_info = cls_by_moment_id.get(str(m.id), {}) + topic_category = cls_info.get("topic_category", "Uncategorized") + topic_tags = cls_info.get("topic_tags", []) + category_counts[topic_category] += 1 + + fixture_moments.append({ + "moment_id": str(m.id), + "title": m.title, + "summary": m.summary, + "content_type": m.content_type.value, + 
"start_time": m.start_time, + "end_time": m.end_time, + "plugins": m.plugins or [], + "raw_transcript": m.raw_transcript or "", + # Classification data (stage 4 output) + "classification": { + "topic_category": topic_category, + "topic_tags": topic_tags, + }, + # Compatibility fields for existing quality/scorer format + "transcript_excerpt": (m.raw_transcript or "")[:500], + "topic_tags": topic_tags, + "topic_category": topic_category, + }) + + fixture = { + "video_id": str(video.id), + "creator_name": creator_name, + "content_type": video.content_type.value, + "filename": video.filename, + "duration_seconds": video.duration_seconds, + "classification_source": cls_source, + "export_timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "moments": fixture_moments, + } + + fixture_json = json.dumps(fixture, indent=2, ensure_ascii=False) + fixture_size_kb = len(fixture_json.encode("utf-8")) / 1024 + + # ── Output ────────────────────────────────────────────────────── + if output_path: + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + Path(output_path).write_text(fixture_json, encoding="utf-8") + _log( + "OUTPUT", + f"Wrote fixture: {output_path} ({fixture_size_kb:.1f} KB, " + f"{len(fixture_moments)} moments, {len(category_counts)} categories)", + ) + else: + # Print fixture JSON to stdout + print(fixture_json) + _log( + "OUTPUT", + f"Printed fixture to stdout ({fixture_size_kb:.1f} KB, " + f"{len(fixture_moments)} moments, {len(category_counts)} categories)", + ) + + # Category breakdown + for cat, count in category_counts.most_common(): + _log("CATEGORY", f" {cat}: {count} moments") + + elapsed = time.monotonic() - start + _log("DONE", f"Export completed in {elapsed:.1f}s") + return 0 + + except Exception as exc: + _log("ERROR", f"Export failed: {exc}", level="ERROR") + import traceback + traceback.print_exc(file=sys.stderr) + return 1 + finally: + session.close() + + +def main() -> int: + parser = argparse.ArgumentParser( + 
prog="pipeline.export_fixture", + description="Export pipeline stage inputs for a video as a reusable JSON fixture", + ) + parser.add_argument( + "--video-id", + type=str, + help="UUID of the video to export", + ) + parser.add_argument( + "--output", "-o", + type=str, + default=None, + help="Output file path (default: print to stdout)", + ) + parser.add_argument( + "--list", + action="store_true", + default=False, + help="List all videos with status and moment counts", + ) + + args = parser.parse_args() + + # Load settings + from config import get_settings + settings = get_settings() + + if args.list: + return _list_videos(settings.database_url) + + if not args.video_id: + parser.error("--video-id is required (or use --list to see available videos)") + + return export_fixture( + database_url=settings.database_url, + redis_url=settings.redis_url, + video_id=args.video_id, + output_path=args.output, + ) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/backend/pipeline/quality/__main__.py b/backend/pipeline/quality/__main__.py index 091826a..8c6d062 100644 --- a/backend/pipeline/quality/__main__.py +++ b/backend/pipeline/quality/__main__.py @@ -236,12 +236,17 @@ def main() -> int: default=2, help="Variants generated per iteration (default: 2)", ) - opt_parser.add_argument( + opt_source = opt_parser.add_mutually_exclusive_group(required=True) + opt_source.add_argument( "--file", type=str, - required=True, help="Path to moments JSON fixture file", ) + opt_source.add_argument( + "--video-id", + type=str, + help="Video UUID — exports fixture from DB automatically (requires DATABASE_URL, REDIS_URL)", + ) opt_parser.add_argument( "--output-dir", type=str, @@ -360,10 +365,34 @@ def _run_optimize(args: argparse.Namespace) -> int: ) return 1 - # Validate fixture file exists - fixture = Path(args.file) + # Resolve fixture: either from --file or auto-export from --video-id + fixture_path: str + if args.file: + fixture_path = args.file + else: + # Auto-export from 
database + print(f"\n[OPTIMIZE] Exporting fixture from video_id={args.video_id}...", file=sys.stderr) + import tempfile + from pipeline.export_fixture import export_fixture + + settings = get_settings() + tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="optimize_fixture_", delete=False) + tmp.close() + exit_code = export_fixture( + database_url=settings.database_url, + redis_url=settings.redis_url, + video_id=args.video_id, + output_path=tmp.name, + ) + if exit_code != 0: + print(f"Error: fixture export failed (exit code {exit_code})", file=sys.stderr) + return 1 + fixture_path = tmp.name + print(f"[OPTIMIZE] Fixture exported to: {fixture_path}", file=sys.stderr) + + fixture = Path(fixture_path) if not fixture.exists(): - print(f"Error: fixture file not found: {args.file}", file=sys.stderr) + print(f"Error: fixture file not found: {fixture_path}", file=sys.stderr) return 1 # Ensure output dir @@ -375,7 +404,7 @@ def _run_optimize(args: argparse.Namespace) -> int: loop = OptimizationLoop( client=client, stage=args.stage, - fixture_path=args.file, + fixture_path=fixture_path, iterations=args.iterations, variants_per_iter=args.variants_per_iter, output_dir=args.output_dir, @@ -407,7 +436,7 @@ def _run_optimize(args: argparse.Namespace) -> int: stage=args.stage, iterations=args.iterations, variants_per_iter=args.variants_per_iter, - fixture_path=args.file, + fixture_path=fixture_path, ) print(f" Results written to: {json_path}") except OSError as exc: diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index e148cd6..2e6fe5d 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -241,11 +241,34 @@ def _get_sync_session() -> Session: return _SessionLocal() -def _load_prompt(template_name: str) -> str: +def _load_prompt(template_name: str, video_id: str | None = None) -> str: """Read a prompt template from the prompts directory. - Raises FileNotFoundError if the template does not exist. 
+ If ``video_id`` is provided, checks Redis for a per-video prompt override + (key: ``chrysopedia:prompt_override:{video_id}:{template_name}``) before + falling back to the on-disk template. Overrides are set by + ``run_single_stage`` for single-stage re-runs with custom prompts. + + Raises FileNotFoundError if no override exists and the template is missing. """ + # Check for per-video prompt override in Redis + if video_id: + try: + import redis + settings = get_settings() + r = redis.Redis.from_url(settings.redis_url) + override_key = f"chrysopedia:prompt_override:{video_id}:{template_name}" + override = r.get(override_key) + if override: + prompt_text = override.decode("utf-8") + logger.info( + "[PROMPT] Using override from Redis: video_id=%s, template=%s (%d chars)", + video_id, template_name, len(prompt_text), + ) + return prompt_text + except Exception as exc: + logger.warning("[PROMPT] Redis override check failed: %s", exc) + settings = get_settings() path = Path(settings.prompts_path) / template_name if not path.exists(): @@ -398,7 +421,7 @@ def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str: transcript_text = "\n".join(transcript_lines) # Load prompt and call LLM - system_prompt = _load_prompt("stage2_segmentation.txt") + system_prompt = _load_prompt("stage2_segmentation.txt", video_id=video_id) user_prompt = f"\n{transcript_text}\n" llm = _get_llm_client() @@ -477,7 +500,7 @@ def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str: label = seg.topic_label or "unlabeled" groups[label].append(seg) - system_prompt = _load_prompt("stage3_extraction.txt") + system_prompt = _load_prompt("stage3_extraction.txt", video_id=video_id) llm = _get_llm_client() model_override, modality = _get_stage_config(3) hard_limit = get_settings().llm_max_tokens_hard_limit @@ -650,7 +673,7 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str tags_data = _load_canonical_tags() taxonomy_text = 
_format_taxonomy_for_prompt(tags_data) - system_prompt = _load_prompt("stage4_classification.txt") + system_prompt = _load_prompt("stage4_classification.txt", video_id=video_id) llm = _get_llm_client() model_override, modality = _get_stage_config(4) hard_limit = get_settings().llm_max_tokens_hard_limit @@ -716,26 +739,108 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str def _store_classification_data(video_id: str, data: list[dict]) -> None: - """Store classification data in Redis for cross-stage communication.""" + """Store classification data in Redis (cache) and PostgreSQL (durable). + + Dual-write ensures classification data survives Redis TTL expiry or flush. + Redis serves as the fast-path cache; PostgreSQL is the durable fallback. + """ import redis settings = get_settings() - r = redis.Redis.from_url(settings.redis_url) - key = f"chrysopedia:classification:{video_id}" - r.set(key, json.dumps(data), ex=86400) # Expire after 24 hours + + # Redis: fast cache with 7-day TTL + try: + r = redis.Redis.from_url(settings.redis_url) + key = f"chrysopedia:classification:{video_id}" + r.set(key, json.dumps(data), ex=604800) # 7 days + logger.info( + "[CLASSIFY-STORE] Redis write: video_id=%s, %d entries, ttl=7d", + video_id, len(data), + ) + except Exception as exc: + logger.warning( + "[CLASSIFY-STORE] Redis write failed for video_id=%s: %s", video_id, exc, + ) + + # PostgreSQL: durable storage on SourceVideo.classification_data + session = _get_sync_session() + try: + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one_or_none() + if video: + video.classification_data = data + session.commit() + logger.info( + "[CLASSIFY-STORE] PostgreSQL write: video_id=%s, %d entries", + video_id, len(data), + ) + else: + logger.warning( + "[CLASSIFY-STORE] Video not found for PostgreSQL write: %s", video_id, + ) + except Exception as exc: + session.rollback() + logger.warning( + "[CLASSIFY-STORE] 
PostgreSQL write failed for video_id=%s: %s", video_id, exc, + ) + finally: + session.close() def _load_classification_data(video_id: str) -> list[dict]: - """Load classification data from Redis.""" + """Load classification data from Redis (fast path) or PostgreSQL (fallback). + + Tries Redis first. If the key has expired or Redis is unavailable, falls + back to the durable SourceVideo.classification_data column. + """ import redis settings = get_settings() - r = redis.Redis.from_url(settings.redis_url) - key = f"chrysopedia:classification:{video_id}" - raw = r.get(key) - if raw is None: - return [] - return json.loads(raw) + + # Try Redis first (fast path) + try: + r = redis.Redis.from_url(settings.redis_url) + key = f"chrysopedia:classification:{video_id}" + raw = r.get(key) + if raw is not None: + data = json.loads(raw) + logger.info( + "[CLASSIFY-LOAD] Source: redis, video_id=%s, %d entries", + video_id, len(data), + ) + return data + except Exception as exc: + logger.warning( + "[CLASSIFY-LOAD] Redis unavailable for video_id=%s: %s", video_id, exc, + ) + + # Fallback to PostgreSQL + logger.info("[CLASSIFY-LOAD] Redis miss, falling back to PostgreSQL for video_id=%s", video_id) + session = _get_sync_session() + try: + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one_or_none() + if video and video.classification_data: + data = video.classification_data + logger.info( + "[CLASSIFY-LOAD] Source: postgresql, video_id=%s, %d entries", + video_id, len(data), + ) + return data + except Exception as exc: + logger.warning( + "[CLASSIFY-LOAD] PostgreSQL fallback failed for video_id=%s: %s", video_id, exc, + ) + finally: + session.close() + + logger.warning( + "[CLASSIFY-LOAD] No classification data found in Redis or PostgreSQL for video_id=%s", + video_id, + ) + return [] @@ -969,7 +1074,7 @@ def _merge_pages_by_slug( indent=2, ensure_ascii=False, ) - merge_system_prompt = _load_prompt("stage5_merge.txt") + 
merge_system_prompt = _load_prompt("stage5_merge.txt", video_id=video_id) merge_user_prompt = f"{creator_name}\n\n{pages_json}\n" max_tokens = estimate_max_tokens( @@ -1080,7 +1185,7 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: category = cls_info.get("topic_category", "Uncategorized").strip().title() groups[category].append((moment, cls_info)) - system_prompt = _load_prompt("stage5_synthesis.txt") + system_prompt = _load_prompt("stage5_synthesis.txt", video_id=video_id) llm = _get_llm_client() model_override, modality = _get_stage_config(5) hard_limit = settings.llm_max_tokens_hard_limit @@ -1784,3 +1889,214 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str: raise return video_id + + +# ── Single-Stage Re-Run ───────────────────────────────────────────────────── + +@celery_app.task +def run_single_stage( + video_id: str, + stage_name: str, + trigger: str = "stage_rerun", + prompt_override: str | None = None, +) -> str: + """Re-run a single pipeline stage without running predecessors. + + Designed for fast prompt iteration — especially stage 5 synthesis. + Bypasses the processing_status==complete guard, creates a proper + PipelineRun record, and restores status on completion. + + If ``prompt_override`` is provided, it is stored in Redis as a + per-video override that ``_load_prompt`` reads before falling back + to the on-disk template. The override is cleaned up after the stage runs. + + Returns the video_id. + """ + import redis as redis_lib + + logger.info( + "[RERUN] Starting single-stage re-run: video_id=%s, stage=%s, trigger=%s", + video_id, stage_name, trigger, + ) + + # Validate stage name + if stage_name not in _PIPELINE_STAGES: + raise ValueError( + f"[RERUN] Invalid stage '{stage_name}'. 
" + f"Valid stages: {_PIPELINE_STAGES}" + ) + + # Validate video exists + session = _get_sync_session() + try: + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one_or_none() + if video is None: + raise ValueError(f"[RERUN] Video not found: {video_id}") + original_status = video.processing_status + finally: + session.close() + + # Validate prerequisites for the requested stage + prereq_ok, prereq_msg = _check_stage_prerequisites(video_id, stage_name) + if not prereq_ok: + logger.error("[RERUN] Prerequisite check failed: %s", prereq_msg) + raise ValueError(f"[RERUN] Prerequisites not met: {prereq_msg}") + + logger.info("[RERUN] Prerequisite check passed: %s", prereq_msg) + + # Store prompt override in Redis if provided + override_key = None + if prompt_override: + settings = get_settings() + try: + r = redis_lib.Redis.from_url(settings.redis_url) + # Map stage name to its prompt template + stage_prompt_map = { + "stage2_segmentation": "stage2_segmentation.txt", + "stage3_extraction": "stage3_extraction.txt", + "stage4_classification": "stage4_classification.txt", + "stage5_synthesis": "stage5_synthesis.txt", + } + template = stage_prompt_map.get(stage_name) + if template: + override_key = f"chrysopedia:prompt_override:{video_id}:{template}" + r.set(override_key, prompt_override, ex=3600) # 1-hour TTL + logger.info( + "[RERUN] Prompt override stored: key=%s (%d chars, first 100: %s)", + override_key, len(prompt_override), prompt_override[:100], + ) + except Exception as exc: + logger.warning("[RERUN] Failed to store prompt override: %s", exc) + + # Snapshot prior pages (needed for stage 5 page matching) + if stage_name in ("stage5_synthesis",): + _snapshot_prior_pages(video_id) + + # Create pipeline run record + run_id = _create_run(video_id, trigger) + logger.info("[RERUN] Created run_id=%s", run_id) + + # Temporarily set status to processing + session = _get_sync_session() + try: + video = session.execute( + 
select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one() + video.processing_status = ProcessingStatus.processing + session.commit() + finally: + session.close() + + # Run the single stage + start = time.monotonic() + try: + task_func = _STAGE_TASKS[stage_name] + task_func(video_id, run_id=run_id) + + elapsed = time.monotonic() - start + logger.info( + "[RERUN] Stage %s completed: %.1fs, video_id=%s", + stage_name, elapsed, video_id, + ) + _finish_run(run_id, "complete") + + # Restore status to complete + session = _get_sync_session() + try: + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one() + video.processing_status = ProcessingStatus.complete + session.commit() + logger.info("[RERUN] Status restored to complete") + finally: + session.close() + + except Exception as exc: + elapsed = time.monotonic() - start + logger.error( + "[RERUN] Stage %s FAILED after %.1fs: %s", + stage_name, elapsed, exc, + ) + _set_error_status(video_id, stage_name, exc) + _finish_run(run_id, "error", error_stage=stage_name) + raise + + finally: + # Clean up prompt override from Redis + if override_key: + try: + settings = get_settings() + r = redis_lib.Redis.from_url(settings.redis_url) + r.delete(override_key) + logger.info("[RERUN] Prompt override cleaned up: %s", override_key) + except Exception as exc: + logger.warning("[RERUN] Failed to clean up prompt override: %s", exc) + + return video_id + + +def _check_stage_prerequisites(video_id: str, stage_name: str) -> tuple[bool, str]: + """Validate that prerequisite data exists for a stage re-run. + + Returns (ok, message) where message describes what was found or missing. 
+ """ + session = _get_sync_session() + try: + if stage_name == "stage2_segmentation": + # Needs transcript segments + count = session.execute( + select(func.count(TranscriptSegment.id)) + .where(TranscriptSegment.source_video_id == video_id) + ).scalar() or 0 + if count == 0: + return False, "No transcript segments found" + return True, f"transcript_segments={count}" + + if stage_name == "stage3_extraction": + # Needs transcript segments with topic_labels + count = session.execute( + select(func.count(TranscriptSegment.id)) + .where( + TranscriptSegment.source_video_id == video_id, + TranscriptSegment.topic_label.isnot(None), + ) + ).scalar() or 0 + if count == 0: + return False, "No labeled transcript segments (stage 2 must complete first)" + return True, f"labeled_segments={count}" + + if stage_name == "stage4_classification": + # Needs key moments + count = session.execute( + select(func.count(KeyMoment.id)) + .where(KeyMoment.source_video_id == video_id) + ).scalar() or 0 + if count == 0: + return False, "No key moments found (stage 3 must complete first)" + return True, f"key_moments={count}" + + if stage_name == "stage5_synthesis": + # Needs key moments + classification data + km_count = session.execute( + select(func.count(KeyMoment.id)) + .where(KeyMoment.source_video_id == video_id) + ).scalar() or 0 + if km_count == 0: + return False, "No key moments found (stages 2-3 must complete first)" + + cls_data = _load_classification_data(video_id) + cls_source = "redis+pg" + if not cls_data: + return False, f"No classification data found (stage 4 must complete first), key_moments={km_count}" + + return True, f"key_moments={km_count}, classification_entries={len(cls_data)}" + + if stage_name == "stage6_embed_and_index": + return True, "stage 6 is non-blocking and always runs" + + return False, f"Unknown stage: {stage_name}" + finally: + session.close() diff --git a/backend/pipeline/test_harness.py b/backend/pipeline/test_harness.py new file mode 100644 index 
0000000..074628f --- /dev/null +++ b/backend/pipeline/test_harness.py @@ -0,0 +1,481 @@ +"""Offline prompt test harness for Chrysopedia synthesis. + +Loads a fixture JSON (exported by export_fixture.py) and a prompt file, +calls the LLM, and outputs the synthesized result. No Docker, no database, +no Redis, no Celery — just prompt + fixture + LLM endpoint. + +Usage: + python -m pipeline.test_harness \\ + --fixture fixtures/real_video_xyz.json \\ + --prompt prompts/stage5_synthesis.txt \\ + --output /tmp/result.json + + # Run all categories in a fixture: + python -m pipeline.test_harness --fixture fixtures/video.json + + # Run a specific category only: + python -m pipeline.test_harness --fixture fixtures/video.json --category "Sound Design" + +Exit codes: 0=success, 1=LLM error, 2=parse error, 3=fixture error +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +from collections import Counter, defaultdict +from dataclasses import dataclass +from pathlib import Path +from typing import NamedTuple + +from pydantic import ValidationError + +from config import get_settings +from pipeline.llm_client import LLMClient, estimate_max_tokens +from pipeline.schemas import SynthesisResult + + +# ── Lightweight stand-in for KeyMoment ORM model ─────────────────────────── + +class _MockContentType: + """Mimics KeyMomentContentType enum with a .value property.""" + def __init__(self, value: str) -> None: + self.value = value + + +class MockKeyMoment(NamedTuple): + """Lightweight stand-in for the ORM KeyMoment. + + Has the same attributes that _build_moments_text() accesses: + title, summary, content_type, start_time, end_time, plugins, raw_transcript. 
+ """ + title: str + summary: str + content_type: object # _MockContentType + start_time: float + end_time: float + plugins: list[str] + raw_transcript: str + + +def _log(tag: str, msg: str, level: str = "INFO") -> None: + """Write structured log line to stderr.""" + print(f"[HARNESS] [{level}] {tag}: {msg}", file=sys.stderr) + + +# ── Moment text builder (mirrors stages.py _build_moments_text) ──────────── + +def build_moments_text( + moment_group: list[tuple[MockKeyMoment, dict]], + category: str, +) -> tuple[str, set[str]]: + """Build the moments prompt text — matches _build_moments_text in stages.py.""" + moments_lines = [] + all_tags: set[str] = set() + for i, (m, cls_info) in enumerate(moment_group): + tags = cls_info.get("topic_tags", []) + all_tags.update(tags) + moments_lines.append( + f"[{i}] Title: {m.title}\n" + f" Summary: {m.summary}\n" + f" Content type: {m.content_type.value}\n" + f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n" + f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n" + f" Category: {category}\n" + f" Tags: {', '.join(tags) if tags else 'none'}\n" + f" Transcript excerpt: {(m.raw_transcript or '')[:300]}" + ) + return "\n\n".join(moments_lines), all_tags + + +# ── Fixture loading ──────────────────────────────────────────────────────── + +@dataclass +class FixtureData: + """Parsed fixture with moments grouped by category.""" + creator_name: str + video_id: str + content_type: str + filename: str + # Groups: category -> list of (MockKeyMoment, cls_info_dict) + groups: dict[str, list[tuple[MockKeyMoment, dict]]] + total_moments: int + + +def load_fixture(path: str) -> FixtureData: + """Load and parse a fixture JSON file into grouped moments.""" + fixture_path = Path(path) + if not fixture_path.exists(): + raise FileNotFoundError(f"Fixture not found: {path}") + + raw = fixture_path.read_text(encoding="utf-8") + size_kb = len(raw.encode("utf-8")) / 1024 + data = json.loads(raw) + + moments_raw = data.get("moments", []) + if 
not moments_raw: + raise ValueError(f"Fixture has no moments: {path}") + + _log("FIXTURE", f"Loading: {path} ({size_kb:.1f} KB, {len(moments_raw)} moments)") + + # Build MockKeyMoment objects and group by category + groups: dict[str, list[tuple[MockKeyMoment, dict]]] = defaultdict(list) + + for m in moments_raw: + cls = m.get("classification", {}) + category = cls.get("topic_category", m.get("topic_category", "Uncategorized")) + tags = cls.get("topic_tags", m.get("topic_tags", [])) + + mock = MockKeyMoment( + title=m.get("title", m.get("summary", "")[:80]), + summary=m.get("summary", ""), + content_type=_MockContentType(m.get("content_type", "technique")), + start_time=m.get("start_time", 0.0), + end_time=m.get("end_time", 0.0), + plugins=m.get("plugins", []), + raw_transcript=m.get("raw_transcript", m.get("transcript_excerpt", "")), + ) + cls_info = {"topic_category": category, "topic_tags": tags} + groups[category].append((mock, cls_info)) + + # Log breakdown + cat_counts = {cat: len(moms) for cat, moms in groups.items()} + counts = list(cat_counts.values()) + _log( + "FIXTURE", + f"Breakdown: {len(groups)} categories, " + f"moments per category: min={min(counts)}, max={max(counts)}, " + f"avg={sum(counts)/len(counts):.1f}", + ) + for cat, count in sorted(cat_counts.items(), key=lambda x: -x[1]): + _log("FIXTURE", f" {cat}: {count} moments") + + return FixtureData( + creator_name=data.get("creator_name", "Unknown"), + video_id=data.get("video_id", "unknown"), + content_type=data.get("content_type", "tutorial"), + filename=data.get("filename", "unknown"), + groups=dict(groups), + total_moments=len(moments_raw), + ) + + +# ── Synthesis runner ─────────────────────────────────────────────────────── + +def run_synthesis( + fixture: FixtureData, + prompt_path: str, + category_filter: str | None = None, + model_override: str | None = None, + modality: str | None = None, +) -> tuple[list[dict], int]: + """Run synthesis on fixture data, returns (pages, exit_code). 
+ + Returns all synthesized pages as dicts plus an exit code. + """ + # Load prompt + prompt_file = Path(prompt_path) + if not prompt_file.exists(): + _log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR") + return [], 3 + + system_prompt = prompt_file.read_text(encoding="utf-8") + _log("PROMPT", f"Loading: {prompt_path} ({len(system_prompt)} chars)") + + # Setup LLM + settings = get_settings() + llm = LLMClient(settings) + + stage_model = model_override or settings.llm_stage5_model or settings.llm_model + stage_modality = modality or settings.llm_stage5_modality or "thinking" + hard_limit = settings.llm_max_tokens_hard_limit + + _log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}") + + # Filter categories if requested + categories = fixture.groups + if category_filter: + if category_filter not in categories: + _log("ERROR", f"Category '{category_filter}' not found. Available: {list(categories.keys())}", level="ERROR") + return [], 3 + categories = {category_filter: categories[category_filter]} + + all_pages: list[dict] = [] + total_prompt_tokens = 0 + total_completion_tokens = 0 + total_duration_ms = 0 + exit_code = 0 + + for cat_idx, (category, moment_group) in enumerate(categories.items(), 1): + _log("SYNTH", f"Category {cat_idx}/{len(categories)}: '{category}' ({len(moment_group)} moments)") + + # Build user prompt (same format as stages.py _synthesize_chunk) + moments_text, all_tags = build_moments_text(moment_group, category) + user_prompt = f"{fixture.creator_name}\n\n{moments_text}\n" + + estimated_tokens = estimate_max_tokens( + system_prompt, user_prompt, + stage="stage5_synthesis", + hard_limit=hard_limit, + ) + _log( + "SYNTH", + f" Building prompt: {len(moment_group)} moments, " + f"max_tokens={estimated_tokens}, tags={sorted(all_tags)[:5]}{'...' 
if len(all_tags) > 5 else ''}", + ) + + # Call LLM + call_start = time.monotonic() + _log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}") + + try: + raw = llm.complete( + system_prompt, + user_prompt, + response_model=SynthesisResult, + modality=stage_modality, + model_override=stage_model, + max_tokens=estimated_tokens, + ) + except Exception as exc: + _log("ERROR", f" LLM call failed: {exc}", level="ERROR") + exit_code = 1 + continue + + call_duration_ms = int((time.monotonic() - call_start) * 1000) + prompt_tokens = getattr(raw, "prompt_tokens", None) or 0 + completion_tokens = getattr(raw, "completion_tokens", None) or 0 + finish_reason = getattr(raw, "finish_reason", "unknown") + + total_prompt_tokens += prompt_tokens + total_completion_tokens += completion_tokens + total_duration_ms += call_duration_ms + + _log( + "LLM", + f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, " + f"{call_duration_ms}ms, finish_reason={finish_reason}", + ) + + if finish_reason == "length": + _log( + "WARN", + " finish_reason=length — output likely truncated! 
" + "Consider reducing fixture size or increasing max_tokens.", + level="WARN", + ) + + # Parse response + try: + result = SynthesisResult.model_validate_json(str(raw)) + except (ValidationError, json.JSONDecodeError) as exc: + _log("ERROR", f" Parse failed: {exc}", level="ERROR") + _log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR") + exit_code = 2 + continue + + # Log per-page summary + _log("SYNTH", f" Parsed: {len(result.pages)} pages synthesized") + total_words = 0 + for page in result.pages: + sections = page.body_sections or {} + word_count = sum(len(str(v).split()) for v in sections.values()) + total_words += word_count + _log( + "PAGE", + f" '{page.title}' ({page.slug}): " + f"{len(sections)} sections, {word_count} words, " + f"{len(page.moment_indices)} moments linked, " + f"quality={page.source_quality}", + ) + + all_pages.append(page.model_dump()) + + # Summary + _log("SUMMARY", f"Total: {len(all_pages)} pages across {len(categories)} categories") + _log("SUMMARY", f"Tokens: {total_prompt_tokens} prompt + {total_completion_tokens} completion = {total_prompt_tokens + total_completion_tokens} total") + _log("SUMMARY", f"Duration: {total_duration_ms}ms ({total_duration_ms / 1000:.1f}s)") + + return all_pages, exit_code + + +# ── Promote: deploy a prompt to production ───────────────────────────────── + +_STAGE_PROMPT_MAP = { + 2: "stage2_segmentation.txt", + 3: "stage3_extraction.txt", + 4: "stage4_classification.txt", + 5: "stage5_synthesis.txt", +} + + +def promote_prompt(prompt_path: str, stage: int, reason: str, commit: bool = False) -> int: + """Copy a winning prompt to the canonical path and create a backup. + + The worker reads prompts from disk at runtime — no restart needed. + """ + import hashlib + import shutil + + if stage not in _STAGE_PROMPT_MAP: + _log("ERROR", f"Invalid stage {stage}. 
Valid: {sorted(_STAGE_PROMPT_MAP)}", level="ERROR") + return 1 + + settings = get_settings() + template_name = _STAGE_PROMPT_MAP[stage] + canonical = Path(settings.prompts_path) / template_name + source = Path(prompt_path) + + if not source.exists(): + _log("ERROR", f"Source prompt not found: {prompt_path}", level="ERROR") + return 1 + + new_prompt = source.read_text(encoding="utf-8") + new_hash = hashlib.sha256(new_prompt.encode()).hexdigest()[:12] + + # Backup current prompt + old_prompt = "" + old_hash = "none" + if canonical.exists(): + old_prompt = canonical.read_text(encoding="utf-8") + old_hash = hashlib.sha256(old_prompt.encode()).hexdigest()[:12] + + if old_prompt.strip() == new_prompt.strip(): + _log("PROMOTE", "No change — new prompt is identical to current prompt") + return 0 + + archive_dir = Path(settings.prompts_path) / "archive" + archive_dir.mkdir(parents=True, exist_ok=True) + ts = time.strftime("%Y%m%d_%H%M%S", time.gmtime()) + backup = archive_dir / f"{template_name.replace('.txt', '')}_{ts}.txt" + shutil.copy2(canonical, backup) + _log("PROMOTE", f"Backed up current prompt: {old_hash} -> {backup}") + + # Write new prompt + canonical.write_text(new_prompt, encoding="utf-8") + + old_lines = old_prompt.strip().splitlines() + new_lines = new_prompt.strip().splitlines() + _log("PROMOTE", f"Installed new prompt: {new_hash} ({len(new_prompt)} chars, {len(new_lines)} lines)") + _log("PROMOTE", f"Previous: {old_hash} ({len(old_prompt)} chars, {len(old_lines)} lines)") + _log("PROMOTE", f"Reason: {reason}") + _log("PROMOTE", "Worker reads prompts from disk at runtime — no restart needed") + + if commit: + import subprocess + try: + subprocess.run( + ["git", "add", str(canonical)], + cwd=str(canonical.parent.parent), + check=True, capture_output=True, + ) + msg = f"prompt: promote stage{stage} — {reason}" + subprocess.run( + ["git", "commit", "-m", msg], + cwd=str(canonical.parent.parent), + check=True, capture_output=True, + ) + _log("PROMOTE", f"Git 
commit created: {msg}") + except subprocess.CalledProcessError as exc: + _log("PROMOTE", f"Git commit failed: {exc}", level="WARN") + + return 0 + + +# ── CLI ──────────────────────────────────────────────────────────────────── + +def main() -> int: + parser = argparse.ArgumentParser( + prog="pipeline.test_harness", + description="Offline prompt test harness for Chrysopedia synthesis", + ) + sub = parser.add_subparsers(dest="command") + + # -- run subcommand (default behavior) -- + run_parser = sub.add_parser("run", help="Run synthesis against a fixture") + run_parser.add_argument("--fixture", "-f", type=str, required=True, help="Fixture JSON file") + run_parser.add_argument("--prompt", "-p", type=str, default=None, help="Prompt file (default: stage5_synthesis.txt)") + run_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path") + run_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category") + run_parser.add_argument("--model", type=str, default=None, help="Override LLM model") + run_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"]) + + # -- promote subcommand -- + promo_parser = sub.add_parser("promote", help="Deploy a winning prompt to production") + promo_parser.add_argument("--prompt", "-p", type=str, required=True, help="Path to the winning prompt file") + promo_parser.add_argument("--stage", "-s", type=int, default=5, help="Stage number (default: 5)") + promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted") + promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit") + + args = parser.parse_args() + + # If no subcommand, check for --fixture for backward compat + if args.command is None: + # Support running without subcommand for backward compat + parser.print_help() + return 1 + + if args.command == "promote": + return promote_prompt(args.prompt, args.stage, 
args.reason, args.commit) + + # -- run command -- + prompt_path = args.prompt + if prompt_path is None: + settings = get_settings() + prompt_path = str(Path(settings.prompts_path) / "stage5_synthesis.txt") + + overall_start = time.monotonic() + try: + fixture = load_fixture(args.fixture) + except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc: + _log("ERROR", f"Fixture error: {exc}", level="ERROR") + return 3 + + pages, exit_code = run_synthesis( + fixture=fixture, + prompt_path=prompt_path, + category_filter=args.category, + model_override=args.model, + modality=args.modality, + ) + + if not pages and exit_code != 0: + return exit_code + + output = { + "fixture_source": args.fixture, + "prompt_source": prompt_path, + "creator_name": fixture.creator_name, + "video_id": fixture.video_id, + "category_filter": args.category, + "pages": pages, + "metadata": { + "page_count": len(pages), + "total_words": sum( + sum(len(str(v).split()) for v in p.get("body_sections", {}).values()) + for p in pages + ), + "elapsed_seconds": round(time.monotonic() - overall_start, 1), + }, + } + + output_json = json.dumps(output, indent=2, ensure_ascii=False) + + if args.output: + Path(args.output).parent.mkdir(parents=True, exist_ok=True) + Path(args.output).write_text(output_json, encoding="utf-8") + _log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)") + else: + print(output_json) + _log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)") + + total_elapsed = time.monotonic() - overall_start + _log("DONE", f"Completed in {total_elapsed:.1f}s (exit_code={exit_code})") + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index 7f45a90..af2f01f 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -251,6 +251,23 @@ async def clean_retrigger_pipeline( logger.warning("Qdrant cleanup failed for video_id=%s: %s", video_id, exc) 
deleted_counts["qdrant_vectors"] = f"skipped: {exc}" + # Clear Redis classification/prior-pages cache to prevent stale data + try: + import redis as redis_lib + settings = get_settings() + r = redis_lib.Redis.from_url(settings.redis_url) + r.delete(f"chrysopedia:classification:{video_id}") + r.delete(f"chrysopedia:prior_pages:{video_id}") + deleted_counts["redis_cache"] = "cleared" + logger.info("Redis cache cleared for video_id=%s", video_id) + except Exception as exc: + logger.warning("Redis cache cleanup failed for video_id=%s: %s", video_id, exc) + deleted_counts["redis_cache"] = f"skipped: {exc}" + + # Clear durable classification_data column too + video.classification_data = None + await db.commit() + # Now trigger the pipeline from pipeline.stages import run_pipeline try: @@ -270,6 +287,210 @@ async def clean_retrigger_pipeline( } +# ── Admin: Re-run Single Stage ────────────────────────────────────────────── + +@router.post("/admin/pipeline/rerun-stage/{video_id}/{stage_name}") +async def rerun_stage( + video_id: str, + stage_name: str, + prompt_override: str | None = None, + db: AsyncSession = Depends(get_session), +): + """Re-run a single pipeline stage without running predecessors. + + Designed for fast prompt iteration — especially stage 5 synthesis. + Optionally accepts a prompt_override string that temporarily replaces + the on-disk prompt template for this run only. + + Valid stage names: stage2_segmentation, stage3_extraction, + stage4_classification, stage5_synthesis, stage6_embed_and_index. + """ + from pipeline.stages import _PIPELINE_STAGES, run_single_stage + + # Validate stage name + if stage_name not in _PIPELINE_STAGES: + raise HTTPException( + status_code=400, + detail=f"Invalid stage '{stage_name}'. 
Valid: {_PIPELINE_STAGES}", + ) + + # Validate video exists + stmt = select(SourceVideo).where(SourceVideo.id == video_id) + result = await db.execute(stmt) + video = result.scalar_one_or_none() + if video is None: + raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") + + # Dispatch single-stage re-run + try: + run_single_stage.delay( + str(video.id), + stage_name, + trigger="stage_rerun", + prompt_override=prompt_override, + ) + logger.info( + "Single-stage re-run dispatched: video_id=%s, stage=%s, prompt_override=%s", + video_id, stage_name, "yes" if prompt_override else "no", + ) + except Exception as exc: + logger.warning( + "Failed to dispatch single-stage re-run for video_id=%s: %s", video_id, exc, + ) + raise HTTPException( + status_code=503, + detail="Stage re-run dispatch failed — Celery/Redis may be unavailable", + ) from exc + + return { + "status": "stage_rerun_dispatched", + "video_id": str(video.id), + "stage": stage_name, + "prompt_override": bool(prompt_override), + } + + +# ── Admin: Chunking Inspector ─────────────────────────────────────────────── + +@router.get("/admin/pipeline/chunking/{video_id}") +async def get_chunking_data( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """Return chunking/grouping data for a video — topic boundaries, classifications, + and synthesis group breakdowns. + + Helps diagnose whether bad synthesis output is a prompt problem or a data shape problem. + """ + from config import get_settings + + stmt = select(SourceVideo).where(SourceVideo.id == video_id) + result = await db.execute(stmt) + video = result.scalar_one_or_none() + if video is None: + raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") + + settings = get_settings() + + # 1. 
Topic boundaries (stage 2 output): segments grouped by topic_label + segments = ( + await db.execute( + select(TranscriptSegment) + .where(TranscriptSegment.source_video_id == video_id) + .order_by(TranscriptSegment.segment_index) + ) + ).scalars().all() + + topic_boundaries: list[dict] = [] + current_label = None + current_group: list[dict] = [] + + for seg in segments: + label = seg.topic_label or "unlabeled" + if label != current_label: + if current_group: + topic_boundaries.append({ + "topic_label": current_label, + "segment_count": len(current_group), + "start_time": current_group[0]["start_time"], + "end_time": current_group[-1]["end_time"], + "start_index": current_group[0]["segment_index"], + "end_index": current_group[-1]["segment_index"], + }) + current_label = label + current_group = [] + + current_group.append({ + "start_time": seg.start_time, + "end_time": seg.end_time, + "segment_index": seg.segment_index, + }) + + if current_group: + topic_boundaries.append({ + "topic_label": current_label, + "segment_count": len(current_group), + "start_time": current_group[0]["start_time"], + "end_time": current_group[-1]["end_time"], + "start_index": current_group[0]["segment_index"], + "end_index": current_group[-1]["segment_index"], + }) + + # 2. Key moments (stage 3 output) + moments = ( + await db.execute( + select(KeyMoment) + .where(KeyMoment.source_video_id == video_id) + .order_by(KeyMoment.start_time) + ) + ).scalars().all() + + key_moments = [ + { + "id": str(m.id), + "title": m.title, + "content_type": m.content_type.value, + "start_time": m.start_time, + "end_time": m.end_time, + "plugins": m.plugins or [], + "technique_page_id": str(m.technique_page_id) if m.technique_page_id else None, + } + for m in moments + ] + + # 3. 
Classification data (stage 4 output) + classification: list[dict] = [] + cls_source = "missing" + + if video.classification_data: + classification = video.classification_data + cls_source = "postgresql" + else: + try: + import redis as redis_lib + r = redis_lib.Redis.from_url(settings.redis_url) + raw = r.get(f"chrysopedia:classification:{video_id}") + if raw: + classification = __import__("json").loads(raw) + cls_source = "redis" + except Exception: + pass + + # 4. Synthesis groups (how stage 5 would group moments) + cls_by_moment_id = {c["moment_id"]: c for c in classification} + synthesis_groups: dict[str, dict] = {} + + for m in moments: + cls_info = cls_by_moment_id.get(str(m.id), {}) + category = cls_info.get("topic_category", "Uncategorized").strip().title() + if category not in synthesis_groups: + synthesis_groups[category] = { + "category": category, + "moment_count": 0, + "moment_ids": [], + "exceeds_chunk_threshold": False, + } + synthesis_groups[category]["moment_count"] += 1 + synthesis_groups[category]["moment_ids"].append(str(m.id)) + + chunk_size = settings.synthesis_chunk_size + for group in synthesis_groups.values(): + group["exceeds_chunk_threshold"] = group["moment_count"] > chunk_size + group["chunks_needed"] = max(1, -(-group["moment_count"] // chunk_size)) # ceil division + + return { + "video_id": str(video.id), + "total_segments": len(segments), + "total_moments": len(moments), + "classification_source": cls_source, + "synthesis_chunk_size": chunk_size, + "topic_boundaries": topic_boundaries, + "key_moments": key_moments, + "classification": classification, + "synthesis_groups": list(synthesis_groups.values()), + } + + # ── Admin: Revoke ──────────────────────────────────────────────────────────── @router.post("/admin/pipeline/revoke/{video_id}") @@ -578,7 +799,215 @@ async def get_token_summary( ) -# ── Admin: Worker status ───────────────────────────────────────────────────── +# ── Admin: Stale Pages 
────────────────────────────────────────────────────── + +@router.get("/admin/pipeline/stale-pages") +async def get_stale_pages( + db: AsyncSession = Depends(get_session), +): + """Detect technique pages synthesized with an older prompt than the current one. + + Compares the SHA-256 hash of the current stage5_synthesis.txt against the + prompt hashes stored in TechniquePageVersion.pipeline_metadata. + """ + import hashlib + from pathlib import Path as _Path + from models import TechniquePage, TechniquePageVersion + + settings = get_settings() + prompt_path = _Path(settings.prompts_path) / "stage5_synthesis.txt" + + if not prompt_path.exists(): + raise HTTPException(status_code=500, detail="stage5_synthesis.txt not found") + + current_hash = hashlib.sha256( + prompt_path.read_text(encoding="utf-8").encode() + ).hexdigest()[:12] + + # Get all technique pages + pages = (await db.execute(select(TechniquePage))).scalars().all() + total = len(pages) + + stale_count = 0 + fresh_count = 0 + stale_by_creator: dict[str, dict] = {} + + for page in pages: + # Get latest version to check prompt hash + latest_version = ( + await db.execute( + select(TechniquePageVersion) + .where(TechniquePageVersion.technique_page_id == page.id) + .order_by(TechniquePageVersion.version_number.desc()) + .limit(1) + ) + ).scalar_one_or_none() + + page_hash = None + if latest_version and latest_version.pipeline_metadata: + meta = latest_version.pipeline_metadata + page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash")) + + if page_hash == current_hash: + fresh_count += 1 + else: + stale_count += 1 + # Look up creator name + creator = (await db.execute( + select(Creator.name).where(Creator.id == page.creator_id) + )).scalar_one_or_none() or "Unknown" + + if creator not in stale_by_creator: + stale_by_creator[creator] = {"creator": creator, "stale_count": 0, "page_slugs": []} + stale_by_creator[creator]["stale_count"] += 1 + stale_by_creator[creator]["page_slugs"].append(page.slug) + + 
return { + "current_prompt_hash": current_hash, + "total_pages": total, + "stale_pages": stale_count, + "fresh_pages": fresh_count, + "stale_by_creator": list(stale_by_creator.values()), + } + + +# ── Admin: Bulk Re-Synthesize ─────────────────────────────────────────────── + +@router.post("/admin/pipeline/bulk-resynthesize") +async def bulk_resynthesize( + video_ids: list[str] | None = None, + stage: str = "stage5_synthesis", + db: AsyncSession = Depends(get_session), +): + """Re-run a single stage on multiple videos without full pipeline reset. + + If video_ids is None, targets all videos with processing_status='complete'. + Rate-limited to avoid overwhelming the worker queue. + """ + from pipeline.stages import _PIPELINE_STAGES, run_single_stage + + if stage not in _PIPELINE_STAGES: + raise HTTPException(status_code=400, detail=f"Invalid stage: {stage}") + + if video_ids is None: + result = await db.execute( + select(SourceVideo.id).where( + SourceVideo.processing_status == ProcessingStatus.complete + ) + ) + video_ids = [str(row[0]) for row in result.all()] + + if not video_ids: + return {"status": "no_videos", "dispatched": 0, "skipped": 0, "total": 0} + + dispatched = 0 + skipped = [] + for vid in video_ids: + try: + run_single_stage.delay(vid, stage, trigger="stage_rerun") + dispatched += 1 + except Exception as exc: + logger.warning("Bulk re-synth dispatch failed for video_id=%s: %s", vid, exc) + skipped.append({"video_id": vid, "reason": str(exc)}) + + logger.info( + "[BULK-RESYNTH] Dispatched %d/%d %s re-runs", + dispatched, len(video_ids), stage, + ) + + return { + "status": "dispatched", + "stage": stage, + "total": len(video_ids), + "dispatched": dispatched, + "skipped": skipped if skipped else None, + } + + +# ── Admin: Prompt Optimization ────────────────────────────────────────────── + +@router.post("/admin/pipeline/optimize-prompt/{stage}") +async def trigger_optimization( + stage: int, + video_id: str = Query(..., description="Video UUID to use as 
fixture source"),
    iterations: int = Query(5, description="Number of optimization iterations"),
    variants_per_iter: int = Query(2, description="Variants per iteration"),
):
    """Trigger an automated prompt optimization run as a background Celery task.

    Exports a fixture from the specified video, then runs the optimization loop
    with generate-score-select cycles. Progress is tracked in
    quality/results/progress_stage{N}.json.
    """
    from pipeline.quality.scorer import STAGE_CONFIGS

    if stage not in STAGE_CONFIGS:
        raise HTTPException(status_code=400, detail=f"Unsupported stage {stage}. Valid: {sorted(STAGE_CONFIGS)}")

    # Dispatch as a Celery task
    from worker import celery_app

    # NOTE(review): this task is defined inside the request handler, so it is
    # registered only in the web process, and only when this endpoint runs.
    # A separate Celery worker process never imports this closure and will
    # reject the dispatched message as an unregistered task — confirm against
    # the worker's imports; likely needs to move to a module the worker loads.
    @celery_app.task
    def _run_optimization(video_id: str, stage: int, iterations: int, variants_per_iter: int) -> dict:
        """Background optimization task: export fixture, run loop, write results."""
        import tempfile
        from pipeline.export_fixture import export_fixture
        from pipeline.quality.optimizer import OptimizationLoop
        from pipeline.quality.__main__ import write_results_json
        from config import get_settings as _get_settings
        from pipeline.llm_client import LLMClient as _LLMClient

        settings = _get_settings()

        # Export fixture
        # NOTE(review): delete=False with no later cleanup — the temp fixture
        # file leaks on every run; confirm whether anything reads it afterwards.
        tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="opt_fixture_", delete=False)
        tmp.close()
        exit_code = export_fixture(settings.database_url, settings.redis_url, video_id, tmp.name)
        if exit_code != 0:
            # Propagate export failure as a result payload, not an exception.
            return {"error": f"Fixture export failed (exit code {exit_code})"}

        client = _LLMClient(settings)
        loop = OptimizationLoop(
            client=client,
            stage=stage,
            fixture_path=tmp.name,
            iterations=iterations,
            variants_per_iter=variants_per_iter,
            output_dir="backend/pipeline/quality/results/",
        )
        result = loop.run()

        json_path = write_results_json(
            result=result, output_dir="backend/pipeline/quality/results/",
            stage=stage, iterations=iterations, variants_per_iter=variants_per_iter,
            fixture_path=tmp.name,
        )
        return {
            "best_score": 
result.best_score.composite, + "results_path": json_path, + "iterations": iterations, + } + + try: + task = _run_optimization.delay(video_id, stage, iterations, variants_per_iter) + logger.info( + "[OPTIMIZE] Dispatched optimization: stage=%d, video_id=%s, iterations=%d, task_id=%s", + stage, video_id, iterations, task.id, + ) + except Exception as exc: + raise HTTPException(status_code=503, detail=f"Failed to dispatch optimization: {exc}") from exc + + return { + "status": "dispatched", + "task_id": task.id, + "stage": stage, + "video_id": video_id, + "iterations": iterations, + "variants_per_iter": variants_per_iter, + } + + +# ── Admin: Reindex All ───────────────────────────────────────────────────── @router.post("/admin/pipeline/reindex-all") async def reindex_all( diff --git a/frontend/src/App.css b/frontend/src/App.css index 1cac18d..4d334df 100644 --- a/frontend/src/App.css +++ b/frontend/src/App.css @@ -2895,6 +2895,53 @@ a.app-footer__repo:hover { overflow-y: auto; } +.modal-subtitle { + color: var(--color-text-secondary); + font-size: 0.85rem; + margin: 0.25rem 0 1rem; +} + +.modal-field { + margin-bottom: 1rem; +} + +.modal-field label { + display: block; + margin-bottom: 0.35rem; + font-size: 0.85rem; + font-weight: 600; + color: var(--color-text-primary); +} + +.modal-field select, +.modal-field textarea { + width: 100%; + background: var(--color-bg); + color: var(--color-text-primary); + border: 1px solid var(--color-border); + border-radius: 6px; + padding: 0.5rem; + font-family: inherit; + font-size: 0.85rem; +} + +.modal-field textarea { + resize: vertical; + min-height: 80px; +} + +.modal-optional { + font-weight: 400; + color: var(--color-text-secondary); + font-size: 0.8rem; +} + +.modal-actions { + display: flex; + gap: 0.5rem; + margin-top: 1rem; +} + .report-modal__title { margin: 0 0 0.75rem; color: var(--color-text-primary); @@ -3021,6 +3068,16 @@ a.app-footer__repo:hover { opacity: 0.85; } +.btn--warning { + background: 
var(--color-warning, #f0ad4e); + color: #1a1a1a; + border-color: var(--color-warning, #f0ad4e); +} + +.btn--warning:hover:not(:disabled) { + opacity: 0.85; +} + .btn--random { background: var(--color-bg-input); color: var(--color-text-primary); @@ -4142,6 +4199,124 @@ a.app-footer__repo:hover { flex-wrap: wrap; } +/* ── Chunking Inspector ───────────────────────────────────────────────── */ + +.chunking-inspector { + margin-top: 0.75rem; +} + +.chunking-inspector__toggle { + background: none; + border: none; + color: var(--color-text-secondary); + cursor: pointer; + font-size: 0.85rem; + padding: 0.25rem 0; +} + +.chunking-inspector__toggle:hover { + color: var(--color-text-primary); +} + +.chunking-inspector__loading, +.chunking-inspector__error { + font-size: 0.8rem; + color: var(--color-text-muted); + padding: 0.5rem 0; +} + +.chunking-inspector__error { + color: var(--color-danger); +} + +.chunking-inspector__body { + margin-top: 0.5rem; +} + +.chunking-inspector__summary { + display: flex; + gap: 1rem; + font-size: 0.8rem; + color: var(--color-text-secondary); + padding: 0.5rem; + background: var(--color-bg); + border-radius: 6px; + margin-bottom: 0.75rem; + flex-wrap: wrap; +} + +.chunking-inspector__section { + margin-bottom: 0.75rem; +} + +.chunking-inspector__section h4 { + font-size: 0.8rem; + color: var(--color-text-secondary); + margin: 0 0 0.35rem; + font-weight: 600; +} + +.chunking-inspector__topics { + display: flex; + flex-direction: column; + gap: 0.25rem; +} + +.chunking-topic { + display: flex; + justify-content: space-between; + padding: 0.3rem 0.5rem; + background: var(--color-bg); + border-radius: 4px; + font-size: 0.8rem; +} + +.chunking-topic__label { + color: var(--color-text-primary); + font-weight: 500; +} + +.chunking-topic__meta { + color: var(--color-text-muted); + white-space: nowrap; +} + +.chunking-inspector__groups { + display: flex; + flex-wrap: wrap; + gap: 0.5rem; +} + +.chunking-group { + display: flex; + flex-direction: 
column; + padding: 0.5rem 0.75rem; + background: var(--color-bg); + border-radius: 6px; + border-left: 3px solid var(--color-accent); + font-size: 0.8rem; + min-width: 140px; +} + +.chunking-group--split { + border-left-color: var(--color-warning, #f0ad4e); +} + +.chunking-group__category { + font-weight: 600; + color: var(--color-text-primary); +} + +.chunking-group__count { + color: var(--color-text-secondary); +} + +.chunking-group__warn { + color: var(--color-warning, #f0ad4e); + font-size: 0.75rem; + margin-top: 0.15rem; +} + /* ── Pipeline Badges ────────────────────────────────────────────────────── */ .pipeline-badge { diff --git a/frontend/src/api/public-client.ts b/frontend/src/api/public-client.ts index 071cd4f..0f18f42 100644 --- a/frontend/src/api/public-client.ts +++ b/frontend/src/api/public-client.ts @@ -583,6 +583,113 @@ export async function cleanRetriggerPipeline(videoId: string): Promise; + classification: Array>; + synthesis_groups: ChunkingSynthesisGroup[]; +} + +export async function fetchChunkingData(videoId: string): Promise { + return request(`${BASE}/admin/pipeline/chunking/${videoId}`); +} + +// ── Single-Stage Re-Run ──────────────────────────────────────────────────── + +export interface RerunStageResponse { + status: string; + video_id: string; + stage: string; + prompt_override: boolean; +} + +export async function rerunStage( + videoId: string, + stageName: string, + promptOverride?: string, +): Promise { + const body: Record = {}; + if (promptOverride) { + body.prompt_override = promptOverride; + } + return request( + `${BASE}/admin/pipeline/rerun-stage/${videoId}/${stageName}`, + { + method: "POST", + body: Object.keys(body).length > 0 ? 
JSON.stringify(body) : undefined, + }, + ); +} + +// ── Stale Pages & Bulk Re-Synthesize ─────────────────────────────────────── + +export interface StalePageCreator { + creator: string; + stale_count: number; + page_slugs: string[]; +} + +export interface StalePagesResponse { + current_prompt_hash: string; + total_pages: number; + stale_pages: number; + fresh_pages: number; + stale_by_creator: StalePageCreator[]; +} + +export async function fetchStalePages(): Promise { + return request(`${BASE}/admin/pipeline/stale-pages`); +} + +export interface BulkResynthResponse { + status: string; + stage: string; + total: number; + dispatched: number; + skipped: Array<{ video_id: string; reason: string }> | null; +} + +export async function bulkResynthesize( + videoIds?: string[], + stage = "stage5_synthesis", +): Promise { + return request(`${BASE}/admin/pipeline/bulk-resynthesize`, { + method: "POST", + body: JSON.stringify({ video_ids: videoIds ?? null, stage }), + }); +} + // ── Debug Mode ────────────────────────────────────────────────────────────── export interface DebugModeResponse { diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx index ce7322c..e6f2712 100644 --- a/frontend/src/pages/AdminPipeline.tsx +++ b/frontend/src/pages/AdminPipeline.tsx @@ -16,6 +16,10 @@ import { triggerPipeline, revokePipeline, cleanRetriggerPipeline, + rerunStage, + fetchChunkingData, + fetchStalePages, + bulkResynthesize, fetchCreators, fetchRecentActivity, type PipelineVideoItem, @@ -771,6 +775,88 @@ interface BulkLogEntry { message: string; } +// ── Chunking Inspector ───────────────────────────────────────────────────── + +function ChunkingInspector({ videoId }: { videoId: string }) { + const [data, setData] = useState> | null>(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [expanded, setExpanded] = useState(false); + + useEffect(() => { + if (!expanded) return; + let cancelled = false; 
+ setLoading(true); + fetchChunkingData(videoId) + .then((res) => { if (!cancelled) { setData(res); setError(null); } }) + .catch((err) => { if (!cancelled) setError(err instanceof Error ? err.message : "Failed"); }) + .finally(() => { if (!cancelled) setLoading(false); }); + return () => { cancelled = true; }; + }, [videoId, expanded]); + + return ( +
+ + + {expanded && loading &&
Loading chunking data...
} + {expanded && error &&
{error}
} + + {expanded && data && ( +
+
+ {data.total_segments} segments + {data.total_moments} moments + Classification: {data.classification_source} + Chunk size: {data.synthesis_chunk_size} +
+ + {/* Topic Boundaries */} +
+

Topic Boundaries (Stage 2)

+
+ {data.topic_boundaries.map((tb, i) => ( +
+ {tb.topic_label} + + {tb.segment_count} segs · {tb.start_time.toFixed(0)}s-{tb.end_time.toFixed(0)}s + +
+ ))} +
+
+ + {/* Synthesis Groups */} +
+

Synthesis Groups (Stage 5 Input)

+
+ {data.synthesis_groups.map((g) => ( +
+ {g.category} + {g.moment_count} moments + {g.exceeds_chunk_threshold && ( + + Will split into {g.chunks_needed} chunks + + )} +
+ ))} +
+
+
+ )} +
+ ); +} + + export default function AdminPipeline() { useDocumentTitle("Pipeline Management — Chrysopedia"); const [searchParams] = useSearchParams(); @@ -786,6 +872,12 @@ export default function AdminPipeline() { const [creatorFilter, setCreatorFilter] = useState(null); const [searchQuery, setSearchQuery] = useState(""); const [selectedIds, setSelectedIds] = useState>(new Set()); + // Re-run stage modal state + const [rerunModalVideo, setRerunModalVideo] = useState(null); + const [rerunStageSelect, setRerunStageSelect] = useState("stage5_synthesis"); + const [rerunPromptOverride, setRerunPromptOverride] = useState(""); + // Stale pages state + const [stalePagesCount, setStalePagesCount] = useState(null); const [bulkProgress, setBulkProgress] = useState(null); const [bulkLog, setBulkLog] = useState([]); const [changedIds, setChangedIds] = useState>(new Set()); @@ -869,6 +961,32 @@ export default function AdminPipeline() { }); }, []); + // Load stale page count + useEffect(() => { + fetchStalePages() + .then((res) => setStalePagesCount(res.stale_pages)) + .catch(() => { /* silently fail */ }); + }, []); + + const handleBulkResynth = async () => { + if (!confirm("Re-run stage 5 synthesis on all completed videos with the current prompt?")) return; + try { + const res = await bulkResynthesize(); + setActionMessage({ + id: "__bulk__", + text: `Bulk re-synthesize dispatched: ${res.dispatched}/${res.total} videos`, + ok: true, + }); + setStalePagesCount(null); // will refresh + } catch (err) { + setActionMessage({ + id: "__bulk__", + text: err instanceof Error ? 
err.message : "Bulk re-synth failed", + ok: false, + }); + } + }; + // Deep-link: auto-expand and scroll to ?video= on first load useEffect(() => { if (deepLinked.current || loading || videos.length === 0) return; @@ -941,6 +1059,34 @@ export default function AdminPipeline() { } }; + const handleRerunStage = async (videoId: string) => { + setActionLoading(videoId); + setActionMessage(null); + try { + const res = await rerunStage( + videoId, + rerunStageSelect, + rerunPromptOverride.trim() || undefined, + ); + setActionMessage({ + id: videoId, + text: `Stage re-run dispatched: ${res.stage}${res.prompt_override ? " (with prompt override)" : ""}`, + ok: true, + }); + setRerunModalVideo(null); + setRerunPromptOverride(""); + setTimeout(() => void load(), 2000); + } catch (err) { + setActionMessage({ + id: videoId, + text: err instanceof Error ? err.message : "Stage re-run failed", + ok: false, + }); + } finally { + setActionLoading(null); + } + }; + const toggleExpand = (id: string) => { setExpandedId((prev) => (prev === id ? null : id)); }; @@ -1033,6 +1179,15 @@ export default function AdminPipeline() {

+ {stalePagesCount !== null && stalePagesCount > 0 && ( + + )}
@@ -1251,6 +1414,7 @@ export default function AdminPipeline() { Updated: {formatDate(video.updated_at)} + )} @@ -1271,6 +1435,59 @@ export default function AdminPipeline() { )} + + {/* Re-run Stage Modal */} + {rerunModalVideo && ( +
setRerunModalVideo(null)}> +
e.stopPropagation()}> +

Re-run Single Stage

+

+ Re-run a specific pipeline stage without re-running predecessors. +

+
+ + +
+
+ +