From f0d0b8ac1ab1c5c97aedf4c18ab84a869f938df1 Mon Sep 17 00:00:00 2001
From: jlightner
Date: Thu, 2 Apr 2026 15:47:46 +0000
Subject: [PATCH] =?UTF-8?q?feat:=20add=20pipeline=20iteration=20tooling=20?=
 =?UTF-8?q?=E2=80=94=20offline=20test=20harness,=20stage=20re-runs,=20chun?=
 =?UTF-8?q?king=20inspector?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Drops prompt iteration cycles from 20-30 min to under 5 min by enabling
stage-isolated re-runs and offline prompt testing against exported fixtures.

Phase 1: Offline prompt test harness
- export_fixture.py: export stage 5 inputs from DB to reusable JSON fixtures
- test_harness.py: run synthesis offline with any prompt, no Docker needed
- promote subcommand: deploy winning prompts with backup and optional git commit

Phase 2: Classification data persistence
- Dual-write classification to PostgreSQL + Redis (fixes 24hr TTL data loss)
- Clean retrigger now clears Redis cache keys (fixes stale data bug)
- Alembic migration 011: classification_data JSONB column + stage_rerun enum

Phase 3: Stage-isolated re-run
- run_single_stage Celery task with prerequisite validation and prompt overrides
- _load_prompt supports per-video Redis overrides for testing custom prompts
- POST /admin/pipeline/rerun-stage/{video_id}/{stage_name} endpoint
- Frontend: Re-run Stage modal with stage selector and prompt override textarea

Phase 4: Chunking inspector
- GET /admin/pipeline/chunking/{video_id} returns topic boundaries,
  classifications, and synthesis group breakdowns
- Frontend: collapsible Chunking Inspector panel per video

Phase 5: Prompt deployment & stale data cleanup
- GET /admin/pipeline/stale-pages detects pages from older prompts
- POST /admin/pipeline/bulk-resynthesize re-runs a stage on all completed videos
- Frontend: stale pages indicator badge with one-click bulk re-synth

Phase 6: Automated iteration foundation
- Quality CLI --video-id flag auto-exports fixture from DB
- POST /admin/pipeline/optimize-prompt/{stage} dispatches optimization as Celery task

Co-Authored-By: Claude Opus 4.6 (1M context)
---
...11_classification_cache_and_stage_rerun.py | 35 ++
backend/models.py | 2 +
backend/pipeline/export_fixture.py | 306 +++++++++++
backend/pipeline/quality/__main__.py | 43 +-
backend/pipeline/stages.py | 352 ++++++++++++-
backend/pipeline/test_harness.py | 481 ++++++++++++++++++
backend/routers/pipeline.py | 431 +++++++++++++++-
frontend/src/App.css | 175 +++++++
frontend/src/api/public-client.ts | 107 ++++
frontend/src/pages/AdminPipeline.tsx | 217 ++++++++
10 files changed, 2123 insertions(+), 26 deletions(-)
create mode 100644 alembic/versions/011_classification_cache_and_stage_rerun.py
create mode 100644 backend/pipeline/export_fixture.py
create mode 100644 backend/pipeline/test_harness.py
diff --git a/alembic/versions/011_classification_cache_and_stage_rerun.py b/alembic/versions/011_classification_cache_and_stage_rerun.py
new file mode 100644
index 0000000..4cf4cb8
--- /dev/null
+++ b/alembic/versions/011_classification_cache_and_stage_rerun.py
@@ -0,0 +1,35 @@
+"""Add classification_data JSONB column to source_videos and stage_rerun trigger.
+
+Persists stage 4 classification data in PostgreSQL alongside Redis cache,
+eliminating the 24-hour TTL data loss risk. Also adds the 'stage_rerun'
+trigger value for single-stage re-run support.
+
+Revision ID: 011_classification_cache_and_stage_rerun
+Revises: 010_add_pipeline_runs
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import JSONB
+
+revision = "011_classification_cache_and_stage_rerun"
+down_revision = "010_add_pipeline_runs"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ # Add classification_data column to source_videos
+ op.add_column(
+ "source_videos",
+ sa.Column("classification_data", JSONB, nullable=True),
+ )
+
+ # Add 'stage_rerun' to the pipeline_run_trigger enum
+ # PostgreSQL enums require ALTER TYPE to add values
+ op.execute("ALTER TYPE pipeline_run_trigger ADD VALUE IF NOT EXISTS 'stage_rerun'")
+
+
+def downgrade() -> None:
+ op.drop_column("source_videos", "classification_data")
+ # Note: PostgreSQL does not support removing values from enums.
+ # The 'stage_rerun' value will remain but be unused after downgrade.
diff --git a/backend/models.py b/backend/models.py
index f2e0fac..5a0e3ed 100644
--- a/backend/models.py
+++ b/backend/models.py
@@ -138,6 +138,7 @@ class SourceVideo(Base):
default=ProcessingStatus.not_started,
server_default="not_started",
)
+ classification_data: Mapped[list | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[datetime] = mapped_column(
default=_now, server_default=func.now()
)
@@ -377,6 +378,7 @@ class PipelineRunTrigger(str, enum.Enum):
clean_reprocess = "clean_reprocess"
auto_ingest = "auto_ingest"
bulk = "bulk"
+ stage_rerun = "stage_rerun"
class PipelineRun(Base):
diff --git a/backend/pipeline/export_fixture.py b/backend/pipeline/export_fixture.py
new file mode 100644
index 0000000..25b7856
--- /dev/null
+++ b/backend/pipeline/export_fixture.py
@@ -0,0 +1,306 @@
+"""Export pipeline stage inputs for a video as a reusable JSON fixture.
+
+Connects to the live database, queries KeyMoments and classification data,
+and writes a fixture file that the test harness can consume offline.
+
+Usage:
+    python -m pipeline.export_fixture --video-id <uuid> --output fixtures/video.json
+    python -m pipeline.export_fixture --video-id <uuid>   # prints to stdout
+ python -m pipeline.export_fixture --list # list available videos
+
+Requires: DATABASE_URL, REDIS_URL environment variables (or .env file).
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+import time
+from collections import Counter
+from pathlib import Path
+
+from sqlalchemy import create_engine, select
+from sqlalchemy.orm import Session, sessionmaker
+
+
+def _log(tag: str, msg: str, level: str = "INFO") -> None:
+ """Write structured log line to stderr."""
+ print(f"[EXPORT] [{level}] {tag}: {msg}", file=sys.stderr)
+
+
+def _get_sync_session(database_url: str) -> Session:
+ """Create a sync SQLAlchemy session from the database URL."""
+ url = database_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
+ engine = create_engine(url, pool_pre_ping=True)
+ factory = sessionmaker(bind=engine)
+ return factory()
+
+
+def _list_videos(database_url: str) -> int:
+ """List all videos with their processing status and moment counts."""
+ from models import Creator, KeyMoment, SourceVideo
+
+ session = _get_sync_session(database_url)
+ try:
+ videos = (
+ session.execute(
+ select(SourceVideo).order_by(SourceVideo.created_at.desc())
+ )
+ .scalars()
+ .all()
+ )
+ if not videos:
+ _log("LIST", "No videos found in database")
+ return 0
+
+ print(f"\n{'ID':<38s} {'Status':<14s} {'Moments':>7s} {'Creator':<20s} {'Filename'}", file=sys.stderr)
+ print(f"{'─'*38} {'─'*14} {'─'*7} {'─'*20} {'─'*40}", file=sys.stderr)
+
+ for video in videos:
+ moment_count = (
+ session.execute(
+ select(KeyMoment.id).where(KeyMoment.source_video_id == video.id)
+ )
+ .scalars()
+ .all()
+ )
+ creator = session.execute(
+ select(Creator).where(Creator.id == video.creator_id)
+ ).scalar_one_or_none()
+ creator_name = creator.name if creator else "?"
+
+ print(
+ f"{str(video.id):<38s} {video.processing_status.value:<14s} "
+ f"{len(moment_count):>7d} {creator_name:<20s} {video.filename}",
+ file=sys.stderr,
+ )
+
+ print(f"\nTotal: {len(videos)} videos\n", file=sys.stderr)
+ return 0
+ finally:
+ session.close()
+
+
+def export_fixture(
+ database_url: str,
+ redis_url: str,
+ video_id: str,
+ output_path: str | None = None,
+) -> int:
+ """Export stage 5 inputs for a video as a JSON fixture.
+
+ Returns exit code: 0 = success, 1 = error.
+ """
+ from models import Creator, KeyMoment, SourceVideo
+
+ start = time.monotonic()
+ _log("CONNECT", "Connecting to database...")
+
+ session = _get_sync_session(database_url)
+ try:
+ # ── Load video ──────────────────────────────────────────────────
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one_or_none()
+
+ if video is None:
+ _log("ERROR", f"Video not found: {video_id}", level="ERROR")
+ return 1
+
+ creator = session.execute(
+ select(Creator).where(Creator.id == video.creator_id)
+ ).scalar_one_or_none()
+ creator_name = creator.name if creator else "Unknown"
+
+ _log(
+ "VIDEO",
+ f"Found: {video.filename} by {creator_name} "
+ f"({video.duration_seconds or '?'}s, {video.content_type.value}, "
+ f"status={video.processing_status.value})",
+ )
+
+ # ── Load key moments ────────────────────────────────────────────
+ moments = (
+ session.execute(
+ select(KeyMoment)
+ .where(KeyMoment.source_video_id == video_id)
+ .order_by(KeyMoment.start_time)
+ )
+ .scalars()
+ .all()
+ )
+
+ if not moments:
+ _log("ERROR", f"No key moments found for video_id={video_id}", level="ERROR")
+ _log("HINT", "Pipeline stages 2-3 must complete before export is possible", level="ERROR")
+ return 1
+
+ time_min = min(m.start_time for m in moments)
+ time_max = max(m.end_time for m in moments)
+ _log("MOMENTS", f"Loaded {len(moments)} key moments (time range: {time_min:.1f}s - {time_max:.1f}s)")
+
+ # ── Load classification data ────────────────────────────────────
+ classification_data: list[dict] = []
+ cls_source = "missing"
+
+ # Try Redis first
+ try:
+ import redis as redis_lib
+
+ r = redis_lib.Redis.from_url(redis_url)
+ key = f"chrysopedia:classification:{video_id}"
+ raw = r.get(key)
+ if raw is not None:
+ classification_data = json.loads(raw)
+ cls_source = "redis"
+ ttl = r.ttl(key)
+ _log("CLASSIFY", f"Source: redis ({len(classification_data)} entries, TTL={ttl}s)")
+ except Exception as exc:
+ _log("CLASSIFY", f"Redis unavailable: {exc}", level="WARN")
+
+ # Fallback: check SourceVideo.classification_data column (Phase 2 addition)
+ if not classification_data:
+ video_cls = getattr(video, "classification_data", None)
+ if video_cls:
+ classification_data = video_cls
+ cls_source = "postgresql"
+ _log("CLASSIFY", f"Source: postgresql ({len(classification_data)} entries)")
+
+ if not classification_data:
+ _log("CLASSIFY", "No classification data found in Redis or PostgreSQL", level="WARN")
+ _log("HINT", "Pipeline stage 4 must complete before classification data is available", level="WARN")
+ cls_source = "missing"
+
+ # Build classification lookup by moment_id
+ cls_by_moment_id = {c["moment_id"]: c for c in classification_data}
+
+ # Count moments without classification
+ unclassified = sum(1 for m in moments if str(m.id) not in cls_by_moment_id)
+ if unclassified > 0:
+ _log("CLASSIFY", f"WARNING: {unclassified}/{len(moments)} moments have no classification data", level="WARN")
+
+ # ── Build fixture ───────────────────────────────────────────────
+ fixture_moments = []
+ category_counts: Counter[str] = Counter()
+
+ for m in moments:
+ cls_info = cls_by_moment_id.get(str(m.id), {})
+ topic_category = cls_info.get("topic_category", "Uncategorized")
+ topic_tags = cls_info.get("topic_tags", [])
+ category_counts[topic_category] += 1
+
+ fixture_moments.append({
+ "moment_id": str(m.id),
+ "title": m.title,
+ "summary": m.summary,
+ "content_type": m.content_type.value,
+ "start_time": m.start_time,
+ "end_time": m.end_time,
+ "plugins": m.plugins or [],
+ "raw_transcript": m.raw_transcript or "",
+ # Classification data (stage 4 output)
+ "classification": {
+ "topic_category": topic_category,
+ "topic_tags": topic_tags,
+ },
+ # Compatibility fields for existing quality/scorer format
+ "transcript_excerpt": (m.raw_transcript or "")[:500],
+ "topic_tags": topic_tags,
+ "topic_category": topic_category,
+ })
+
+ fixture = {
+ "video_id": str(video.id),
+ "creator_name": creator_name,
+ "content_type": video.content_type.value,
+ "filename": video.filename,
+ "duration_seconds": video.duration_seconds,
+ "classification_source": cls_source,
+ "export_timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+ "moments": fixture_moments,
+ }
+
+ fixture_json = json.dumps(fixture, indent=2, ensure_ascii=False)
+ fixture_size_kb = len(fixture_json.encode("utf-8")) / 1024
+
+ # ── Output ──────────────────────────────────────────────────────
+ if output_path:
+ Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+ Path(output_path).write_text(fixture_json, encoding="utf-8")
+ _log(
+ "OUTPUT",
+ f"Wrote fixture: {output_path} ({fixture_size_kb:.1f} KB, "
+ f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
+ )
+ else:
+ # Print fixture JSON to stdout
+ print(fixture_json)
+ _log(
+ "OUTPUT",
+ f"Printed fixture to stdout ({fixture_size_kb:.1f} KB, "
+ f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
+ )
+
+ # Category breakdown
+ for cat, count in category_counts.most_common():
+ _log("CATEGORY", f" {cat}: {count} moments")
+
+ elapsed = time.monotonic() - start
+ _log("DONE", f"Export completed in {elapsed:.1f}s")
+ return 0
+
+ except Exception as exc:
+ _log("ERROR", f"Export failed: {exc}", level="ERROR")
+ import traceback
+ traceback.print_exc(file=sys.stderr)
+ return 1
+ finally:
+ session.close()
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(
+ prog="pipeline.export_fixture",
+ description="Export pipeline stage inputs for a video as a reusable JSON fixture",
+ )
+ parser.add_argument(
+ "--video-id",
+ type=str,
+ help="UUID of the video to export",
+ )
+ parser.add_argument(
+ "--output", "-o",
+ type=str,
+ default=None,
+ help="Output file path (default: print to stdout)",
+ )
+ parser.add_argument(
+ "--list",
+ action="store_true",
+ default=False,
+ help="List all videos with status and moment counts",
+ )
+
+ args = parser.parse_args()
+
+ # Load settings
+ from config import get_settings
+ settings = get_settings()
+
+ if args.list:
+ return _list_videos(settings.database_url)
+
+ if not args.video_id:
+ parser.error("--video-id is required (or use --list to see available videos)")
+
+ return export_fixture(
+ database_url=settings.database_url,
+ redis_url=settings.redis_url,
+ video_id=args.video_id,
+ output_path=args.output,
+ )
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/backend/pipeline/quality/__main__.py b/backend/pipeline/quality/__main__.py
index 091826a..8c6d062 100644
--- a/backend/pipeline/quality/__main__.py
+++ b/backend/pipeline/quality/__main__.py
@@ -236,12 +236,17 @@ def main() -> int:
default=2,
help="Variants generated per iteration (default: 2)",
)
- opt_parser.add_argument(
+ opt_source = opt_parser.add_mutually_exclusive_group(required=True)
+ opt_source.add_argument(
"--file",
type=str,
- required=True,
help="Path to moments JSON fixture file",
)
+ opt_source.add_argument(
+ "--video-id",
+ type=str,
+ help="Video UUID — exports fixture from DB automatically (requires DATABASE_URL, REDIS_URL)",
+ )
opt_parser.add_argument(
"--output-dir",
type=str,
@@ -360,10 +365,34 @@ def _run_optimize(args: argparse.Namespace) -> int:
)
return 1
- # Validate fixture file exists
- fixture = Path(args.file)
+ # Resolve fixture: either from --file or auto-export from --video-id
+ fixture_path: str
+ if args.file:
+ fixture_path = args.file
+ else:
+ # Auto-export from database
+ print(f"\n[OPTIMIZE] Exporting fixture from video_id={args.video_id}...", file=sys.stderr)
+ import tempfile
+ from pipeline.export_fixture import export_fixture
+
+ settings = get_settings()
+ tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="optimize_fixture_", delete=False)
+ tmp.close()
+ exit_code = export_fixture(
+ database_url=settings.database_url,
+ redis_url=settings.redis_url,
+ video_id=args.video_id,
+ output_path=tmp.name,
+ )
+ if exit_code != 0:
+ print(f"Error: fixture export failed (exit code {exit_code})", file=sys.stderr)
+ return 1
+ fixture_path = tmp.name
+ print(f"[OPTIMIZE] Fixture exported to: {fixture_path}", file=sys.stderr)
+
+ fixture = Path(fixture_path)
if not fixture.exists():
- print(f"Error: fixture file not found: {args.file}", file=sys.stderr)
+ print(f"Error: fixture file not found: {fixture_path}", file=sys.stderr)
return 1
# Ensure output dir
@@ -375,7 +404,7 @@ def _run_optimize(args: argparse.Namespace) -> int:
loop = OptimizationLoop(
client=client,
stage=args.stage,
- fixture_path=args.file,
+ fixture_path=fixture_path,
iterations=args.iterations,
variants_per_iter=args.variants_per_iter,
output_dir=args.output_dir,
@@ -407,7 +436,7 @@ def _run_optimize(args: argparse.Namespace) -> int:
stage=args.stage,
iterations=args.iterations,
variants_per_iter=args.variants_per_iter,
- fixture_path=args.file,
+ fixture_path=fixture_path,
)
print(f" Results written to: {json_path}")
except OSError as exc:
diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
index e148cd6..2e6fe5d 100644
--- a/backend/pipeline/stages.py
+++ b/backend/pipeline/stages.py
@@ -241,11 +241,34 @@ def _get_sync_session() -> Session:
return _SessionLocal()
-def _load_prompt(template_name: str) -> str:
+def _load_prompt(template_name: str, video_id: str | None = None) -> str:
"""Read a prompt template from the prompts directory.
- Raises FileNotFoundError if the template does not exist.
+ If ``video_id`` is provided, checks Redis for a per-video prompt override
+ (key: ``chrysopedia:prompt_override:{video_id}:{template_name}``) before
+ falling back to the on-disk template. Overrides are set by
+ ``run_single_stage`` for single-stage re-runs with custom prompts.
+
+ Raises FileNotFoundError if no override exists and the template is missing.
"""
+ # Check for per-video prompt override in Redis
+ if video_id:
+ try:
+ import redis
+ settings = get_settings()
+ r = redis.Redis.from_url(settings.redis_url)
+ override_key = f"chrysopedia:prompt_override:{video_id}:{template_name}"
+ override = r.get(override_key)
+ if override:
+ prompt_text = override.decode("utf-8")
+ logger.info(
+ "[PROMPT] Using override from Redis: video_id=%s, template=%s (%d chars)",
+ video_id, template_name, len(prompt_text),
+ )
+ return prompt_text
+ except Exception as exc:
+ logger.warning("[PROMPT] Redis override check failed: %s", exc)
+
settings = get_settings()
path = Path(settings.prompts_path) / template_name
if not path.exists():
@@ -398,7 +421,7 @@ def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str:
transcript_text = "\n".join(transcript_lines)
# Load prompt and call LLM
- system_prompt = _load_prompt("stage2_segmentation.txt")
+ system_prompt = _load_prompt("stage2_segmentation.txt", video_id=video_id)
user_prompt = f"\n{transcript_text}\n"
llm = _get_llm_client()
@@ -477,7 +500,7 @@ def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str:
label = seg.topic_label or "unlabeled"
groups[label].append(seg)
- system_prompt = _load_prompt("stage3_extraction.txt")
+ system_prompt = _load_prompt("stage3_extraction.txt", video_id=video_id)
llm = _get_llm_client()
model_override, modality = _get_stage_config(3)
hard_limit = get_settings().llm_max_tokens_hard_limit
@@ -650,7 +673,7 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str
tags_data = _load_canonical_tags()
taxonomy_text = _format_taxonomy_for_prompt(tags_data)
- system_prompt = _load_prompt("stage4_classification.txt")
+ system_prompt = _load_prompt("stage4_classification.txt", video_id=video_id)
llm = _get_llm_client()
model_override, modality = _get_stage_config(4)
hard_limit = get_settings().llm_max_tokens_hard_limit
@@ -716,26 +739,108 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str
def _store_classification_data(video_id: str, data: list[dict]) -> None:
- """Store classification data in Redis for cross-stage communication."""
+ """Store classification data in Redis (cache) and PostgreSQL (durable).
+
+ Dual-write ensures classification data survives Redis TTL expiry or flush.
+ Redis serves as the fast-path cache; PostgreSQL is the durable fallback.
+ """
import redis
settings = get_settings()
- r = redis.Redis.from_url(settings.redis_url)
- key = f"chrysopedia:classification:{video_id}"
- r.set(key, json.dumps(data), ex=86400) # Expire after 24 hours
+
+ # Redis: fast cache with 7-day TTL
+ try:
+ r = redis.Redis.from_url(settings.redis_url)
+ key = f"chrysopedia:classification:{video_id}"
+ r.set(key, json.dumps(data), ex=604800) # 7 days
+ logger.info(
+ "[CLASSIFY-STORE] Redis write: video_id=%s, %d entries, ttl=7d",
+ video_id, len(data),
+ )
+ except Exception as exc:
+ logger.warning(
+ "[CLASSIFY-STORE] Redis write failed for video_id=%s: %s", video_id, exc,
+ )
+
+ # PostgreSQL: durable storage on SourceVideo.classification_data
+ session = _get_sync_session()
+ try:
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one_or_none()
+ if video:
+ video.classification_data = data
+ session.commit()
+ logger.info(
+ "[CLASSIFY-STORE] PostgreSQL write: video_id=%s, %d entries",
+ video_id, len(data),
+ )
+ else:
+ logger.warning(
+ "[CLASSIFY-STORE] Video not found for PostgreSQL write: %s", video_id,
+ )
+ except Exception as exc:
+ session.rollback()
+ logger.warning(
+ "[CLASSIFY-STORE] PostgreSQL write failed for video_id=%s: %s", video_id, exc,
+ )
+ finally:
+ session.close()
def _load_classification_data(video_id: str) -> list[dict]:
- """Load classification data from Redis."""
+ """Load classification data from Redis (fast path) or PostgreSQL (fallback).
+
+ Tries Redis first. If the key has expired or Redis is unavailable, falls
+ back to the durable SourceVideo.classification_data column.
+ """
import redis
settings = get_settings()
- r = redis.Redis.from_url(settings.redis_url)
- key = f"chrysopedia:classification:{video_id}"
- raw = r.get(key)
- if raw is None:
- return []
- return json.loads(raw)
+
+ # Try Redis first (fast path)
+ try:
+ r = redis.Redis.from_url(settings.redis_url)
+ key = f"chrysopedia:classification:{video_id}"
+ raw = r.get(key)
+ if raw is not None:
+ data = json.loads(raw)
+ logger.info(
+ "[CLASSIFY-LOAD] Source: redis, video_id=%s, %d entries",
+ video_id, len(data),
+ )
+ return data
+ except Exception as exc:
+ logger.warning(
+ "[CLASSIFY-LOAD] Redis unavailable for video_id=%s: %s", video_id, exc,
+ )
+
+ # Fallback to PostgreSQL
+ logger.info("[CLASSIFY-LOAD] Redis miss, falling back to PostgreSQL for video_id=%s", video_id)
+ session = _get_sync_session()
+ try:
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one_or_none()
+ if video and video.classification_data:
+ data = video.classification_data
+ logger.info(
+ "[CLASSIFY-LOAD] Source: postgresql, video_id=%s, %d entries",
+ video_id, len(data),
+ )
+ return data
+ except Exception as exc:
+ logger.warning(
+ "[CLASSIFY-LOAD] PostgreSQL fallback failed for video_id=%s: %s", video_id, exc,
+ )
+ finally:
+ session.close()
+
+ logger.warning(
+ "[CLASSIFY-LOAD] No classification data found in Redis or PostgreSQL for video_id=%s",
+ video_id,
+ )
+ return []
@@ -969,7 +1074,7 @@ def _merge_pages_by_slug(
indent=2, ensure_ascii=False,
)
- merge_system_prompt = _load_prompt("stage5_merge.txt")
+ merge_system_prompt = _load_prompt("stage5_merge.txt", video_id=video_id)
merge_user_prompt = f"{creator_name}\n\n{pages_json}\n"
max_tokens = estimate_max_tokens(
@@ -1080,7 +1185,7 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
category = cls_info.get("topic_category", "Uncategorized").strip().title()
groups[category].append((moment, cls_info))
- system_prompt = _load_prompt("stage5_synthesis.txt")
+ system_prompt = _load_prompt("stage5_synthesis.txt", video_id=video_id)
llm = _get_llm_client()
model_override, modality = _get_stage_config(5)
hard_limit = settings.llm_max_tokens_hard_limit
@@ -1784,3 +1889,214 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str:
raise
return video_id
+
+
+# ── Single-Stage Re-Run ─────────────────────────────────────────────────────
+
+@celery_app.task
+def run_single_stage(
+ video_id: str,
+ stage_name: str,
+ trigger: str = "stage_rerun",
+ prompt_override: str | None = None,
+) -> str:
+ """Re-run a single pipeline stage without running predecessors.
+
+ Designed for fast prompt iteration — especially stage 5 synthesis.
+ Bypasses the processing_status==complete guard, creates a proper
+ PipelineRun record, and restores status on completion.
+
+ If ``prompt_override`` is provided, it is stored in Redis as a
+ per-video override that ``_load_prompt`` reads before falling back
+ to the on-disk template. The override is cleaned up after the stage runs.
+
+ Returns the video_id.
+ """
+ import redis as redis_lib
+
+ logger.info(
+ "[RERUN] Starting single-stage re-run: video_id=%s, stage=%s, trigger=%s",
+ video_id, stage_name, trigger,
+ )
+
+ # Validate stage name
+ if stage_name not in _PIPELINE_STAGES:
+ raise ValueError(
+ f"[RERUN] Invalid stage '{stage_name}'. "
+ f"Valid stages: {_PIPELINE_STAGES}"
+ )
+
+ # Validate video exists
+ session = _get_sync_session()
+ try:
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one_or_none()
+ if video is None:
+ raise ValueError(f"[RERUN] Video not found: {video_id}")
+ original_status = video.processing_status
+ finally:
+ session.close()
+
+ # Validate prerequisites for the requested stage
+ prereq_ok, prereq_msg = _check_stage_prerequisites(video_id, stage_name)
+ if not prereq_ok:
+ logger.error("[RERUN] Prerequisite check failed: %s", prereq_msg)
+ raise ValueError(f"[RERUN] Prerequisites not met: {prereq_msg}")
+
+ logger.info("[RERUN] Prerequisite check passed: %s", prereq_msg)
+
+ # Store prompt override in Redis if provided
+ override_key = None
+ if prompt_override:
+ settings = get_settings()
+ try:
+ r = redis_lib.Redis.from_url(settings.redis_url)
+ # Map stage name to its prompt template
+ stage_prompt_map = {
+ "stage2_segmentation": "stage2_segmentation.txt",
+ "stage3_extraction": "stage3_extraction.txt",
+ "stage4_classification": "stage4_classification.txt",
+ "stage5_synthesis": "stage5_synthesis.txt",
+ }
+ template = stage_prompt_map.get(stage_name)
+ if template:
+ override_key = f"chrysopedia:prompt_override:{video_id}:{template}"
+ r.set(override_key, prompt_override, ex=3600) # 1-hour TTL
+ logger.info(
+ "[RERUN] Prompt override stored: key=%s (%d chars, first 100: %s)",
+ override_key, len(prompt_override), prompt_override[:100],
+ )
+ except Exception as exc:
+ logger.warning("[RERUN] Failed to store prompt override: %s", exc)
+
+ # Snapshot prior pages (needed for stage 5 page matching)
+ if stage_name in ("stage5_synthesis",):
+ _snapshot_prior_pages(video_id)
+
+ # Create pipeline run record
+ run_id = _create_run(video_id, trigger)
+ logger.info("[RERUN] Created run_id=%s", run_id)
+
+ # Temporarily set status to processing
+ session = _get_sync_session()
+ try:
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one()
+ video.processing_status = ProcessingStatus.processing
+ session.commit()
+ finally:
+ session.close()
+
+ # Run the single stage
+ start = time.monotonic()
+ try:
+ task_func = _STAGE_TASKS[stage_name]
+ task_func(video_id, run_id=run_id)
+
+ elapsed = time.monotonic() - start
+ logger.info(
+ "[RERUN] Stage %s completed: %.1fs, video_id=%s",
+ stage_name, elapsed, video_id,
+ )
+ _finish_run(run_id, "complete")
+
+ # Restore status to complete
+ session = _get_sync_session()
+ try:
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one()
+ video.processing_status = ProcessingStatus.complete
+ session.commit()
+ logger.info("[RERUN] Status restored to complete")
+ finally:
+ session.close()
+
+ except Exception as exc:
+ elapsed = time.monotonic() - start
+ logger.error(
+ "[RERUN] Stage %s FAILED after %.1fs: %s",
+ stage_name, elapsed, exc,
+ )
+ _set_error_status(video_id, stage_name, exc)
+ _finish_run(run_id, "error", error_stage=stage_name)
+ raise
+
+ finally:
+ # Clean up prompt override from Redis
+ if override_key:
+ try:
+ settings = get_settings()
+ r = redis_lib.Redis.from_url(settings.redis_url)
+ r.delete(override_key)
+ logger.info("[RERUN] Prompt override cleaned up: %s", override_key)
+ except Exception as exc:
+ logger.warning("[RERUN] Failed to clean up prompt override: %s", exc)
+
+ return video_id
+
+
+def _check_stage_prerequisites(video_id: str, stage_name: str) -> tuple[bool, str]:
+ """Validate that prerequisite data exists for a stage re-run.
+
+ Returns (ok, message) where message describes what was found or missing.
+ """
+ session = _get_sync_session()
+ try:
+ if stage_name == "stage2_segmentation":
+ # Needs transcript segments
+ count = session.execute(
+ select(func.count(TranscriptSegment.id))
+ .where(TranscriptSegment.source_video_id == video_id)
+ ).scalar() or 0
+ if count == 0:
+ return False, "No transcript segments found"
+ return True, f"transcript_segments={count}"
+
+ if stage_name == "stage3_extraction":
+ # Needs transcript segments with topic_labels
+ count = session.execute(
+ select(func.count(TranscriptSegment.id))
+ .where(
+ TranscriptSegment.source_video_id == video_id,
+ TranscriptSegment.topic_label.isnot(None),
+ )
+ ).scalar() or 0
+ if count == 0:
+ return False, "No labeled transcript segments (stage 2 must complete first)"
+ return True, f"labeled_segments={count}"
+
+ if stage_name == "stage4_classification":
+ # Needs key moments
+ count = session.execute(
+ select(func.count(KeyMoment.id))
+ .where(KeyMoment.source_video_id == video_id)
+ ).scalar() or 0
+ if count == 0:
+ return False, "No key moments found (stage 3 must complete first)"
+ return True, f"key_moments={count}"
+
+ if stage_name == "stage5_synthesis":
+ # Needs key moments + classification data
+ km_count = session.execute(
+ select(func.count(KeyMoment.id))
+ .where(KeyMoment.source_video_id == video_id)
+ ).scalar() or 0
+ if km_count == 0:
+ return False, "No key moments found (stages 2-3 must complete first)"
+
+ cls_data = _load_classification_data(video_id)
+ cls_source = "redis+pg"
+ if not cls_data:
+ return False, f"No classification data found (stage 4 must complete first), key_moments={km_count}"
+
+ return True, f"key_moments={km_count}, classification_entries={len(cls_data)}"
+
+ if stage_name == "stage6_embed_and_index":
+ return True, "stage 6 is non-blocking and always runs"
+
+ return False, f"Unknown stage: {stage_name}"
+ finally:
+ session.close()
diff --git a/backend/pipeline/test_harness.py b/backend/pipeline/test_harness.py
new file mode 100644
index 0000000..074628f
--- /dev/null
+++ b/backend/pipeline/test_harness.py
@@ -0,0 +1,481 @@
+"""Offline prompt test harness for Chrysopedia synthesis.
+
+Loads a fixture JSON (exported by export_fixture.py) and a prompt file,
+calls the LLM, and outputs the synthesized result. No Docker, no database,
+no Redis, no Celery — just prompt + fixture + LLM endpoint.
+
Usage:
    python -m pipeline.test_harness run \\
        --fixture fixtures/real_video_xyz.json \\
        --prompt prompts/stage5_synthesis.txt \\
        --output /tmp/result.json

    # Run all categories in a fixture:
    python -m pipeline.test_harness run --fixture fixtures/video.json

    # Run a specific category only:
    python -m pipeline.test_harness run --fixture fixtures/video.json --category "Sound Design"
+
+Exit codes: 0=success, 1=LLM error, 2=parse error, 3=fixture error
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+import time
+from collections import Counter, defaultdict
+from dataclasses import dataclass
+from pathlib import Path
+from typing import NamedTuple
+
+from pydantic import ValidationError
+
+from config import get_settings
+from pipeline.llm_client import LLMClient, estimate_max_tokens
+from pipeline.schemas import SynthesisResult
+
+
+# ── Lightweight stand-in for KeyMoment ORM model ───────────────────────────
+
+class _MockContentType:
+ """Mimics KeyMomentContentType enum with a .value property."""
+ def __init__(self, value: str) -> None:
+ self.value = value
+
+
class MockKeyMoment(NamedTuple):
    """Lightweight stand-in for the ORM KeyMoment.

    Has the same attributes that _build_moments_text() accesses:
    title, summary, content_type, start_time, end_time, plugins, raw_transcript.
    """
    title: str
    summary: str
    content_type: object  # _MockContentType — only .value is read
    start_time: float  # seconds into the video
    end_time: float  # seconds into the video
    plugins: list[str]
    raw_transcript: str  # prompt builder truncates this to 300 chars
+
+
+def _log(tag: str, msg: str, level: str = "INFO") -> None:
+ """Write structured log line to stderr."""
+ print(f"[HARNESS] [{level}] {tag}: {msg}", file=sys.stderr)
+
+
+# ── Moment text builder (mirrors stages.py _build_moments_text) ────────────
+
def build_moments_text(
    moment_group: list[tuple[MockKeyMoment, dict]],
    category: str,
) -> tuple[str, set[str]]:
    """Build the moments prompt text — matches _build_moments_text in stages.py.

    Args:
        moment_group: (moment, classification-info) pairs for one category;
            the classification dict may carry a "topic_tags" list.
        category: category name stamped onto every moment entry.

    Returns:
        (joined prompt text, union of all topic tags seen across the group).

    NOTE(review): this must stay in lockstep with stages.py
    _build_moments_text so offline results stay representative — confirm on
    any change there.
    """
    moments_lines = []
    all_tags: set[str] = set()
    for i, (m, cls_info) in enumerate(moment_group):
        tags = cls_info.get("topic_tags", [])
        all_tags.update(tags)
        # One numbered entry per moment; the transcript excerpt is capped at
        # 300 chars to bound prompt size.
        moments_lines.append(
            f"[{i}] Title: {m.title}\n"
            f"    Summary: {m.summary}\n"
            f"    Content type: {m.content_type.value}\n"
            f"    Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
            f"    Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
            f"    Category: {category}\n"
            f"    Tags: {', '.join(tags) if tags else 'none'}\n"
            f"    Transcript excerpt: {(m.raw_transcript or '')[:300]}"
        )
    return "\n\n".join(moments_lines), all_tags
+
+
+# ── Fixture loading ────────────────────────────────────────────────────────
+
@dataclass
class FixtureData:
    """Parsed fixture with moments grouped by category."""
    creator_name: str  # used as the header line of the user prompt
    video_id: str  # source video UUID (string form)
    content_type: str  # fixture-level content type, e.g. "tutorial"
    filename: str  # original media filename, informational only
    # Groups: category -> list of (MockKeyMoment, cls_info_dict)
    groups: dict[str, list[tuple[MockKeyMoment, dict]]]
    total_moments: int  # count across all categories
+
+
def load_fixture(path: str) -> FixtureData:
    """Load and parse a fixture JSON file into grouped moments.

    Args:
        path: fixture JSON exported by export_fixture.py.

    Returns:
        FixtureData with moments grouped by topic category.

    Raises:
        FileNotFoundError: the fixture file does not exist.
        ValueError: the fixture has an empty/missing "moments" list.
        json.JSONDecodeError: the file is not valid JSON.
    """
    fixture_path = Path(path)
    if not fixture_path.exists():
        raise FileNotFoundError(f"Fixture not found: {path}")

    raw = fixture_path.read_text(encoding="utf-8")
    size_kb = len(raw.encode("utf-8")) / 1024
    data = json.loads(raw)

    moments_raw = data.get("moments", [])
    if not moments_raw:
        raise ValueError(f"Fixture has no moments: {path}")

    _log("FIXTURE", f"Loading: {path} ({size_kb:.1f} KB, {len(moments_raw)} moments)")

    # Build MockKeyMoment objects and group by category.
    # Each field tolerates both the nested "classification" shape and the
    # flat legacy keys (topic_category/topic_tags/transcript_excerpt).
    groups: dict[str, list[tuple[MockKeyMoment, dict]]] = defaultdict(list)

    for m in moments_raw:
        cls = m.get("classification", {})
        category = cls.get("topic_category", m.get("topic_category", "Uncategorized"))
        tags = cls.get("topic_tags", m.get("topic_tags", []))

        mock = MockKeyMoment(
            # Fall back to a truncated summary when no title was exported.
            title=m.get("title", m.get("summary", "")[:80]),
            summary=m.get("summary", ""),
            content_type=_MockContentType(m.get("content_type", "technique")),
            start_time=m.get("start_time", 0.0),
            end_time=m.get("end_time", 0.0),
            plugins=m.get("plugins", []),
            raw_transcript=m.get("raw_transcript", m.get("transcript_excerpt", "")),
        )
        cls_info = {"topic_category": category, "topic_tags": tags}
        groups[category].append((mock, cls_info))

    # Log breakdown (counts is non-empty here because moments_raw is non-empty).
    cat_counts = {cat: len(moms) for cat, moms in groups.items()}
    counts = list(cat_counts.values())
    _log(
        "FIXTURE",
        f"Breakdown: {len(groups)} categories, "
        f"moments per category: min={min(counts)}, max={max(counts)}, "
        f"avg={sum(counts)/len(counts):.1f}",
    )
    for cat, count in sorted(cat_counts.items(), key=lambda x: -x[1]):
        _log("FIXTURE", f"  {cat}: {count} moments")

    return FixtureData(
        creator_name=data.get("creator_name", "Unknown"),
        video_id=data.get("video_id", "unknown"),
        content_type=data.get("content_type", "tutorial"),
        filename=data.get("filename", "unknown"),
        groups=dict(groups),
        total_moments=len(moments_raw),
    )
+
+
+# ── Synthesis runner ───────────────────────────────────────────────────────
+
def run_synthesis(
    fixture: FixtureData,
    prompt_path: str,
    category_filter: str | None = None,
    model_override: str | None = None,
    modality: str | None = None,
) -> tuple[list[dict], int]:
    """Run synthesis on fixture data, returns (pages, exit_code).

    Returns all synthesized pages as dicts plus an exit code.

    Args:
        fixture: parsed fixture (see load_fixture).
        prompt_path: path to the system-prompt text file.
        category_filter: if given, synthesize only this category.
        model_override: overrides settings.llm_stage5_model for this run.
        modality: "chat" or "thinking"; falls back to settings then "thinking".

    Exit codes: 0 = all categories succeeded, 1 = at least one LLM call
    failed, 2 = at least one response failed validation, 3 = prompt missing
    or unknown category. A per-category failure does not abort the loop, so
    partial pages may be returned alongside a non-zero code.
    """
    # Load prompt
    prompt_file = Path(prompt_path)
    if not prompt_file.exists():
        _log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR")
        return [], 3

    system_prompt = prompt_file.read_text(encoding="utf-8")
    _log("PROMPT", f"Loading: {prompt_path} ({len(system_prompt)} chars)")

    # Setup LLM
    settings = get_settings()
    llm = LLMClient(settings)

    # Mirror the production stage-5 configuration; CLI overrides win.
    stage_model = model_override or settings.llm_stage5_model or settings.llm_model
    stage_modality = modality or settings.llm_stage5_modality or "thinking"
    hard_limit = settings.llm_max_tokens_hard_limit

    _log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}")

    # Filter categories if requested
    categories = fixture.groups
    if category_filter:
        if category_filter not in categories:
            _log("ERROR", f"Category '{category_filter}' not found. Available: {list(categories.keys())}", level="ERROR")
            return [], 3
        categories = {category_filter: categories[category_filter]}

    all_pages: list[dict] = []
    total_prompt_tokens = 0
    total_completion_tokens = 0
    total_duration_ms = 0
    exit_code = 0  # sticky failure code; a later failure overwrites an earlier one

    for cat_idx, (category, moment_group) in enumerate(categories.items(), 1):
        _log("SYNTH", f"Category {cat_idx}/{len(categories)}: '{category}' ({len(moment_group)} moments)")

        # Build user prompt (same format as stages.py _synthesize_chunk)
        moments_text, all_tags = build_moments_text(moment_group, category)
        user_prompt = f"{fixture.creator_name}\n\n{moments_text}\n"

        estimated_tokens = estimate_max_tokens(
            system_prompt, user_prompt,
            stage="stage5_synthesis",
            hard_limit=hard_limit,
        )
        _log(
            "SYNTH",
            f"  Building prompt: {len(moment_group)} moments, "
            f"max_tokens={estimated_tokens}, tags={sorted(all_tags)[:5]}{'...' if len(all_tags) > 5 else ''}",
        )

        # Call LLM
        call_start = time.monotonic()
        _log("LLM", f"  Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}")

        try:
            raw = llm.complete(
                system_prompt,
                user_prompt,
                response_model=SynthesisResult,
                modality=stage_modality,
                model_override=stage_model,
                max_tokens=estimated_tokens,
            )
        except Exception as exc:
            # Keep going: other categories may still synthesize fine.
            _log("ERROR", f"  LLM call failed: {exc}", level="ERROR")
            exit_code = 1
            continue

        call_duration_ms = int((time.monotonic() - call_start) * 1000)
        # getattr with fallback: token/finish metadata may be absent on the
        # response object depending on the client backend.
        prompt_tokens = getattr(raw, "prompt_tokens", None) or 0
        completion_tokens = getattr(raw, "completion_tokens", None) or 0
        finish_reason = getattr(raw, "finish_reason", "unknown")

        total_prompt_tokens += prompt_tokens
        total_completion_tokens += completion_tokens
        total_duration_ms += call_duration_ms

        _log(
            "LLM",
            f"  Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, "
            f"{call_duration_ms}ms, finish_reason={finish_reason}",
        )

        if finish_reason == "length":
            _log(
                "WARN",
                "  finish_reason=length — output likely truncated! "
                "Consider reducing fixture size or increasing max_tokens.",
                level="WARN",
            )

        # Parse response.
        # NOTE(review): raw is re-validated from str(raw) even though
        # response_model=SynthesisResult was passed to complete() — this
        # assumes complete() returns raw JSON text (or an object whose str()
        # is the JSON payload) rather than an already-parsed model. Confirm
        # against LLMClient.complete.
        try:
            result = SynthesisResult.model_validate_json(str(raw))
        except (ValidationError, json.JSONDecodeError) as exc:
            _log("ERROR", f"  Parse failed: {exc}", level="ERROR")
            _log("ERROR", f"  Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR")
            exit_code = 2
            continue

        # Log per-page summary and collect every page for the caller.
        _log("SYNTH", f"  Parsed: {len(result.pages)} pages synthesized")
        total_words = 0
        for page in result.pages:
            sections = page.body_sections or {}
            word_count = sum(len(str(v).split()) for v in sections.values())
            total_words += word_count
            _log(
                "PAGE",
                f"    '{page.title}' ({page.slug}): "
                f"{len(sections)} sections, {word_count} words, "
                f"{len(page.moment_indices)} moments linked, "
                f"quality={page.source_quality}",
            )

            all_pages.append(page.model_dump())

    # Summary
    _log("SUMMARY", f"Total: {len(all_pages)} pages across {len(categories)} categories")
    _log("SUMMARY", f"Tokens: {total_prompt_tokens} prompt + {total_completion_tokens} completion = {total_prompt_tokens + total_completion_tokens} total")
    _log("SUMMARY", f"Duration: {total_duration_ms}ms ({total_duration_ms / 1000:.1f}s)")

    return all_pages, exit_code
+
+
+# ── Promote: deploy a prompt to production ─────────────────────────────────
+
# Maps a pipeline stage number to its on-disk prompt template filename
# (resolved relative to settings.prompts_path). Only stages 2-5 have
# promotable prompt templates.
_STAGE_PROMPT_MAP = {
    2: "stage2_segmentation.txt",
    3: "stage3_extraction.txt",
    4: "stage4_classification.txt",
    5: "stage5_synthesis.txt",
}
+
+
def promote_prompt(prompt_path: str, stage: int, reason: str, commit: bool = False) -> int:
    """Copy a winning prompt to the canonical path and create a backup.

    The worker reads prompts from disk at runtime — no restart needed.

    Args:
        prompt_path: path to the new (winning) prompt file.
        stage: pipeline stage number (must be a key of _STAGE_PROMPT_MAP).
        reason: human-readable justification, logged and used in the commit message.
        commit: when True, also `git add` + `git commit` the canonical file.

    Returns:
        0 on success or no-op (identical prompt), 1 on invalid input.
    """
    import hashlib
    import shutil

    if stage not in _STAGE_PROMPT_MAP:
        _log("ERROR", f"Invalid stage {stage}. Valid: {sorted(_STAGE_PROMPT_MAP)}", level="ERROR")
        return 1

    settings = get_settings()
    template_name = _STAGE_PROMPT_MAP[stage]
    canonical = Path(settings.prompts_path) / template_name
    source = Path(prompt_path)

    if not source.exists():
        _log("ERROR", f"Source prompt not found: {prompt_path}", level="ERROR")
        return 1

    new_prompt = source.read_text(encoding="utf-8")
    new_hash = hashlib.sha256(new_prompt.encode()).hexdigest()[:12]

    # Back up the current prompt, if there is one.
    old_prompt = ""
    old_hash = "none"
    if canonical.exists():
        old_prompt = canonical.read_text(encoding="utf-8")
        old_hash = hashlib.sha256(old_prompt.encode()).hexdigest()[:12]

        if old_prompt.strip() == new_prompt.strip():
            _log("PROMOTE", "No change — new prompt is identical to current prompt")
            return 0

        # BUGFIX: only archive when a canonical prompt actually exists.
        # Previously shutil.copy2(canonical, ...) ran unconditionally and
        # raised FileNotFoundError on a first-time promotion.
        archive_dir = Path(settings.prompts_path) / "archive"
        archive_dir.mkdir(parents=True, exist_ok=True)
        ts = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
        backup = archive_dir / f"{template_name.replace('.txt', '')}_{ts}.txt"
        shutil.copy2(canonical, backup)
        _log("PROMOTE", f"Backed up current prompt: {old_hash} -> {backup}")

    # Write new prompt
    canonical.write_text(new_prompt, encoding="utf-8")

    old_lines = old_prompt.strip().splitlines()
    new_lines = new_prompt.strip().splitlines()
    _log("PROMOTE", f"Installed new prompt: {new_hash} ({len(new_prompt)} chars, {len(new_lines)} lines)")
    _log("PROMOTE", f"Previous: {old_hash} ({len(old_prompt)} chars, {len(old_lines)} lines)")
    _log("PROMOTE", f"Reason: {reason}")
    _log("PROMOTE", "Worker reads prompts from disk at runtime — no restart needed")

    if commit:
        import subprocess
        try:
            # NOTE(review): assumes the git repo root is two levels above the
            # canonical prompt file — confirm against the deployment layout.
            subprocess.run(
                ["git", "add", str(canonical)],
                cwd=str(canonical.parent.parent),
                check=True, capture_output=True,
            )
            msg = f"prompt: promote stage{stage} — {reason}"
            subprocess.run(
                ["git", "commit", "-m", msg],
                cwd=str(canonical.parent.parent),
                check=True, capture_output=True,
            )
            _log("PROMOTE", f"Git commit created: {msg}")
        except subprocess.CalledProcessError as exc:
            # Best-effort: a failed commit does not undo the promotion.
            _log("PROMOTE", f"Git commit failed: {exc}", level="WARN")

    return 0
+
+
+# ── CLI ────────────────────────────────────────────────────────────────────
+
def main() -> int:
    """CLI entry point. Returns a process exit code (see module docstring)."""
    parser = argparse.ArgumentParser(
        prog="pipeline.test_harness",
        description="Offline prompt test harness for Chrysopedia synthesis",
    )
    sub = parser.add_subparsers(dest="command")

    # -- run subcommand (default behavior) --
    run_parser = sub.add_parser("run", help="Run synthesis against a fixture")
    run_parser.add_argument("--fixture", "-f", type=str, required=True, help="Fixture JSON file")
    run_parser.add_argument("--prompt", "-p", type=str, default=None, help="Prompt file (default: stage5_synthesis.txt)")
    run_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path")
    run_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category")
    run_parser.add_argument("--model", type=str, default=None, help="Override LLM model")
    run_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"])

    # -- promote subcommand --
    promo_parser = sub.add_parser("promote", help="Deploy a winning prompt to production")
    promo_parser.add_argument("--prompt", "-p", type=str, required=True, help="Path to the winning prompt file")
    promo_parser.add_argument("--stage", "-s", type=int, default=5, help="Stage number (default: 5)")
    promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted")
    promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit")

    args = parser.parse_args()

    # No subcommand given: print usage and fail. Flags such as --fixture are
    # NOT accepted without the "run" subcommand — there is no implicit
    # backward-compat path here.
    if args.command is None:
        parser.print_help()
        return 1

    if args.command == "promote":
        return promote_prompt(args.prompt, args.stage, args.reason, args.commit)

    # -- run command --
    prompt_path = args.prompt
    if prompt_path is None:
        # Default to the production stage-5 prompt on disk.
        settings = get_settings()
        prompt_path = str(Path(settings.prompts_path) / "stage5_synthesis.txt")

    overall_start = time.monotonic()
    try:
        fixture = load_fixture(args.fixture)
    except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
        _log("ERROR", f"Fixture error: {exc}", level="ERROR")
        return 3

    pages, exit_code = run_synthesis(
        fixture=fixture,
        prompt_path=prompt_path,
        category_filter=args.category,
        model_override=args.model,
        modality=args.modality,
    )

    # Total failure: nothing to write. Partial success still writes output
    # below while propagating the non-zero exit code.
    if not pages and exit_code != 0:
        return exit_code

    output = {
        "fixture_source": args.fixture,
        "prompt_source": prompt_path,
        "creator_name": fixture.creator_name,
        "video_id": fixture.video_id,
        "category_filter": args.category,
        "pages": pages,
        "metadata": {
            "page_count": len(pages),
            "total_words": sum(
                sum(len(str(v).split()) for v in p.get("body_sections", {}).values())
                for p in pages
            ),
            "elapsed_seconds": round(time.monotonic() - overall_start, 1),
        },
    }

    output_json = json.dumps(output, indent=2, ensure_ascii=False)

    if args.output:
        Path(args.output).parent.mkdir(parents=True, exist_ok=True)
        Path(args.output).write_text(output_json, encoding="utf-8")
        _log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)")
    else:
        # JSON goes to stdout so it can be piped; all logs are on stderr.
        print(output_json)
        _log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)")

    total_elapsed = time.monotonic() - overall_start
    _log("DONE", f"Completed in {total_elapsed:.1f}s (exit_code={exit_code})")

    return exit_code


if __name__ == "__main__":
    sys.exit(main())
diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py
index 7f45a90..af2f01f 100644
--- a/backend/routers/pipeline.py
+++ b/backend/routers/pipeline.py
@@ -251,6 +251,23 @@ async def clean_retrigger_pipeline(
logger.warning("Qdrant cleanup failed for video_id=%s: %s", video_id, exc)
deleted_counts["qdrant_vectors"] = f"skipped: {exc}"
+ # Clear Redis classification/prior-pages cache to prevent stale data
+ try:
+ import redis as redis_lib
+ settings = get_settings()
+ r = redis_lib.Redis.from_url(settings.redis_url)
+ r.delete(f"chrysopedia:classification:{video_id}")
+ r.delete(f"chrysopedia:prior_pages:{video_id}")
+ deleted_counts["redis_cache"] = "cleared"
+ logger.info("Redis cache cleared for video_id=%s", video_id)
+ except Exception as exc:
+ logger.warning("Redis cache cleanup failed for video_id=%s: %s", video_id, exc)
+ deleted_counts["redis_cache"] = f"skipped: {exc}"
+
+ # Clear durable classification_data column too
+ video.classification_data = None
+ await db.commit()
+
# Now trigger the pipeline
from pipeline.stages import run_pipeline
try:
@@ -270,6 +287,210 @@ async def clean_retrigger_pipeline(
}
+# ── Admin: Re-run Single Stage ──────────────────────────────────────────────
+
@router.post("/admin/pipeline/rerun-stage/{video_id}/{stage_name}")
async def rerun_stage(
    video_id: str,
    stage_name: str,
    prompt_override: str | None = None,
    db: AsyncSession = Depends(get_session),
):
    """Re-run a single pipeline stage without running predecessors.

    Built for fast prompt iteration — stage 5 synthesis in particular.
    When prompt_override is supplied, it temporarily replaces the on-disk
    prompt template for this one run.

    Valid stage names: stage2_segmentation, stage3_extraction,
    stage4_classification, stage5_synthesis, stage6_embed_and_index.

    Raises 400 on an unknown stage, 404 on an unknown video, and 503 when
    the Celery dispatch itself fails.
    """
    from pipeline.stages import _PIPELINE_STAGES, run_single_stage

    # Reject unknown stage names up front.
    if stage_name not in _PIPELINE_STAGES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid stage '{stage_name}'. Valid: {_PIPELINE_STAGES}",
        )

    # The video must exist before we enqueue anything.
    lookup = await db.execute(select(SourceVideo).where(SourceVideo.id == video_id))
    target = lookup.scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")

    # Hand the work to the worker; surface queue outages as 503.
    try:
        run_single_stage.delay(
            str(target.id),
            stage_name,
            trigger="stage_rerun",
            prompt_override=prompt_override,
        )
    except Exception as exc:
        logger.warning(
            "Failed to dispatch single-stage re-run for video_id=%s: %s", video_id, exc,
        )
        raise HTTPException(
            status_code=503,
            detail="Stage re-run dispatch failed — Celery/Redis may be unavailable",
        ) from exc

    logger.info(
        "Single-stage re-run dispatched: video_id=%s, stage=%s, prompt_override=%s",
        video_id, stage_name, "yes" if prompt_override else "no",
    )

    return {
        "status": "stage_rerun_dispatched",
        "video_id": str(target.id),
        "stage": stage_name,
        "prompt_override": bool(prompt_override),
    }
+
+
+# ── Admin: Chunking Inspector ───────────────────────────────────────────────
+
@router.get("/admin/pipeline/chunking/{video_id}")
async def get_chunking_data(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Return chunking/grouping data for a video — topic boundaries, classifications,
    and synthesis group breakdowns.

    Helps diagnose whether bad synthesis output is a prompt problem or a data
    shape problem.

    Response keys: video_id, total_segments, total_moments,
    classification_source ("postgresql" | "redis" | "missing"),
    synthesis_chunk_size, topic_boundaries, key_moments, classification,
    synthesis_groups.

    Raises 404 when the video does not exist.
    """
    import json

    from config import get_settings

    stmt = select(SourceVideo).where(SourceVideo.id == video_id)
    result = await db.execute(stmt)
    video = result.scalar_one_or_none()
    if video is None:
        raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")

    settings = get_settings()

    # 1. Topic boundaries (stage 2 output): contiguous runs of segments that
    # share the same topic_label, in segment_index order.
    segments = (
        await db.execute(
            select(TranscriptSegment)
            .where(TranscriptSegment.source_video_id == video_id)
            .order_by(TranscriptSegment.segment_index)
        )
    ).scalars().all()

    topic_boundaries: list[dict] = []
    current_label = None
    current_group: list[dict] = []

    def _flush_group() -> None:
        # Emit the accumulated run of same-label segments as one boundary
        # entry. Shared by the loop body and the trailing flush so the two
        # code paths cannot drift apart.
        if current_group:
            topic_boundaries.append({
                "topic_label": current_label,
                "segment_count": len(current_group),
                "start_time": current_group[0]["start_time"],
                "end_time": current_group[-1]["end_time"],
                "start_index": current_group[0]["segment_index"],
                "end_index": current_group[-1]["segment_index"],
            })

    for seg in segments:
        label = seg.topic_label or "unlabeled"
        if label != current_label:
            _flush_group()
            current_label = label
            current_group = []

        current_group.append({
            "start_time": seg.start_time,
            "end_time": seg.end_time,
            "segment_index": seg.segment_index,
        })

    _flush_group()

    # 2. Key moments (stage 3 output), ordered by start time.
    moments = (
        await db.execute(
            select(KeyMoment)
            .where(KeyMoment.source_video_id == video_id)
            .order_by(KeyMoment.start_time)
        )
    ).scalars().all()

    key_moments = [
        {
            "id": str(m.id),
            "title": m.title,
            "content_type": m.content_type.value,
            "start_time": m.start_time,
            "end_time": m.end_time,
            "plugins": m.plugins or [],
            "technique_page_id": str(m.technique_page_id) if m.technique_page_id else None,
        }
        for m in moments
    ]

    # 3. Classification data (stage 4 output). Prefer the durable PostgreSQL
    # column; fall back to the Redis cache.
    classification: list[dict] = []
    cls_source = "missing"

    if video.classification_data:
        classification = video.classification_data
        cls_source = "postgresql"
    else:
        try:
            import redis as redis_lib
            r = redis_lib.Redis.from_url(settings.redis_url)
            raw = r.get(f"chrysopedia:classification:{video_id}")
            if raw:
                # FIX: use a real import instead of the __import__("json") hack.
                classification = json.loads(raw)
                cls_source = "redis"
        except Exception:
            # Best-effort: an unreachable Redis leaves classification empty
            # rather than failing the whole inspector.
            pass

    # 4. Synthesis groups (how stage 5 would group moments).
    # Skip malformed classification entries missing "moment_id" instead of
    # raising KeyError on a diagnostics endpoint.
    cls_by_moment_id = {c["moment_id"]: c for c in classification if "moment_id" in c}
    synthesis_groups: dict[str, dict] = {}

    for m in moments:
        cls_info = cls_by_moment_id.get(str(m.id), {})
        # `or` guards a present-but-null topic_category; the old
        # .get(key, default) only covered a missing key and crashed on None.
        category = (cls_info.get("topic_category") or "Uncategorized").strip().title()
        if category not in synthesis_groups:
            synthesis_groups[category] = {
                "category": category,
                "moment_count": 0,
                "moment_ids": [],
                "exceeds_chunk_threshold": False,
            }
        synthesis_groups[category]["moment_count"] += 1
        synthesis_groups[category]["moment_ids"].append(str(m.id))

    chunk_size = settings.synthesis_chunk_size
    for group in synthesis_groups.values():
        group["exceeds_chunk_threshold"] = group["moment_count"] > chunk_size
        group["chunks_needed"] = max(1, -(-group["moment_count"] // chunk_size))  # ceil division

    return {
        "video_id": str(video.id),
        "total_segments": len(segments),
        "total_moments": len(moments),
        "classification_source": cls_source,
        "synthesis_chunk_size": chunk_size,
        "topic_boundaries": topic_boundaries,
        "key_moments": key_moments,
        "classification": classification,
        "synthesis_groups": list(synthesis_groups.values()),
    }
+
+
# ── Admin: Revoke ────────────────────────────────────────────────────────────
@router.post("/admin/pipeline/revoke/{video_id}")
@@ -578,7 +799,215 @@ async def get_token_summary(
)
-# ── Admin: Worker status ─────────────────────────────────────────────────────
+# ── Admin: Stale Pages ──────────────────────────────────────────────────────
+
@router.get("/admin/pipeline/stale-pages")
async def get_stale_pages(
    db: AsyncSession = Depends(get_session),
):
    """Detect technique pages synthesized with an older prompt than the current one.

    Compares the SHA-256 hash of the current stage5_synthesis.txt against the
    prompt hashes stored in TechniquePageVersion.pipeline_metadata.

    Pages whose latest version has no recorded prompt hash are counted as
    stale (hash None never equals current_hash).

    NOTE(review): this issues one latest-version query plus, for stale pages,
    one creator-name query per page (N+1). Acceptable for an admin endpoint
    at current scale; revisit with a join if the page count grows.
    """
    import hashlib
    from pathlib import Path as _Path
    from models import TechniquePage, TechniquePageVersion

    settings = get_settings()
    prompt_path = _Path(settings.prompts_path) / "stage5_synthesis.txt"

    if not prompt_path.exists():
        raise HTTPException(status_code=500, detail="stage5_synthesis.txt not found")

    # Short (12-hex-char) hash — must match the format written into
    # pipeline_metadata by the synthesis stage.
    current_hash = hashlib.sha256(
        prompt_path.read_text(encoding="utf-8").encode()
    ).hexdigest()[:12]

    # Get all technique pages
    pages = (await db.execute(select(TechniquePage))).scalars().all()
    total = len(pages)

    stale_count = 0
    fresh_count = 0
    stale_by_creator: dict[str, dict] = {}

    for page in pages:
        # Get latest version to check prompt hash
        latest_version = (
            await db.execute(
                select(TechniquePageVersion)
                .where(TechniquePageVersion.technique_page_id == page.id)
                .order_by(TechniquePageVersion.version_number.desc())
                .limit(1)
            )
        ).scalar_one_or_none()

        page_hash = None
        if latest_version and latest_version.pipeline_metadata:
            meta = latest_version.pipeline_metadata
            # Accept either metadata key; "prompt_hash" wins when both exist.
            page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash"))

        if page_hash == current_hash:
            fresh_count += 1
        else:
            stale_count += 1
            # Look up creator name for the per-creator breakdown.
            creator = (await db.execute(
                select(Creator.name).where(Creator.id == page.creator_id)
            )).scalar_one_or_none() or "Unknown"

            if creator not in stale_by_creator:
                stale_by_creator[creator] = {"creator": creator, "stale_count": 0, "page_slugs": []}
            stale_by_creator[creator]["stale_count"] += 1
            stale_by_creator[creator]["page_slugs"].append(page.slug)

    return {
        "current_prompt_hash": current_hash,
        "total_pages": total,
        "stale_pages": stale_count,
        "fresh_pages": fresh_count,
        "stale_by_creator": list(stale_by_creator.values()),
    }
+
+
+# ── Admin: Bulk Re-Synthesize ───────────────────────────────────────────────
+
@router.post("/admin/pipeline/bulk-resynthesize")
async def bulk_resynthesize(
    video_ids: list[str] | None = None,
    stage: str = "stage5_synthesis",
    db: AsyncSession = Depends(get_session),
):
    """Re-run a single stage on multiple videos without a full pipeline reset.

    When video_ids is None, every video with processing_status='complete' is
    targeted. Dispatch failures are collected per video rather than aborting
    the batch. Raises 400 on an unknown stage name.
    """
    from pipeline.stages import _PIPELINE_STAGES, run_single_stage

    if stage not in _PIPELINE_STAGES:
        raise HTTPException(status_code=400, detail=f"Invalid stage: {stage}")

    # Default target set: every fully-processed video.
    if video_ids is None:
        rows = await db.execute(
            select(SourceVideo.id).where(
                SourceVideo.processing_status == ProcessingStatus.complete
            )
        )
        video_ids = [str(row[0]) for row in rows.all()]

    if not video_ids:
        return {"status": "no_videos", "dispatched": 0, "skipped": 0, "total": 0}

    sent = 0
    failures: list[dict] = []
    for vid in video_ids:
        try:
            run_single_stage.delay(vid, stage, trigger="stage_rerun")
        except Exception as exc:
            logger.warning("Bulk re-synth dispatch failed for video_id=%s: %s", vid, exc)
            failures.append({"video_id": vid, "reason": str(exc)})
        else:
            sent += 1

    logger.info(
        "[BULK-RESYNTH] Dispatched %d/%d %s re-runs",
        sent, len(video_ids), stage,
    )

    return {
        "status": "dispatched",
        "stage": stage,
        "total": len(video_ids),
        "dispatched": sent,
        "skipped": failures if failures else None,
    }
+
+
+# ── Admin: Prompt Optimization ──────────────────────────────────────────────
+
@router.post("/admin/pipeline/optimize-prompt/{stage}")
async def trigger_optimization(
    stage: int,
    video_id: str = Query(..., description="Video UUID to use as fixture source"),
    iterations: int = Query(5, description="Number of optimization iterations"),
    variants_per_iter: int = Query(2, description="Variants per iteration"),
):
    """Trigger an automated prompt optimization run as a background Celery task.

    Exports a fixture from the specified video, then runs the optimization loop
    with generate-score-select cycles. Progress is tracked in
    quality/results/progress_stage{N}.json.

    Raises 400 for an unsupported stage and 503 when dispatch fails.
    """
    from pipeline.quality.scorer import STAGE_CONFIGS

    if stage not in STAGE_CONFIGS:
        raise HTTPException(status_code=400, detail=f"Unsupported stage {stage}. Valid: {sorted(STAGE_CONFIGS)}")

    # Dispatch as a Celery task.
    # NOTE(review): the task is defined inside the request handler, so it is
    # (re)decorated on every request in the API process. The worker process
    # never imports this closure, so unless the worker registers an
    # identically-named task some other way, .delay() will enqueue messages
    # the worker cannot resolve (NotRegistered). Confirm, and consider moving
    # the task to a module imported by both API and worker.
    from worker import celery_app

    @celery_app.task
    def _run_optimization(video_id: str, stage: int, iterations: int, variants_per_iter: int) -> dict:
        """Background optimization task: export fixture, run loop, write results."""
        import tempfile
        from pipeline.export_fixture import export_fixture
        from pipeline.quality.optimizer import OptimizationLoop
        from pipeline.quality.__main__ import write_results_json
        from config import get_settings as _get_settings
        from pipeline.llm_client import LLMClient as _LLMClient

        settings = _get_settings()

        # Export fixture to a temp file the optimization loop can read.
        # delete=False so the path survives tmp.close(); the file is not
        # cleaned up afterwards — NOTE(review): consider removing it when done.
        tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="opt_fixture_", delete=False)
        tmp.close()
        exit_code = export_fixture(settings.database_url, settings.redis_url, video_id, tmp.name)
        if exit_code != 0:
            return {"error": f"Fixture export failed (exit code {exit_code})"}

        client = _LLMClient(settings)
        loop = OptimizationLoop(
            client=client,
            stage=stage,
            fixture_path=tmp.name,
            iterations=iterations,
            variants_per_iter=variants_per_iter,
            output_dir="backend/pipeline/quality/results/",
        )
        result = loop.run()

        json_path = write_results_json(
            result=result, output_dir="backend/pipeline/quality/results/",
            stage=stage, iterations=iterations, variants_per_iter=variants_per_iter,
            fixture_path=tmp.name,
        )
        return {
            "best_score": result.best_score.composite,
            "results_path": json_path,
            "iterations": iterations,
        }

    try:
        task = _run_optimization.delay(video_id, stage, iterations, variants_per_iter)
        logger.info(
            "[OPTIMIZE] Dispatched optimization: stage=%d, video_id=%s, iterations=%d, task_id=%s",
            stage, video_id, iterations, task.id,
        )
    except Exception as exc:
        raise HTTPException(status_code=503, detail=f"Failed to dispatch optimization: {exc}") from exc

    return {
        "status": "dispatched",
        "task_id": task.id,
        "stage": stage,
        "video_id": video_id,
        "iterations": iterations,
        "variants_per_iter": variants_per_iter,
    }
+
+
+# ── Admin: Reindex All ─────────────────────────────────────────────────────
@router.post("/admin/pipeline/reindex-all")
async def reindex_all(
diff --git a/frontend/src/App.css b/frontend/src/App.css
index 1cac18d..4d334df 100644
--- a/frontend/src/App.css
+++ b/frontend/src/App.css
@@ -2895,6 +2895,53 @@ a.app-footer__repo:hover {
overflow-y: auto;
}
/* Modal form controls (subtitle, labeled fields, action row). */
.modal-subtitle {
  color: var(--color-text-secondary);
  font-size: 0.85rem;
  margin: 0.25rem 0 1rem;
}

.modal-field {
  margin-bottom: 1rem;
}

.modal-field label {
  display: block;
  margin-bottom: 0.35rem;
  font-size: 0.85rem;
  font-weight: 600;
  color: var(--color-text-primary);
}

.modal-field select,
.modal-field textarea {
  width: 100%;
  background: var(--color-bg);
  color: var(--color-text-primary);
  border: 1px solid var(--color-border);
  border-radius: 6px;
  padding: 0.5rem;
  font-family: inherit;
  font-size: 0.85rem;
}

.modal-field textarea {
  /* Vertical-only resize keeps the dialog width stable. */
  resize: vertical;
  min-height: 80px;
}

.modal-optional {
  font-weight: 400;
  color: var(--color-text-secondary);
  font-size: 0.8rem;
}

.modal-actions {
  display: flex;
  gap: 0.5rem;
  margin-top: 1rem;
}
+
.report-modal__title {
margin: 0 0 0.75rem;
color: var(--color-text-primary);
@@ -3021,6 +3068,16 @@ a.app-footer__repo:hover {
opacity: 0.85;
}
/* Amber "caution" button variant; #f0ad4e is the fallback when the theme
   does not define --color-warning. */
.btn--warning {
  background: var(--color-warning, #f0ad4e);
  color: #1a1a1a;
  border-color: var(--color-warning, #f0ad4e);
}

.btn--warning:hover:not(:disabled) {
  opacity: 0.85;
}
+
.btn--random {
background: var(--color-bg-input);
color: var(--color-text-primary);
@@ -4142,6 +4199,124 @@ a.app-footer__repo:hover {
flex-wrap: wrap;
}
+/* ── Chunking Inspector ───────────────────────────────────────────────── */
+
+.chunking-inspector {
+ margin-top: 0.75rem;
+}
+
+.chunking-inspector__toggle {
+ background: none;
+ border: none;
+ color: var(--color-text-secondary);
+ cursor: pointer;
+ font-size: 0.85rem;
+ padding: 0.25rem 0;
+}
+
+.chunking-inspector__toggle:hover {
+ color: var(--color-text-primary);
+}
+
+.chunking-inspector__loading,
+.chunking-inspector__error {
+ font-size: 0.8rem;
+ color: var(--color-text-muted);
+ padding: 0.5rem 0;
+}
+
+.chunking-inspector__error {
+ color: var(--color-danger);
+}
+
+.chunking-inspector__body {
+ margin-top: 0.5rem;
+}
+
+.chunking-inspector__summary {
+ display: flex;
+ gap: 1rem;
+ font-size: 0.8rem;
+ color: var(--color-text-secondary);
+ padding: 0.5rem;
+ background: var(--color-bg);
+ border-radius: 6px;
+ margin-bottom: 0.75rem;
+ flex-wrap: wrap;
+}
+
+.chunking-inspector__section {
+ margin-bottom: 0.75rem;
+}
+
+.chunking-inspector__section h4 {
+ font-size: 0.8rem;
+ color: var(--color-text-secondary);
+ margin: 0 0 0.35rem;
+ font-weight: 600;
+}
+
+.chunking-inspector__topics {
+ display: flex;
+ flex-direction: column;
+ gap: 0.25rem;
+}
+
+.chunking-topic {
+ display: flex;
+ justify-content: space-between;
+ padding: 0.3rem 0.5rem;
+ background: var(--color-bg);
+ border-radius: 4px;
+ font-size: 0.8rem;
+}
+
+.chunking-topic__label {
+ color: var(--color-text-primary);
+ font-weight: 500;
+}
+
+.chunking-topic__meta {
+ color: var(--color-text-muted);
+ white-space: nowrap;
+}
+
+.chunking-inspector__groups {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 0.5rem;
+}
+
+.chunking-group {
+ display: flex;
+ flex-direction: column;
+ padding: 0.5rem 0.75rem;
+ background: var(--color-bg);
+ border-radius: 6px;
+ border-left: 3px solid var(--color-accent);
+ font-size: 0.8rem;
+ min-width: 140px;
+}
+
+.chunking-group--split {
+ border-left-color: var(--color-warning, #f0ad4e);
+}
+
+.chunking-group__category {
+ font-weight: 600;
+ color: var(--color-text-primary);
+}
+
+.chunking-group__count {
+ color: var(--color-text-secondary);
+}
+
+.chunking-group__warn {
+ color: var(--color-warning, #f0ad4e);
+ font-size: 0.75rem;
+ margin-top: 0.15rem;
+}
+
/* ── Pipeline Badges ────────────────────────────────────────────────────── */
.pipeline-badge {
diff --git a/frontend/src/api/public-client.ts b/frontend/src/api/public-client.ts
index 071cd4f..0f18f42 100644
--- a/frontend/src/api/public-client.ts
+++ b/frontend/src/api/public-client.ts
@@ -583,6 +583,113 @@ export async function cleanRetriggerPipeline(videoId: string): Promise;
+ classification: Array<Record<string, unknown>>;
+ synthesis_groups: ChunkingSynthesisGroup[];
+}
+
+export async function fetchChunkingData(videoId: string): Promise<ChunkingResponse> {
+ return request<ChunkingResponse>(`${BASE}/admin/pipeline/chunking/${videoId}`);
+}
+
+// ── Single-Stage Re-Run ────────────────────────────────────────────────────
+
+export interface RerunStageResponse {
+ status: string;
+ video_id: string;
+ stage: string;
+ prompt_override: boolean;
+}
+
+export async function rerunStage(
+ videoId: string,
+ stageName: string,
+ promptOverride?: string,
+): Promise<RerunStageResponse> {
+ const body: Record<string, unknown> = {};
+ if (promptOverride) {
+ body.prompt_override = promptOverride;
+ }
+ return request<RerunStageResponse>(
+ `${BASE}/admin/pipeline/rerun-stage/${videoId}/${stageName}`,
+ {
+ method: "POST",
+ body: Object.keys(body).length > 0 ? JSON.stringify(body) : undefined,
+ },
+ );
+}
+
+// ── Stale Pages & Bulk Re-Synthesize ───────────────────────────────────────
+
+export interface StalePageCreator {
+ creator: string;
+ stale_count: number;
+ page_slugs: string[];
+}
+
+export interface StalePagesResponse {
+ current_prompt_hash: string;
+ total_pages: number;
+ stale_pages: number;
+ fresh_pages: number;
+ stale_by_creator: StalePageCreator[];
+}
+
+export async function fetchStalePages(): Promise<StalePagesResponse> {
+ return request<StalePagesResponse>(`${BASE}/admin/pipeline/stale-pages`);
+}
+
+export interface BulkResynthResponse {
+ status: string;
+ stage: string;
+ total: number;
+ dispatched: number;
+ skipped: Array<{ video_id: string; reason: string }> | null;
+}
+
+export async function bulkResynthesize(
+ videoIds?: string[],
+ stage = "stage5_synthesis",
+): Promise<BulkResynthResponse> {
+ return request<BulkResynthResponse>(`${BASE}/admin/pipeline/bulk-resynthesize`, {
+ method: "POST",
+ body: JSON.stringify({ video_ids: videoIds ?? null, stage }),
+ });
+}
+
// ── Debug Mode ──────────────────────────────────────────────────────────────
export interface DebugModeResponse {
diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx
index ce7322c..e6f2712 100644
--- a/frontend/src/pages/AdminPipeline.tsx
+++ b/frontend/src/pages/AdminPipeline.tsx
@@ -16,6 +16,10 @@ import {
triggerPipeline,
revokePipeline,
cleanRetriggerPipeline,
+ rerunStage,
+ fetchChunkingData,
+ fetchStalePages,
+ bulkResynthesize,
fetchCreators,
fetchRecentActivity,
type PipelineVideoItem,
@@ -771,6 +775,88 @@ interface BulkLogEntry {
message: string;
}
+// ── Chunking Inspector ─────────────────────────────────────────────────────
+
+function ChunkingInspector({ videoId }: { videoId: string }) {
+ const [data, setData] = useState> | null>(null);
+ const [loading, setLoading] = useState(true);
+ const [error, setError] = useState(null);
+ const [expanded, setExpanded] = useState(false);
+
+ useEffect(() => {
+ if (!expanded) return;
+ let cancelled = false;
+ setLoading(true);
+ fetchChunkingData(videoId)
+ .then((res) => { if (!cancelled) { setData(res); setError(null); } })
+ .catch((err) => { if (!cancelled) setError(err instanceof Error ? err.message : "Failed"); })
+ .finally(() => { if (!cancelled) setLoading(false); });
+ return () => { cancelled = true; };
+ }, [videoId, expanded]);
+
+ return (
+
+
+
+ {expanded && loading &&
Loading chunking data...
}
+ {expanded && error &&
{error}
}
+
+ {expanded && data && (
+
+
+ {data.total_segments} segments
+ {data.total_moments} moments
+ Classification: {data.classification_source}
+ Chunk size: {data.synthesis_chunk_size}
+
+
+ {/* Topic Boundaries */}
+
+
Topic Boundaries (Stage 2)
+
+ {data.topic_boundaries.map((tb, i) => (
+
+ {tb.topic_label}
+
+ {tb.segment_count} segs · {tb.start_time.toFixed(0)}s-{tb.end_time.toFixed(0)}s
+
+
+ ))}
+
+
+
+ {/* Synthesis Groups */}
+
+
Synthesis Groups (Stage 5 Input)
+
+ {data.synthesis_groups.map((g) => (
+
+ {g.category}
+ {g.moment_count} moments
+ {g.exceeds_chunk_threshold && (
+
+ Will split into {g.chunks_needed} chunks
+
+ )}
+
+ ))}
+
+
+
+ )}
+
+ );
+}
+
+
export default function AdminPipeline() {
useDocumentTitle("Pipeline Management — Chrysopedia");
const [searchParams] = useSearchParams();
@@ -786,6 +872,12 @@ export default function AdminPipeline() {
const [creatorFilter, setCreatorFilter] = useState(null);
const [searchQuery, setSearchQuery] = useState("");
const [selectedIds, setSelectedIds] = useState>(new Set());
+ // Re-run stage modal state
+ const [rerunModalVideo, setRerunModalVideo] = useState(null);
+ const [rerunStageSelect, setRerunStageSelect] = useState("stage5_synthesis");
+ const [rerunPromptOverride, setRerunPromptOverride] = useState("");
+ // Stale pages state
+ const [stalePagesCount, setStalePagesCount] = useState(null);
const [bulkProgress, setBulkProgress] = useState(null);
const [bulkLog, setBulkLog] = useState([]);
const [changedIds, setChangedIds] = useState>(new Set());
@@ -869,6 +961,32 @@ export default function AdminPipeline() {
});
}, []);
+ // Load stale page count
+ useEffect(() => {
+ fetchStalePages()
+ .then((res) => setStalePagesCount(res.stale_pages))
+ .catch(() => { /* silently fail */ });
+ }, []);
+
+ const handleBulkResynth = async () => {
+ if (!confirm("Re-run stage 5 synthesis on all completed videos with the current prompt?")) return;
+ try {
+ const res = await bulkResynthesize();
+ setActionMessage({
+ id: "__bulk__",
+ text: `Bulk re-synthesize dispatched: ${res.dispatched}/${res.total} videos`,
+ ok: true,
+ });
+ setStalePagesCount(null); // will refresh
+ } catch (err) {
+ setActionMessage({
+ id: "__bulk__",
+ text: err instanceof Error ? err.message : "Bulk re-synth failed",
+ ok: false,
+ });
+ }
+ };
+
// Deep-link: auto-expand and scroll to ?video= on first load
useEffect(() => {
if (deepLinked.current || loading || videos.length === 0) return;
@@ -941,6 +1059,34 @@ export default function AdminPipeline() {
}
};
+ const handleRerunStage = async (videoId: string) => {
+ setActionLoading(videoId);
+ setActionMessage(null);
+ try {
+ const res = await rerunStage(
+ videoId,
+ rerunStageSelect,
+ rerunPromptOverride.trim() || undefined,
+ );
+ setActionMessage({
+ id: videoId,
+ text: `Stage re-run dispatched: ${res.stage}${res.prompt_override ? " (with prompt override)" : ""}`,
+ ok: true,
+ });
+ setRerunModalVideo(null);
+ setRerunPromptOverride("");
+ setTimeout(() => void load(), 2000);
+ } catch (err) {
+ setActionMessage({
+ id: videoId,
+ text: err instanceof Error ? err.message : "Stage re-run failed",
+ ok: false,
+ });
+ } finally {
+ setActionLoading(null);
+ }
+ };
+
const toggleExpand = (id: string) => {
setExpandedId((prev) => (prev === id ? null : id));
};
@@ -1033,6 +1179,15 @@ export default function AdminPipeline() {
+ {stalePagesCount !== null && stalePagesCount > 0 && (
+
+ )}
@@ -1251,6 +1414,7 @@ export default function AdminPipeline() {
Updated: {formatDate(video.updated_at)}
+
)}
@@ -1271,6 +1435,59 @@ export default function AdminPipeline() {
>
)}
+
+ {/* Re-run Stage Modal */}
+ {rerunModalVideo && (
+ setRerunModalVideo(null)}>
+
e.stopPropagation()}>
+
Re-run Single Stage
+
+ Re-run a specific pipeline stage without re-running predecessors.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ )}
);
}