feat: add pipeline iteration tooling — offline test harness, stage re-runs, chunking inspector

Drops prompt iteration cycles from 20-30 min to under 5 min by enabling
stage-isolated re-runs and offline prompt testing against exported fixtures.

Phase 1: Offline prompt test harness
- export_fixture.py: export stage 5 inputs from DB to reusable JSON fixtures
- test_harness.py: run synthesis offline with any prompt, no Docker needed
- promote subcommand: deploy winning prompts with backup and optional git commit

Phase 2: Classification data persistence
- Dual-write classification to PostgreSQL + Redis (fixes 24hr TTL data loss)
- Clean retrigger now clears Redis cache keys (fixes stale data bug)
- Alembic migration 011: classification_data JSONB column + stage_rerun enum

Phase 3: Stage-isolated re-run
- run_single_stage Celery task with prerequisite validation and prompt overrides
- _load_prompt supports per-video Redis overrides for testing custom prompts
- POST /admin/pipeline/rerun-stage/{video_id}/{stage_name} endpoint
- Frontend: Re-run Stage modal with stage selector and prompt override textarea

Phase 4: Chunking inspector
- GET /admin/pipeline/chunking/{video_id} returns topic boundaries,
  classifications, and synthesis group breakdowns
- Frontend: collapsible Chunking Inspector panel per video

Phase 5: Prompt deployment & stale data cleanup
- GET /admin/pipeline/stale-pages detects pages from older prompts
- POST /admin/pipeline/bulk-resynthesize re-runs a stage on all completed videos
- Frontend: stale pages indicator badge with one-click bulk re-synth

Phase 6: Automated iteration foundation
- Quality CLI --video-id flag auto-exports fixture from DB
- POST /admin/pipeline/optimize-prompt/{stage} dispatches optimization as Celery task

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
jlightner 2026-04-02 15:47:46 +00:00
parent 29f6e74b4f
commit f0d0b8ac1a
10 changed files with 2123 additions and 26 deletions

View file

@ -0,0 +1,35 @@
"""Add classification_data JSONB column to source_videos and stage_rerun trigger.
Persists stage 4 classification data in PostgreSQL alongside Redis cache,
eliminating the 24-hour TTL data loss risk. Also adds the 'stage_rerun'
trigger value for single-stage re-run support.
Revision ID: 011_classification_cache_and_stage_rerun
Revises: 010_add_pipeline_runs
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
revision = "011_classification_cache_and_stage_rerun"
down_revision = "010_add_pipeline_runs"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add classification_data column to source_videos
op.add_column(
"source_videos",
sa.Column("classification_data", JSONB, nullable=True),
)
# Add 'stage_rerun' to the pipeline_run_trigger enum
# PostgreSQL enums require ALTER TYPE to add values
op.execute("ALTER TYPE pipeline_run_trigger ADD VALUE IF NOT EXISTS 'stage_rerun'")
def downgrade() -> None:
op.drop_column("source_videos", "classification_data")
# Note: PostgreSQL does not support removing values from enums.
# The 'stage_rerun' value will remain but be unused after downgrade.

View file

@ -138,6 +138,7 @@ class SourceVideo(Base):
default=ProcessingStatus.not_started,
server_default="not_started",
)
classification_data: Mapped[list | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[datetime] = mapped_column(
default=_now, server_default=func.now()
)
@ -377,6 +378,7 @@ class PipelineRunTrigger(str, enum.Enum):
clean_reprocess = "clean_reprocess"
auto_ingest = "auto_ingest"
bulk = "bulk"
stage_rerun = "stage_rerun"
class PipelineRun(Base):

View file

@ -0,0 +1,306 @@
"""Export pipeline stage inputs for a video as a reusable JSON fixture.
Connects to the live database, queries KeyMoments and classification data,
and writes a fixture file that the test harness can consume offline.
Usage:
python -m pipeline.export_fixture --video-id <uuid> --output fixtures/video.json
python -m pipeline.export_fixture --video-id <uuid> # prints to stdout
python -m pipeline.export_fixture --list # list available videos
Requires: DATABASE_URL, REDIS_URL environment variables (or .env file).
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter
from pathlib import Path
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
def _log(tag: str, msg: str, level: str = "INFO") -> None:
"""Write structured log line to stderr."""
print(f"[EXPORT] [{level}] {tag}: {msg}", file=sys.stderr)
def _get_sync_session(database_url: str) -> Session:
"""Create a sync SQLAlchemy session from the database URL."""
url = database_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
engine = create_engine(url, pool_pre_ping=True)
factory = sessionmaker(bind=engine)
return factory()
def _list_videos(database_url: str) -> int:
"""List all videos with their processing status and moment counts."""
from models import Creator, KeyMoment, SourceVideo
session = _get_sync_session(database_url)
try:
videos = (
session.execute(
select(SourceVideo).order_by(SourceVideo.created_at.desc())
)
.scalars()
.all()
)
if not videos:
_log("LIST", "No videos found in database")
return 0
print(f"\n{'ID':<38s} {'Status':<14s} {'Moments':>7s} {'Creator':<20s} {'Filename'}", file=sys.stderr)
print(f"{''*38} {''*14} {''*7} {''*20} {''*40}", file=sys.stderr)
for video in videos:
moment_count = (
session.execute(
select(KeyMoment.id).where(KeyMoment.source_video_id == video.id)
)
.scalars()
.all()
)
creator = session.execute(
select(Creator).where(Creator.id == video.creator_id)
).scalar_one_or_none()
creator_name = creator.name if creator else "?"
print(
f"{str(video.id):<38s} {video.processing_status.value:<14s} "
f"{len(moment_count):>7d} {creator_name:<20s} {video.filename}",
file=sys.stderr,
)
print(f"\nTotal: {len(videos)} videos\n", file=sys.stderr)
return 0
finally:
session.close()
def export_fixture(
database_url: str,
redis_url: str,
video_id: str,
output_path: str | None = None,
) -> int:
"""Export stage 5 inputs for a video as a JSON fixture.
Returns exit code: 0 = success, 1 = error.
"""
from models import Creator, KeyMoment, SourceVideo
start = time.monotonic()
_log("CONNECT", "Connecting to database...")
session = _get_sync_session(database_url)
try:
# ── Load video ──────────────────────────────────────────────────
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one_or_none()
if video is None:
_log("ERROR", f"Video not found: {video_id}", level="ERROR")
return 1
creator = session.execute(
select(Creator).where(Creator.id == video.creator_id)
).scalar_one_or_none()
creator_name = creator.name if creator else "Unknown"
_log(
"VIDEO",
f"Found: {video.filename} by {creator_name} "
f"({video.duration_seconds or '?'}s, {video.content_type.value}, "
f"status={video.processing_status.value})",
)
# ── Load key moments ────────────────────────────────────────────
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
if not moments:
_log("ERROR", f"No key moments found for video_id={video_id}", level="ERROR")
_log("HINT", "Pipeline stages 2-3 must complete before export is possible", level="ERROR")
return 1
time_min = min(m.start_time for m in moments)
time_max = max(m.end_time for m in moments)
_log("MOMENTS", f"Loaded {len(moments)} key moments (time range: {time_min:.1f}s - {time_max:.1f}s)")
# ── Load classification data ────────────────────────────────────
classification_data: list[dict] = []
cls_source = "missing"
# Try Redis first
try:
import redis as redis_lib
r = redis_lib.Redis.from_url(redis_url)
key = f"chrysopedia:classification:{video_id}"
raw = r.get(key)
if raw is not None:
classification_data = json.loads(raw)
cls_source = "redis"
ttl = r.ttl(key)
_log("CLASSIFY", f"Source: redis ({len(classification_data)} entries, TTL={ttl}s)")
except Exception as exc:
_log("CLASSIFY", f"Redis unavailable: {exc}", level="WARN")
# Fallback: check SourceVideo.classification_data column (Phase 2 addition)
if not classification_data:
video_cls = getattr(video, "classification_data", None)
if video_cls:
classification_data = video_cls
cls_source = "postgresql"
_log("CLASSIFY", f"Source: postgresql ({len(classification_data)} entries)")
if not classification_data:
_log("CLASSIFY", "No classification data found in Redis or PostgreSQL", level="WARN")
_log("HINT", "Pipeline stage 4 must complete before classification data is available", level="WARN")
cls_source = "missing"
# Build classification lookup by moment_id
cls_by_moment_id = {c["moment_id"]: c for c in classification_data}
# Count moments without classification
unclassified = sum(1 for m in moments if str(m.id) not in cls_by_moment_id)
if unclassified > 0:
_log("CLASSIFY", f"WARNING: {unclassified}/{len(moments)} moments have no classification data", level="WARN")
# ── Build fixture ───────────────────────────────────────────────
fixture_moments = []
category_counts: Counter[str] = Counter()
for m in moments:
cls_info = cls_by_moment_id.get(str(m.id), {})
topic_category = cls_info.get("topic_category", "Uncategorized")
topic_tags = cls_info.get("topic_tags", [])
category_counts[topic_category] += 1
fixture_moments.append({
"moment_id": str(m.id),
"title": m.title,
"summary": m.summary,
"content_type": m.content_type.value,
"start_time": m.start_time,
"end_time": m.end_time,
"plugins": m.plugins or [],
"raw_transcript": m.raw_transcript or "",
# Classification data (stage 4 output)
"classification": {
"topic_category": topic_category,
"topic_tags": topic_tags,
},
# Compatibility fields for existing quality/scorer format
"transcript_excerpt": (m.raw_transcript or "")[:500],
"topic_tags": topic_tags,
"topic_category": topic_category,
})
fixture = {
"video_id": str(video.id),
"creator_name": creator_name,
"content_type": video.content_type.value,
"filename": video.filename,
"duration_seconds": video.duration_seconds,
"classification_source": cls_source,
"export_timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"moments": fixture_moments,
}
fixture_json = json.dumps(fixture, indent=2, ensure_ascii=False)
fixture_size_kb = len(fixture_json.encode("utf-8")) / 1024
# ── Output ──────────────────────────────────────────────────────
if output_path:
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
Path(output_path).write_text(fixture_json, encoding="utf-8")
_log(
"OUTPUT",
f"Wrote fixture: {output_path} ({fixture_size_kb:.1f} KB, "
f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
)
else:
# Print fixture JSON to stdout
print(fixture_json)
_log(
"OUTPUT",
f"Printed fixture to stdout ({fixture_size_kb:.1f} KB, "
f"{len(fixture_moments)} moments, {len(category_counts)} categories)",
)
# Category breakdown
for cat, count in category_counts.most_common():
_log("CATEGORY", f" {cat}: {count} moments")
elapsed = time.monotonic() - start
_log("DONE", f"Export completed in {elapsed:.1f}s")
return 0
except Exception as exc:
_log("ERROR", f"Export failed: {exc}", level="ERROR")
import traceback
traceback.print_exc(file=sys.stderr)
return 1
finally:
session.close()
def main() -> int:
parser = argparse.ArgumentParser(
prog="pipeline.export_fixture",
description="Export pipeline stage inputs for a video as a reusable JSON fixture",
)
parser.add_argument(
"--video-id",
type=str,
help="UUID of the video to export",
)
parser.add_argument(
"--output", "-o",
type=str,
default=None,
help="Output file path (default: print to stdout)",
)
parser.add_argument(
"--list",
action="store_true",
default=False,
help="List all videos with status and moment counts",
)
args = parser.parse_args()
# Load settings
from config import get_settings
settings = get_settings()
if args.list:
return _list_videos(settings.database_url)
if not args.video_id:
parser.error("--video-id is required (or use --list to see available videos)")
return export_fixture(
database_url=settings.database_url,
redis_url=settings.redis_url,
video_id=args.video_id,
output_path=args.output,
)
if __name__ == "__main__":
sys.exit(main())

View file

@ -236,12 +236,17 @@ def main() -> int:
default=2,
help="Variants generated per iteration (default: 2)",
)
opt_parser.add_argument(
opt_source = opt_parser.add_mutually_exclusive_group(required=True)
opt_source.add_argument(
"--file",
type=str,
required=True,
help="Path to moments JSON fixture file",
)
opt_source.add_argument(
"--video-id",
type=str,
help="Video UUID — exports fixture from DB automatically (requires DATABASE_URL, REDIS_URL)",
)
opt_parser.add_argument(
"--output-dir",
type=str,
@ -360,10 +365,34 @@ def _run_optimize(args: argparse.Namespace) -> int:
)
return 1
# Validate fixture file exists
fixture = Path(args.file)
# Resolve fixture: either from --file or auto-export from --video-id
fixture_path: str
if args.file:
fixture_path = args.file
else:
# Auto-export from database
print(f"\n[OPTIMIZE] Exporting fixture from video_id={args.video_id}...", file=sys.stderr)
import tempfile
from pipeline.export_fixture import export_fixture
settings = get_settings()
tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="optimize_fixture_", delete=False)
tmp.close()
exit_code = export_fixture(
database_url=settings.database_url,
redis_url=settings.redis_url,
video_id=args.video_id,
output_path=tmp.name,
)
if exit_code != 0:
print(f"Error: fixture export failed (exit code {exit_code})", file=sys.stderr)
return 1
fixture_path = tmp.name
print(f"[OPTIMIZE] Fixture exported to: {fixture_path}", file=sys.stderr)
fixture = Path(fixture_path)
if not fixture.exists():
print(f"Error: fixture file not found: {args.file}", file=sys.stderr)
print(f"Error: fixture file not found: {fixture_path}", file=sys.stderr)
return 1
# Ensure output dir
@ -375,7 +404,7 @@ def _run_optimize(args: argparse.Namespace) -> int:
loop = OptimizationLoop(
client=client,
stage=args.stage,
fixture_path=args.file,
fixture_path=fixture_path,
iterations=args.iterations,
variants_per_iter=args.variants_per_iter,
output_dir=args.output_dir,
@ -407,7 +436,7 @@ def _run_optimize(args: argparse.Namespace) -> int:
stage=args.stage,
iterations=args.iterations,
variants_per_iter=args.variants_per_iter,
fixture_path=args.file,
fixture_path=fixture_path,
)
print(f" Results written to: {json_path}")
except OSError as exc:

View file

@ -241,11 +241,34 @@ def _get_sync_session() -> Session:
return _SessionLocal()
def _load_prompt(template_name: str) -> str:
def _load_prompt(template_name: str, video_id: str | None = None) -> str:
"""Read a prompt template from the prompts directory.
Raises FileNotFoundError if the template does not exist.
If ``video_id`` is provided, checks Redis for a per-video prompt override
(key: ``chrysopedia:prompt_override:{video_id}:{template_name}``) before
falling back to the on-disk template. Overrides are set by
``run_single_stage`` for single-stage re-runs with custom prompts.
Raises FileNotFoundError if no override exists and the template is missing.
"""
# Check for per-video prompt override in Redis
if video_id:
try:
import redis
settings = get_settings()
r = redis.Redis.from_url(settings.redis_url)
override_key = f"chrysopedia:prompt_override:{video_id}:{template_name}"
override = r.get(override_key)
if override:
prompt_text = override.decode("utf-8")
logger.info(
"[PROMPT] Using override from Redis: video_id=%s, template=%s (%d chars)",
video_id, template_name, len(prompt_text),
)
return prompt_text
except Exception as exc:
logger.warning("[PROMPT] Redis override check failed: %s", exc)
settings = get_settings()
path = Path(settings.prompts_path) / template_name
if not path.exists():
@ -398,7 +421,7 @@ def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str:
transcript_text = "\n".join(transcript_lines)
# Load prompt and call LLM
system_prompt = _load_prompt("stage2_segmentation.txt")
system_prompt = _load_prompt("stage2_segmentation.txt", video_id=video_id)
user_prompt = f"<transcript>\n{transcript_text}\n</transcript>"
llm = _get_llm_client()
@ -477,7 +500,7 @@ def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str:
label = seg.topic_label or "unlabeled"
groups[label].append(seg)
system_prompt = _load_prompt("stage3_extraction.txt")
system_prompt = _load_prompt("stage3_extraction.txt", video_id=video_id)
llm = _get_llm_client()
model_override, modality = _get_stage_config(3)
hard_limit = get_settings().llm_max_tokens_hard_limit
@ -650,7 +673,7 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str
tags_data = _load_canonical_tags()
taxonomy_text = _format_taxonomy_for_prompt(tags_data)
system_prompt = _load_prompt("stage4_classification.txt")
system_prompt = _load_prompt("stage4_classification.txt", video_id=video_id)
llm = _get_llm_client()
model_override, modality = _get_stage_config(4)
hard_limit = get_settings().llm_max_tokens_hard_limit
@ -716,26 +739,108 @@ def stage4_classification(self, video_id: str, run_id: str | None = None) -> str
def _store_classification_data(video_id: str, data: list[dict]) -> None:
"""Store classification data in Redis for cross-stage communication."""
"""Store classification data in Redis (cache) and PostgreSQL (durable).
Dual-write ensures classification data survives Redis TTL expiry or flush.
Redis serves as the fast-path cache; PostgreSQL is the durable fallback.
"""
import redis
settings = get_settings()
r = redis.Redis.from_url(settings.redis_url)
key = f"chrysopedia:classification:{video_id}"
r.set(key, json.dumps(data), ex=86400) # Expire after 24 hours
# Redis: fast cache with 7-day TTL
try:
r = redis.Redis.from_url(settings.redis_url)
key = f"chrysopedia:classification:{video_id}"
r.set(key, json.dumps(data), ex=604800) # 7 days
logger.info(
"[CLASSIFY-STORE] Redis write: video_id=%s, %d entries, ttl=7d",
video_id, len(data),
)
except Exception as exc:
logger.warning(
"[CLASSIFY-STORE] Redis write failed for video_id=%s: %s", video_id, exc,
)
# PostgreSQL: durable storage on SourceVideo.classification_data
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one_or_none()
if video:
video.classification_data = data
session.commit()
logger.info(
"[CLASSIFY-STORE] PostgreSQL write: video_id=%s, %d entries",
video_id, len(data),
)
else:
logger.warning(
"[CLASSIFY-STORE] Video not found for PostgreSQL write: %s", video_id,
)
except Exception as exc:
session.rollback()
logger.warning(
"[CLASSIFY-STORE] PostgreSQL write failed for video_id=%s: %s", video_id, exc,
)
finally:
session.close()
def _load_classification_data(video_id: str) -> list[dict]:
"""Load classification data from Redis."""
"""Load classification data from Redis (fast path) or PostgreSQL (fallback).
Tries Redis first. If the key has expired or Redis is unavailable, falls
back to the durable SourceVideo.classification_data column.
"""
import redis
settings = get_settings()
r = redis.Redis.from_url(settings.redis_url)
key = f"chrysopedia:classification:{video_id}"
raw = r.get(key)
if raw is None:
return []
return json.loads(raw)
# Try Redis first (fast path)
try:
r = redis.Redis.from_url(settings.redis_url)
key = f"chrysopedia:classification:{video_id}"
raw = r.get(key)
if raw is not None:
data = json.loads(raw)
logger.info(
"[CLASSIFY-LOAD] Source: redis, video_id=%s, %d entries",
video_id, len(data),
)
return data
except Exception as exc:
logger.warning(
"[CLASSIFY-LOAD] Redis unavailable for video_id=%s: %s", video_id, exc,
)
# Fallback to PostgreSQL
logger.info("[CLASSIFY-LOAD] Redis miss, falling back to PostgreSQL for video_id=%s", video_id)
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one_or_none()
if video and video.classification_data:
data = video.classification_data
logger.info(
"[CLASSIFY-LOAD] Source: postgresql, video_id=%s, %d entries",
video_id, len(data),
)
return data
except Exception as exc:
logger.warning(
"[CLASSIFY-LOAD] PostgreSQL fallback failed for video_id=%s: %s", video_id, exc,
)
finally:
session.close()
logger.warning(
"[CLASSIFY-LOAD] No classification data found in Redis or PostgreSQL for video_id=%s",
video_id,
)
return []
@ -969,7 +1074,7 @@ def _merge_pages_by_slug(
indent=2, ensure_ascii=False,
)
merge_system_prompt = _load_prompt("stage5_merge.txt")
merge_system_prompt = _load_prompt("stage5_merge.txt", video_id=video_id)
merge_user_prompt = f"<creator>{creator_name}</creator>\n<pages>\n{pages_json}\n</pages>"
max_tokens = estimate_max_tokens(
@ -1080,7 +1185,7 @@ def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
category = cls_info.get("topic_category", "Uncategorized").strip().title()
groups[category].append((moment, cls_info))
system_prompt = _load_prompt("stage5_synthesis.txt")
system_prompt = _load_prompt("stage5_synthesis.txt", video_id=video_id)
llm = _get_llm_client()
model_override, modality = _get_stage_config(5)
hard_limit = settings.llm_max_tokens_hard_limit
@ -1784,3 +1889,214 @@ def run_pipeline(video_id: str, trigger: str = "manual") -> str:
raise
return video_id
# ── Single-Stage Re-Run ─────────────────────────────────────────────────────
@celery_app.task
def run_single_stage(
video_id: str,
stage_name: str,
trigger: str = "stage_rerun",
prompt_override: str | None = None,
) -> str:
"""Re-run a single pipeline stage without running predecessors.
Designed for fast prompt iteration especially stage 5 synthesis.
Bypasses the processing_status==complete guard, creates a proper
PipelineRun record, and restores status on completion.
If ``prompt_override`` is provided, it is stored in Redis as a
per-video override that ``_load_prompt`` reads before falling back
to the on-disk template. The override is cleaned up after the stage runs.
Returns the video_id.
"""
import redis as redis_lib
logger.info(
"[RERUN] Starting single-stage re-run: video_id=%s, stage=%s, trigger=%s",
video_id, stage_name, trigger,
)
# Validate stage name
if stage_name not in _PIPELINE_STAGES:
raise ValueError(
f"[RERUN] Invalid stage '{stage_name}'. "
f"Valid stages: {_PIPELINE_STAGES}"
)
# Validate video exists
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one_or_none()
if video is None:
raise ValueError(f"[RERUN] Video not found: {video_id}")
original_status = video.processing_status
finally:
session.close()
# Validate prerequisites for the requested stage
prereq_ok, prereq_msg = _check_stage_prerequisites(video_id, stage_name)
if not prereq_ok:
logger.error("[RERUN] Prerequisite check failed: %s", prereq_msg)
raise ValueError(f"[RERUN] Prerequisites not met: {prereq_msg}")
logger.info("[RERUN] Prerequisite check passed: %s", prereq_msg)
# Store prompt override in Redis if provided
override_key = None
if prompt_override:
settings = get_settings()
try:
r = redis_lib.Redis.from_url(settings.redis_url)
# Map stage name to its prompt template
stage_prompt_map = {
"stage2_segmentation": "stage2_segmentation.txt",
"stage3_extraction": "stage3_extraction.txt",
"stage4_classification": "stage4_classification.txt",
"stage5_synthesis": "stage5_synthesis.txt",
}
template = stage_prompt_map.get(stage_name)
if template:
override_key = f"chrysopedia:prompt_override:{video_id}:{template}"
r.set(override_key, prompt_override, ex=3600) # 1-hour TTL
logger.info(
"[RERUN] Prompt override stored: key=%s (%d chars, first 100: %s)",
override_key, len(prompt_override), prompt_override[:100],
)
except Exception as exc:
logger.warning("[RERUN] Failed to store prompt override: %s", exc)
# Snapshot prior pages (needed for stage 5 page matching)
if stage_name in ("stage5_synthesis",):
_snapshot_prior_pages(video_id)
# Create pipeline run record
run_id = _create_run(video_id, trigger)
logger.info("[RERUN] Created run_id=%s", run_id)
# Temporarily set status to processing
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.processing
session.commit()
finally:
session.close()
# Run the single stage
start = time.monotonic()
try:
task_func = _STAGE_TASKS[stage_name]
task_func(video_id, run_id=run_id)
elapsed = time.monotonic() - start
logger.info(
"[RERUN] Stage %s completed: %.1fs, video_id=%s",
stage_name, elapsed, video_id,
)
_finish_run(run_id, "complete")
# Restore status to complete
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.complete
session.commit()
logger.info("[RERUN] Status restored to complete")
finally:
session.close()
except Exception as exc:
elapsed = time.monotonic() - start
logger.error(
"[RERUN] Stage %s FAILED after %.1fs: %s",
stage_name, elapsed, exc,
)
_set_error_status(video_id, stage_name, exc)
_finish_run(run_id, "error", error_stage=stage_name)
raise
finally:
# Clean up prompt override from Redis
if override_key:
try:
settings = get_settings()
r = redis_lib.Redis.from_url(settings.redis_url)
r.delete(override_key)
logger.info("[RERUN] Prompt override cleaned up: %s", override_key)
except Exception as exc:
logger.warning("[RERUN] Failed to clean up prompt override: %s", exc)
return video_id
def _check_stage_prerequisites(video_id: str, stage_name: str) -> tuple[bool, str]:
"""Validate that prerequisite data exists for a stage re-run.
Returns (ok, message) where message describes what was found or missing.
"""
session = _get_sync_session()
try:
if stage_name == "stage2_segmentation":
# Needs transcript segments
count = session.execute(
select(func.count(TranscriptSegment.id))
.where(TranscriptSegment.source_video_id == video_id)
).scalar() or 0
if count == 0:
return False, "No transcript segments found"
return True, f"transcript_segments={count}"
if stage_name == "stage3_extraction":
# Needs transcript segments with topic_labels
count = session.execute(
select(func.count(TranscriptSegment.id))
.where(
TranscriptSegment.source_video_id == video_id,
TranscriptSegment.topic_label.isnot(None),
)
).scalar() or 0
if count == 0:
return False, "No labeled transcript segments (stage 2 must complete first)"
return True, f"labeled_segments={count}"
if stage_name == "stage4_classification":
# Needs key moments
count = session.execute(
select(func.count(KeyMoment.id))
.where(KeyMoment.source_video_id == video_id)
).scalar() or 0
if count == 0:
return False, "No key moments found (stage 3 must complete first)"
return True, f"key_moments={count}"
if stage_name == "stage5_synthesis":
# Needs key moments + classification data
km_count = session.execute(
select(func.count(KeyMoment.id))
.where(KeyMoment.source_video_id == video_id)
).scalar() or 0
if km_count == 0:
return False, "No key moments found (stages 2-3 must complete first)"
cls_data = _load_classification_data(video_id)
cls_source = "redis+pg"
if not cls_data:
return False, f"No classification data found (stage 4 must complete first), key_moments={km_count}"
return True, f"key_moments={km_count}, classification_entries={len(cls_data)}"
if stage_name == "stage6_embed_and_index":
return True, "stage 6 is non-blocking and always runs"
return False, f"Unknown stage: {stage_name}"
finally:
session.close()

View file

@ -0,0 +1,481 @@
"""Offline prompt test harness for Chrysopedia synthesis.
Loads a fixture JSON (exported by export_fixture.py) and a prompt file,
calls the LLM, and outputs the synthesized result. No Docker, no database,
no Redis, no Celery just prompt + fixture + LLM endpoint.
Usage:
python -m pipeline.test_harness \\
--fixture fixtures/real_video_xyz.json \\
--prompt prompts/stage5_synthesis.txt \\
--output /tmp/result.json
# Run all categories in a fixture:
python -m pipeline.test_harness --fixture fixtures/video.json
# Run a specific category only:
python -m pipeline.test_harness --fixture fixtures/video.json --category "Sound Design"
Exit codes: 0=success, 1=LLM error, 2=parse error, 3=fixture error
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import NamedTuple
from pydantic import ValidationError
from config import get_settings
from pipeline.llm_client import LLMClient, estimate_max_tokens
from pipeline.schemas import SynthesisResult
# ── Lightweight stand-in for KeyMoment ORM model ───────────────────────────
class _MockContentType:
"""Mimics KeyMomentContentType enum with a .value property."""
def __init__(self, value: str) -> None:
self.value = value
class MockKeyMoment(NamedTuple):
"""Lightweight stand-in for the ORM KeyMoment.
Has the same attributes that _build_moments_text() accesses:
title, summary, content_type, start_time, end_time, plugins, raw_transcript.
"""
title: str
summary: str
content_type: object # _MockContentType
start_time: float
end_time: float
plugins: list[str]
raw_transcript: str
def _log(tag: str, msg: str, level: str = "INFO") -> None:
"""Write structured log line to stderr."""
print(f"[HARNESS] [{level}] {tag}: {msg}", file=sys.stderr)
# ── Moment text builder (mirrors stages.py _build_moments_text) ────────────
def build_moments_text(
moment_group: list[tuple[MockKeyMoment, dict]],
category: str,
) -> tuple[str, set[str]]:
"""Build the moments prompt text — matches _build_moments_text in stages.py."""
moments_lines = []
all_tags: set[str] = set()
for i, (m, cls_info) in enumerate(moment_group):
tags = cls_info.get("topic_tags", [])
all_tags.update(tags)
moments_lines.append(
f"[{i}] Title: {m.title}\n"
f" Summary: {m.summary}\n"
f" Content type: {m.content_type.value}\n"
f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
f" Category: {category}\n"
f" Tags: {', '.join(tags) if tags else 'none'}\n"
f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
)
return "\n\n".join(moments_lines), all_tags
# ── Fixture loading ────────────────────────────────────────────────────────
@dataclass
class FixtureData:
"""Parsed fixture with moments grouped by category."""
creator_name: str
video_id: str
content_type: str
filename: str
# Groups: category -> list of (MockKeyMoment, cls_info_dict)
groups: dict[str, list[tuple[MockKeyMoment, dict]]]
total_moments: int
def load_fixture(path: str) -> FixtureData:
"""Load and parse a fixture JSON file into grouped moments."""
fixture_path = Path(path)
if not fixture_path.exists():
raise FileNotFoundError(f"Fixture not found: {path}")
raw = fixture_path.read_text(encoding="utf-8")
size_kb = len(raw.encode("utf-8")) / 1024
data = json.loads(raw)
moments_raw = data.get("moments", [])
if not moments_raw:
raise ValueError(f"Fixture has no moments: {path}")
_log("FIXTURE", f"Loading: {path} ({size_kb:.1f} KB, {len(moments_raw)} moments)")
# Build MockKeyMoment objects and group by category
groups: dict[str, list[tuple[MockKeyMoment, dict]]] = defaultdict(list)
for m in moments_raw:
cls = m.get("classification", {})
category = cls.get("topic_category", m.get("topic_category", "Uncategorized"))
tags = cls.get("topic_tags", m.get("topic_tags", []))
mock = MockKeyMoment(
title=m.get("title", m.get("summary", "")[:80]),
summary=m.get("summary", ""),
content_type=_MockContentType(m.get("content_type", "technique")),
start_time=m.get("start_time", 0.0),
end_time=m.get("end_time", 0.0),
plugins=m.get("plugins", []),
raw_transcript=m.get("raw_transcript", m.get("transcript_excerpt", "")),
)
cls_info = {"topic_category": category, "topic_tags": tags}
groups[category].append((mock, cls_info))
# Log breakdown
cat_counts = {cat: len(moms) for cat, moms in groups.items()}
counts = list(cat_counts.values())
_log(
"FIXTURE",
f"Breakdown: {len(groups)} categories, "
f"moments per category: min={min(counts)}, max={max(counts)}, "
f"avg={sum(counts)/len(counts):.1f}",
)
for cat, count in sorted(cat_counts.items(), key=lambda x: -x[1]):
_log("FIXTURE", f" {cat}: {count} moments")
return FixtureData(
creator_name=data.get("creator_name", "Unknown"),
video_id=data.get("video_id", "unknown"),
content_type=data.get("content_type", "tutorial"),
filename=data.get("filename", "unknown"),
groups=dict(groups),
total_moments=len(moments_raw),
)
# ── Synthesis runner ───────────────────────────────────────────────────────
def run_synthesis(
fixture: FixtureData,
prompt_path: str,
category_filter: str | None = None,
model_override: str | None = None,
modality: str | None = None,
) -> tuple[list[dict], int]:
"""Run synthesis on fixture data, returns (pages, exit_code).
Returns all synthesized pages as dicts plus an exit code.
"""
# Load prompt
prompt_file = Path(prompt_path)
if not prompt_file.exists():
_log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR")
return [], 3
system_prompt = prompt_file.read_text(encoding="utf-8")
_log("PROMPT", f"Loading: {prompt_path} ({len(system_prompt)} chars)")
# Setup LLM
settings = get_settings()
llm = LLMClient(settings)
stage_model = model_override or settings.llm_stage5_model or settings.llm_model
stage_modality = modality or settings.llm_stage5_modality or "thinking"
hard_limit = settings.llm_max_tokens_hard_limit
_log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}")
# Filter categories if requested
categories = fixture.groups
if category_filter:
if category_filter not in categories:
_log("ERROR", f"Category '{category_filter}' not found. Available: {list(categories.keys())}", level="ERROR")
return [], 3
categories = {category_filter: categories[category_filter]}
all_pages: list[dict] = []
total_prompt_tokens = 0
total_completion_tokens = 0
total_duration_ms = 0
exit_code = 0
for cat_idx, (category, moment_group) in enumerate(categories.items(), 1):
_log("SYNTH", f"Category {cat_idx}/{len(categories)}: '{category}' ({len(moment_group)} moments)")
# Build user prompt (same format as stages.py _synthesize_chunk)
moments_text, all_tags = build_moments_text(moment_group, category)
user_prompt = f"<creator>{fixture.creator_name}</creator>\n<moments>\n{moments_text}\n</moments>"
estimated_tokens = estimate_max_tokens(
system_prompt, user_prompt,
stage="stage5_synthesis",
hard_limit=hard_limit,
)
_log(
"SYNTH",
f" Building prompt: {len(moment_group)} moments, "
f"max_tokens={estimated_tokens}, tags={sorted(all_tags)[:5]}{'...' if len(all_tags) > 5 else ''}",
)
# Call LLM
call_start = time.monotonic()
_log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}")
try:
raw = llm.complete(
system_prompt,
user_prompt,
response_model=SynthesisResult,
modality=stage_modality,
model_override=stage_model,
max_tokens=estimated_tokens,
)
except Exception as exc:
_log("ERROR", f" LLM call failed: {exc}", level="ERROR")
exit_code = 1
continue
call_duration_ms = int((time.monotonic() - call_start) * 1000)
prompt_tokens = getattr(raw, "prompt_tokens", None) or 0
completion_tokens = getattr(raw, "completion_tokens", None) or 0
finish_reason = getattr(raw, "finish_reason", "unknown")
total_prompt_tokens += prompt_tokens
total_completion_tokens += completion_tokens
total_duration_ms += call_duration_ms
_log(
"LLM",
f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, "
f"{call_duration_ms}ms, finish_reason={finish_reason}",
)
if finish_reason == "length":
_log(
"WARN",
" finish_reason=length — output likely truncated! "
"Consider reducing fixture size or increasing max_tokens.",
level="WARN",
)
# Parse response
try:
result = SynthesisResult.model_validate_json(str(raw))
except (ValidationError, json.JSONDecodeError) as exc:
_log("ERROR", f" Parse failed: {exc}", level="ERROR")
_log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR")
exit_code = 2
continue
# Log per-page summary
_log("SYNTH", f" Parsed: {len(result.pages)} pages synthesized")
total_words = 0
for page in result.pages:
sections = page.body_sections or {}
word_count = sum(len(str(v).split()) for v in sections.values())
total_words += word_count
_log(
"PAGE",
f" '{page.title}' ({page.slug}): "
f"{len(sections)} sections, {word_count} words, "
f"{len(page.moment_indices)} moments linked, "
f"quality={page.source_quality}",
)
all_pages.append(page.model_dump())
# Summary
_log("SUMMARY", f"Total: {len(all_pages)} pages across {len(categories)} categories")
_log("SUMMARY", f"Tokens: {total_prompt_tokens} prompt + {total_completion_tokens} completion = {total_prompt_tokens + total_completion_tokens} total")
_log("SUMMARY", f"Duration: {total_duration_ms}ms ({total_duration_ms / 1000:.1f}s)")
return all_pages, exit_code
# ── Promote: deploy a prompt to production ─────────────────────────────────
_STAGE_PROMPT_MAP = {
2: "stage2_segmentation.txt",
3: "stage3_extraction.txt",
4: "stage4_classification.txt",
5: "stage5_synthesis.txt",
}
def promote_prompt(prompt_path: str, stage: int, reason: str, commit: bool = False) -> int:
"""Copy a winning prompt to the canonical path and create a backup.
The worker reads prompts from disk at runtime no restart needed.
"""
import hashlib
import shutil
if stage not in _STAGE_PROMPT_MAP:
_log("ERROR", f"Invalid stage {stage}. Valid: {sorted(_STAGE_PROMPT_MAP)}", level="ERROR")
return 1
settings = get_settings()
template_name = _STAGE_PROMPT_MAP[stage]
canonical = Path(settings.prompts_path) / template_name
source = Path(prompt_path)
if not source.exists():
_log("ERROR", f"Source prompt not found: {prompt_path}", level="ERROR")
return 1
new_prompt = source.read_text(encoding="utf-8")
new_hash = hashlib.sha256(new_prompt.encode()).hexdigest()[:12]
# Backup current prompt
old_prompt = ""
old_hash = "none"
if canonical.exists():
old_prompt = canonical.read_text(encoding="utf-8")
old_hash = hashlib.sha256(old_prompt.encode()).hexdigest()[:12]
if old_prompt.strip() == new_prompt.strip():
_log("PROMOTE", "No change — new prompt is identical to current prompt")
return 0
archive_dir = Path(settings.prompts_path) / "archive"
archive_dir.mkdir(parents=True, exist_ok=True)
ts = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
backup = archive_dir / f"{template_name.replace('.txt', '')}_{ts}.txt"
shutil.copy2(canonical, backup)
_log("PROMOTE", f"Backed up current prompt: {old_hash} -> {backup}")
# Write new prompt
canonical.write_text(new_prompt, encoding="utf-8")
old_lines = old_prompt.strip().splitlines()
new_lines = new_prompt.strip().splitlines()
_log("PROMOTE", f"Installed new prompt: {new_hash} ({len(new_prompt)} chars, {len(new_lines)} lines)")
_log("PROMOTE", f"Previous: {old_hash} ({len(old_prompt)} chars, {len(old_lines)} lines)")
_log("PROMOTE", f"Reason: {reason}")
_log("PROMOTE", "Worker reads prompts from disk at runtime — no restart needed")
if commit:
import subprocess
try:
subprocess.run(
["git", "add", str(canonical)],
cwd=str(canonical.parent.parent),
check=True, capture_output=True,
)
msg = f"prompt: promote stage{stage}{reason}"
subprocess.run(
["git", "commit", "-m", msg],
cwd=str(canonical.parent.parent),
check=True, capture_output=True,
)
_log("PROMOTE", f"Git commit created: {msg}")
except subprocess.CalledProcessError as exc:
_log("PROMOTE", f"Git commit failed: {exc}", level="WARN")
return 0
# ── CLI ────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(
prog="pipeline.test_harness",
description="Offline prompt test harness for Chrysopedia synthesis",
)
sub = parser.add_subparsers(dest="command")
# -- run subcommand (default behavior) --
run_parser = sub.add_parser("run", help="Run synthesis against a fixture")
run_parser.add_argument("--fixture", "-f", type=str, required=True, help="Fixture JSON file")
run_parser.add_argument("--prompt", "-p", type=str, default=None, help="Prompt file (default: stage5_synthesis.txt)")
run_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path")
run_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category")
run_parser.add_argument("--model", type=str, default=None, help="Override LLM model")
run_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"])
# -- promote subcommand --
promo_parser = sub.add_parser("promote", help="Deploy a winning prompt to production")
promo_parser.add_argument("--prompt", "-p", type=str, required=True, help="Path to the winning prompt file")
promo_parser.add_argument("--stage", "-s", type=int, default=5, help="Stage number (default: 5)")
promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted")
promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit")
args = parser.parse_args()
# If no subcommand, check for --fixture for backward compat
if args.command is None:
# Support running without subcommand for backward compat
parser.print_help()
return 1
if args.command == "promote":
return promote_prompt(args.prompt, args.stage, args.reason, args.commit)
# -- run command --
prompt_path = args.prompt
if prompt_path is None:
settings = get_settings()
prompt_path = str(Path(settings.prompts_path) / "stage5_synthesis.txt")
overall_start = time.monotonic()
try:
fixture = load_fixture(args.fixture)
except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
_log("ERROR", f"Fixture error: {exc}", level="ERROR")
return 3
pages, exit_code = run_synthesis(
fixture=fixture,
prompt_path=prompt_path,
category_filter=args.category,
model_override=args.model,
modality=args.modality,
)
if not pages and exit_code != 0:
return exit_code
output = {
"fixture_source": args.fixture,
"prompt_source": prompt_path,
"creator_name": fixture.creator_name,
"video_id": fixture.video_id,
"category_filter": args.category,
"pages": pages,
"metadata": {
"page_count": len(pages),
"total_words": sum(
sum(len(str(v).split()) for v in p.get("body_sections", {}).values())
for p in pages
),
"elapsed_seconds": round(time.monotonic() - overall_start, 1),
},
}
output_json = json.dumps(output, indent=2, ensure_ascii=False)
if args.output:
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
Path(args.output).write_text(output_json, encoding="utf-8")
_log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)")
else:
print(output_json)
_log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)")
total_elapsed = time.monotonic() - overall_start
_log("DONE", f"Completed in {total_elapsed:.1f}s (exit_code={exit_code})")
return exit_code
if __name__ == "__main__":
sys.exit(main())

View file

@ -251,6 +251,23 @@ async def clean_retrigger_pipeline(
logger.warning("Qdrant cleanup failed for video_id=%s: %s", video_id, exc)
deleted_counts["qdrant_vectors"] = f"skipped: {exc}"
# Clear Redis classification/prior-pages cache to prevent stale data
try:
import redis as redis_lib
settings = get_settings()
r = redis_lib.Redis.from_url(settings.redis_url)
r.delete(f"chrysopedia:classification:{video_id}")
r.delete(f"chrysopedia:prior_pages:{video_id}")
deleted_counts["redis_cache"] = "cleared"
logger.info("Redis cache cleared for video_id=%s", video_id)
except Exception as exc:
logger.warning("Redis cache cleanup failed for video_id=%s: %s", video_id, exc)
deleted_counts["redis_cache"] = f"skipped: {exc}"
# Clear durable classification_data column too
video.classification_data = None
await db.commit()
# Now trigger the pipeline
from pipeline.stages import run_pipeline
try:
@ -270,6 +287,210 @@ async def clean_retrigger_pipeline(
}
# ── Admin: Re-run Single Stage ──────────────────────────────────────────────
@router.post("/admin/pipeline/rerun-stage/{video_id}/{stage_name}")
async def rerun_stage(
video_id: str,
stage_name: str,
prompt_override: str | None = None,
db: AsyncSession = Depends(get_session),
):
"""Re-run a single pipeline stage without running predecessors.
Designed for fast prompt iteration especially stage 5 synthesis.
Optionally accepts a prompt_override string that temporarily replaces
the on-disk prompt template for this run only.
Valid stage names: stage2_segmentation, stage3_extraction,
stage4_classification, stage5_synthesis, stage6_embed_and_index.
"""
from pipeline.stages import _PIPELINE_STAGES, run_single_stage
# Validate stage name
if stage_name not in _PIPELINE_STAGES:
raise HTTPException(
status_code=400,
detail=f"Invalid stage '{stage_name}'. Valid: {_PIPELINE_STAGES}",
)
# Validate video exists
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
# Dispatch single-stage re-run
try:
run_single_stage.delay(
str(video.id),
stage_name,
trigger="stage_rerun",
prompt_override=prompt_override,
)
logger.info(
"Single-stage re-run dispatched: video_id=%s, stage=%s, prompt_override=%s",
video_id, stage_name, "yes" if prompt_override else "no",
)
except Exception as exc:
logger.warning(
"Failed to dispatch single-stage re-run for video_id=%s: %s", video_id, exc,
)
raise HTTPException(
status_code=503,
detail="Stage re-run dispatch failed — Celery/Redis may be unavailable",
) from exc
return {
"status": "stage_rerun_dispatched",
"video_id": str(video.id),
"stage": stage_name,
"prompt_override": bool(prompt_override),
}
# ── Admin: Chunking Inspector ───────────────────────────────────────────────
@router.get("/admin/pipeline/chunking/{video_id}")
async def get_chunking_data(
video_id: str,
db: AsyncSession = Depends(get_session),
):
"""Return chunking/grouping data for a video — topic boundaries, classifications,
and synthesis group breakdowns.
Helps diagnose whether bad synthesis output is a prompt problem or a data shape problem.
"""
from config import get_settings
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
settings = get_settings()
# 1. Topic boundaries (stage 2 output): segments grouped by topic_label
segments = (
await db.execute(
select(TranscriptSegment)
.where(TranscriptSegment.source_video_id == video_id)
.order_by(TranscriptSegment.segment_index)
)
).scalars().all()
topic_boundaries: list[dict] = []
current_label = None
current_group: list[dict] = []
for seg in segments:
label = seg.topic_label or "unlabeled"
if label != current_label:
if current_group:
topic_boundaries.append({
"topic_label": current_label,
"segment_count": len(current_group),
"start_time": current_group[0]["start_time"],
"end_time": current_group[-1]["end_time"],
"start_index": current_group[0]["segment_index"],
"end_index": current_group[-1]["segment_index"],
})
current_label = label
current_group = []
current_group.append({
"start_time": seg.start_time,
"end_time": seg.end_time,
"segment_index": seg.segment_index,
})
if current_group:
topic_boundaries.append({
"topic_label": current_label,
"segment_count": len(current_group),
"start_time": current_group[0]["start_time"],
"end_time": current_group[-1]["end_time"],
"start_index": current_group[0]["segment_index"],
"end_index": current_group[-1]["segment_index"],
})
# 2. Key moments (stage 3 output)
moments = (
await db.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
).scalars().all()
key_moments = [
{
"id": str(m.id),
"title": m.title,
"content_type": m.content_type.value,
"start_time": m.start_time,
"end_time": m.end_time,
"plugins": m.plugins or [],
"technique_page_id": str(m.technique_page_id) if m.technique_page_id else None,
}
for m in moments
]
# 3. Classification data (stage 4 output)
classification: list[dict] = []
cls_source = "missing"
if video.classification_data:
classification = video.classification_data
cls_source = "postgresql"
else:
try:
import redis as redis_lib
r = redis_lib.Redis.from_url(settings.redis_url)
raw = r.get(f"chrysopedia:classification:{video_id}")
if raw:
classification = __import__("json").loads(raw)
cls_source = "redis"
except Exception:
pass
# 4. Synthesis groups (how stage 5 would group moments)
cls_by_moment_id = {c["moment_id"]: c for c in classification}
synthesis_groups: dict[str, dict] = {}
for m in moments:
cls_info = cls_by_moment_id.get(str(m.id), {})
category = cls_info.get("topic_category", "Uncategorized").strip().title()
if category not in synthesis_groups:
synthesis_groups[category] = {
"category": category,
"moment_count": 0,
"moment_ids": [],
"exceeds_chunk_threshold": False,
}
synthesis_groups[category]["moment_count"] += 1
synthesis_groups[category]["moment_ids"].append(str(m.id))
chunk_size = settings.synthesis_chunk_size
for group in synthesis_groups.values():
group["exceeds_chunk_threshold"] = group["moment_count"] > chunk_size
group["chunks_needed"] = max(1, -(-group["moment_count"] // chunk_size)) # ceil division
return {
"video_id": str(video.id),
"total_segments": len(segments),
"total_moments": len(moments),
"classification_source": cls_source,
"synthesis_chunk_size": chunk_size,
"topic_boundaries": topic_boundaries,
"key_moments": key_moments,
"classification": classification,
"synthesis_groups": list(synthesis_groups.values()),
}
# ── Admin: Revoke ────────────────────────────────────────────────────────────
@router.post("/admin/pipeline/revoke/{video_id}")
@ -578,7 +799,215 @@ async def get_token_summary(
)
# ── Admin: Worker status ─────────────────────────────────────────────────────
# ── Admin: Stale Pages ──────────────────────────────────────────────────────
@router.get("/admin/pipeline/stale-pages")
async def get_stale_pages(
db: AsyncSession = Depends(get_session),
):
"""Detect technique pages synthesized with an older prompt than the current one.
Compares the SHA-256 hash of the current stage5_synthesis.txt against the
prompt hashes stored in TechniquePageVersion.pipeline_metadata.
"""
import hashlib
from pathlib import Path as _Path
from models import TechniquePage, TechniquePageVersion
settings = get_settings()
prompt_path = _Path(settings.prompts_path) / "stage5_synthesis.txt"
if not prompt_path.exists():
raise HTTPException(status_code=500, detail="stage5_synthesis.txt not found")
current_hash = hashlib.sha256(
prompt_path.read_text(encoding="utf-8").encode()
).hexdigest()[:12]
# Get all technique pages
pages = (await db.execute(select(TechniquePage))).scalars().all()
total = len(pages)
stale_count = 0
fresh_count = 0
stale_by_creator: dict[str, dict] = {}
for page in pages:
# Get latest version to check prompt hash
latest_version = (
await db.execute(
select(TechniquePageVersion)
.where(TechniquePageVersion.technique_page_id == page.id)
.order_by(TechniquePageVersion.version_number.desc())
.limit(1)
)
).scalar_one_or_none()
page_hash = None
if latest_version and latest_version.pipeline_metadata:
meta = latest_version.pipeline_metadata
page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash"))
if page_hash == current_hash:
fresh_count += 1
else:
stale_count += 1
# Look up creator name
creator = (await db.execute(
select(Creator.name).where(Creator.id == page.creator_id)
)).scalar_one_or_none() or "Unknown"
if creator not in stale_by_creator:
stale_by_creator[creator] = {"creator": creator, "stale_count": 0, "page_slugs": []}
stale_by_creator[creator]["stale_count"] += 1
stale_by_creator[creator]["page_slugs"].append(page.slug)
return {
"current_prompt_hash": current_hash,
"total_pages": total,
"stale_pages": stale_count,
"fresh_pages": fresh_count,
"stale_by_creator": list(stale_by_creator.values()),
}
# ── Admin: Bulk Re-Synthesize ───────────────────────────────────────────────
@router.post("/admin/pipeline/bulk-resynthesize")
async def bulk_resynthesize(
video_ids: list[str] | None = None,
stage: str = "stage5_synthesis",
db: AsyncSession = Depends(get_session),
):
"""Re-run a single stage on multiple videos without full pipeline reset.
If video_ids is None, targets all videos with processing_status='complete'.
Rate-limited to avoid overwhelming the worker queue.
"""
from pipeline.stages import _PIPELINE_STAGES, run_single_stage
if stage not in _PIPELINE_STAGES:
raise HTTPException(status_code=400, detail=f"Invalid stage: {stage}")
if video_ids is None:
result = await db.execute(
select(SourceVideo.id).where(
SourceVideo.processing_status == ProcessingStatus.complete
)
)
video_ids = [str(row[0]) for row in result.all()]
if not video_ids:
return {"status": "no_videos", "dispatched": 0, "skipped": 0, "total": 0}
dispatched = 0
skipped = []
for vid in video_ids:
try:
run_single_stage.delay(vid, stage, trigger="stage_rerun")
dispatched += 1
except Exception as exc:
logger.warning("Bulk re-synth dispatch failed for video_id=%s: %s", vid, exc)
skipped.append({"video_id": vid, "reason": str(exc)})
logger.info(
"[BULK-RESYNTH] Dispatched %d/%d %s re-runs",
dispatched, len(video_ids), stage,
)
return {
"status": "dispatched",
"stage": stage,
"total": len(video_ids),
"dispatched": dispatched,
"skipped": skipped if skipped else None,
}
# ── Admin: Prompt Optimization ──────────────────────────────────────────────
@router.post("/admin/pipeline/optimize-prompt/{stage}")
async def trigger_optimization(
stage: int,
video_id: str = Query(..., description="Video UUID to use as fixture source"),
iterations: int = Query(5, description="Number of optimization iterations"),
variants_per_iter: int = Query(2, description="Variants per iteration"),
):
"""Trigger an automated prompt optimization run as a background Celery task.
Exports a fixture from the specified video, then runs the optimization loop
with generate-score-select cycles. Progress is tracked in
quality/results/progress_stage{N}.json.
"""
from pipeline.quality.scorer import STAGE_CONFIGS
if stage not in STAGE_CONFIGS:
raise HTTPException(status_code=400, detail=f"Unsupported stage {stage}. Valid: {sorted(STAGE_CONFIGS)}")
# Dispatch as a Celery task
from worker import celery_app
@celery_app.task
def _run_optimization(video_id: str, stage: int, iterations: int, variants_per_iter: int) -> dict:
"""Background optimization task."""
import tempfile
from pipeline.export_fixture import export_fixture
from pipeline.quality.optimizer import OptimizationLoop
from pipeline.quality.__main__ import write_results_json
from config import get_settings as _get_settings
from pipeline.llm_client import LLMClient as _LLMClient
settings = _get_settings()
# Export fixture
tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="opt_fixture_", delete=False)
tmp.close()
exit_code = export_fixture(settings.database_url, settings.redis_url, video_id, tmp.name)
if exit_code != 0:
return {"error": f"Fixture export failed (exit code {exit_code})"}
client = _LLMClient(settings)
loop = OptimizationLoop(
client=client,
stage=stage,
fixture_path=tmp.name,
iterations=iterations,
variants_per_iter=variants_per_iter,
output_dir="backend/pipeline/quality/results/",
)
result = loop.run()
json_path = write_results_json(
result=result, output_dir="backend/pipeline/quality/results/",
stage=stage, iterations=iterations, variants_per_iter=variants_per_iter,
fixture_path=tmp.name,
)
return {
"best_score": result.best_score.composite,
"results_path": json_path,
"iterations": iterations,
}
try:
task = _run_optimization.delay(video_id, stage, iterations, variants_per_iter)
logger.info(
"[OPTIMIZE] Dispatched optimization: stage=%d, video_id=%s, iterations=%d, task_id=%s",
stage, video_id, iterations, task.id,
)
except Exception as exc:
raise HTTPException(status_code=503, detail=f"Failed to dispatch optimization: {exc}") from exc
return {
"status": "dispatched",
"task_id": task.id,
"stage": stage,
"video_id": video_id,
"iterations": iterations,
"variants_per_iter": variants_per_iter,
}
# ── Admin: Reindex All ─────────────────────────────────────────────────────
@router.post("/admin/pipeline/reindex-all")
async def reindex_all(

View file

@ -2895,6 +2895,53 @@ a.app-footer__repo:hover {
overflow-y: auto;
}
.modal-subtitle {
color: var(--color-text-secondary);
font-size: 0.85rem;
margin: 0.25rem 0 1rem;
}
.modal-field {
margin-bottom: 1rem;
}
.modal-field label {
display: block;
margin-bottom: 0.35rem;
font-size: 0.85rem;
font-weight: 600;
color: var(--color-text-primary);
}
.modal-field select,
.modal-field textarea {
width: 100%;
background: var(--color-bg);
color: var(--color-text-primary);
border: 1px solid var(--color-border);
border-radius: 6px;
padding: 0.5rem;
font-family: inherit;
font-size: 0.85rem;
}
.modal-field textarea {
resize: vertical;
min-height: 80px;
}
.modal-optional {
font-weight: 400;
color: var(--color-text-secondary);
font-size: 0.8rem;
}
.modal-actions {
display: flex;
gap: 0.5rem;
margin-top: 1rem;
}
.report-modal__title {
margin: 0 0 0.75rem;
color: var(--color-text-primary);
@ -3021,6 +3068,16 @@ a.app-footer__repo:hover {
opacity: 0.85;
}
.btn--warning {
background: var(--color-warning, #f0ad4e);
color: #1a1a1a;
border-color: var(--color-warning, #f0ad4e);
}
.btn--warning:hover:not(:disabled) {
opacity: 0.85;
}
.btn--random {
background: var(--color-bg-input);
color: var(--color-text-primary);
@ -4142,6 +4199,124 @@ a.app-footer__repo:hover {
flex-wrap: wrap;
}
/* ── Chunking Inspector ───────────────────────────────────────────────── */
.chunking-inspector {
margin-top: 0.75rem;
}
.chunking-inspector__toggle {
background: none;
border: none;
color: var(--color-text-secondary);
cursor: pointer;
font-size: 0.85rem;
padding: 0.25rem 0;
}
.chunking-inspector__toggle:hover {
color: var(--color-text-primary);
}
.chunking-inspector__loading,
.chunking-inspector__error {
font-size: 0.8rem;
color: var(--color-text-muted);
padding: 0.5rem 0;
}
.chunking-inspector__error {
color: var(--color-danger);
}
.chunking-inspector__body {
margin-top: 0.5rem;
}
.chunking-inspector__summary {
display: flex;
gap: 1rem;
font-size: 0.8rem;
color: var(--color-text-secondary);
padding: 0.5rem;
background: var(--color-bg);
border-radius: 6px;
margin-bottom: 0.75rem;
flex-wrap: wrap;
}
.chunking-inspector__section {
margin-bottom: 0.75rem;
}
.chunking-inspector__section h4 {
font-size: 0.8rem;
color: var(--color-text-secondary);
margin: 0 0 0.35rem;
font-weight: 600;
}
.chunking-inspector__topics {
display: flex;
flex-direction: column;
gap: 0.25rem;
}
.chunking-topic {
display: flex;
justify-content: space-between;
padding: 0.3rem 0.5rem;
background: var(--color-bg);
border-radius: 4px;
font-size: 0.8rem;
}
.chunking-topic__label {
color: var(--color-text-primary);
font-weight: 500;
}
.chunking-topic__meta {
color: var(--color-text-muted);
white-space: nowrap;
}
.chunking-inspector__groups {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
}
.chunking-group {
display: flex;
flex-direction: column;
padding: 0.5rem 0.75rem;
background: var(--color-bg);
border-radius: 6px;
border-left: 3px solid var(--color-accent);
font-size: 0.8rem;
min-width: 140px;
}
.chunking-group--split {
border-left-color: var(--color-warning, #f0ad4e);
}
.chunking-group__category {
font-weight: 600;
color: var(--color-text-primary);
}
.chunking-group__count {
color: var(--color-text-secondary);
}
.chunking-group__warn {
color: var(--color-warning, #f0ad4e);
font-size: 0.75rem;
margin-top: 0.15rem;
}
/* ── Pipeline Badges ────────────────────────────────────────────────────── */
.pipeline-badge {

View file

@ -583,6 +583,113 @@ export async function cleanRetriggerPipeline(videoId: string): Promise<CleanRetr
});
}
// ── Chunking Inspector ─────────────────────────────────────────────────────
export interface ChunkingTopicBoundary {
topic_label: string;
segment_count: number;
start_time: number;
end_time: number;
start_index: number;
end_index: number;
}
export interface ChunkingSynthesisGroup {
category: string;
moment_count: number;
exceeds_chunk_threshold: boolean;
chunks_needed: number;
}
export interface ChunkingDataResponse {
video_id: string;
total_segments: number;
total_moments: number;
classification_source: string;
synthesis_chunk_size: number;
topic_boundaries: ChunkingTopicBoundary[];
key_moments: Array<{
id: string;
title: string;
content_type: string;
start_time: number;
end_time: number;
plugins: string[];
technique_page_id: string | null;
}>;
classification: Array<Record<string, unknown>>;
synthesis_groups: ChunkingSynthesisGroup[];
}
export async function fetchChunkingData(videoId: string): Promise<ChunkingDataResponse> {
return request<ChunkingDataResponse>(`${BASE}/admin/pipeline/chunking/${videoId}`);
}
// ── Single-Stage Re-Run ────────────────────────────────────────────────────
export interface RerunStageResponse {
status: string;
video_id: string;
stage: string;
prompt_override: boolean;
}
export async function rerunStage(
videoId: string,
stageName: string,
promptOverride?: string,
): Promise<RerunStageResponse> {
const body: Record<string, string | undefined> = {};
if (promptOverride) {
body.prompt_override = promptOverride;
}
return request<RerunStageResponse>(
`${BASE}/admin/pipeline/rerun-stage/${videoId}/${stageName}`,
{
method: "POST",
body: Object.keys(body).length > 0 ? JSON.stringify(body) : undefined,
},
);
}
// ── Stale Pages & Bulk Re-Synthesize ───────────────────────────────────────
export interface StalePageCreator {
creator: string;
stale_count: number;
page_slugs: string[];
}
export interface StalePagesResponse {
current_prompt_hash: string;
total_pages: number;
stale_pages: number;
fresh_pages: number;
stale_by_creator: StalePageCreator[];
}
export async function fetchStalePages(): Promise<StalePagesResponse> {
return request<StalePagesResponse>(`${BASE}/admin/pipeline/stale-pages`);
}
export interface BulkResynthResponse {
status: string;
stage: string;
total: number;
dispatched: number;
skipped: Array<{ video_id: string; reason: string }> | null;
}
export async function bulkResynthesize(
videoIds?: string[],
stage = "stage5_synthesis",
): Promise<BulkResynthResponse> {
return request<BulkResynthResponse>(`${BASE}/admin/pipeline/bulk-resynthesize`, {
method: "POST",
body: JSON.stringify({ video_ids: videoIds ?? null, stage }),
});
}
// ── Debug Mode ──────────────────────────────────────────────────────────────
export interface DebugModeResponse {

View file

@ -16,6 +16,10 @@ import {
triggerPipeline,
revokePipeline,
cleanRetriggerPipeline,
rerunStage,
fetchChunkingData,
fetchStalePages,
bulkResynthesize,
fetchCreators,
fetchRecentActivity,
type PipelineVideoItem,
@ -771,6 +775,88 @@ interface BulkLogEntry {
message: string;
}
// ── Chunking Inspector ─────────────────────────────────────────────────────
function ChunkingInspector({ videoId }: { videoId: string }) {
const [data, setData] = useState<Awaited<ReturnType<typeof fetchChunkingData>> | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [expanded, setExpanded] = useState(false);
useEffect(() => {
if (!expanded) return;
let cancelled = false;
setLoading(true);
fetchChunkingData(videoId)
.then((res) => { if (!cancelled) { setData(res); setError(null); } })
.catch((err) => { if (!cancelled) setError(err instanceof Error ? err.message : "Failed"); })
.finally(() => { if (!cancelled) setLoading(false); });
return () => { cancelled = true; };
}, [videoId, expanded]);
return (
<div className="chunking-inspector">
<button
className="chunking-inspector__toggle"
onClick={() => setExpanded(!expanded)}
>
{expanded ? "▾" : "▸"} Chunking Inspector
</button>
{expanded && loading && <div className="chunking-inspector__loading">Loading chunking data...</div>}
{expanded && error && <div className="chunking-inspector__error">{error}</div>}
{expanded && data && (
<div className="chunking-inspector__body">
<div className="chunking-inspector__summary">
<span>{data.total_segments} segments</span>
<span>{data.total_moments} moments</span>
<span>Classification: {data.classification_source}</span>
<span>Chunk size: {data.synthesis_chunk_size}</span>
</div>
{/* Topic Boundaries */}
<div className="chunking-inspector__section">
<h4>Topic Boundaries (Stage 2)</h4>
<div className="chunking-inspector__topics">
{data.topic_boundaries.map((tb, i) => (
<div key={i} className="chunking-topic" title={tb.topic_label}>
<span className="chunking-topic__label">{tb.topic_label}</span>
<span className="chunking-topic__meta">
{tb.segment_count} segs &middot; {tb.start_time.toFixed(0)}s-{tb.end_time.toFixed(0)}s
</span>
</div>
))}
</div>
</div>
{/* Synthesis Groups */}
<div className="chunking-inspector__section">
<h4>Synthesis Groups (Stage 5 Input)</h4>
<div className="chunking-inspector__groups">
{data.synthesis_groups.map((g) => (
<div
key={g.category}
className={`chunking-group ${g.exceeds_chunk_threshold ? "chunking-group--split" : ""}`}
>
<span className="chunking-group__category">{g.category}</span>
<span className="chunking-group__count">{g.moment_count} moments</span>
{g.exceeds_chunk_threshold && (
<span className="chunking-group__warn">
Will split into {g.chunks_needed} chunks
</span>
)}
</div>
))}
</div>
</div>
</div>
)}
</div>
);
}
export default function AdminPipeline() {
useDocumentTitle("Pipeline Management — Chrysopedia");
const [searchParams] = useSearchParams();
@ -786,6 +872,12 @@ export default function AdminPipeline() {
const [creatorFilter, setCreatorFilter] = useState<string | null>(null);
const [searchQuery, setSearchQuery] = useState("");
const [selectedIds, setSelectedIds] = useState<Set<string>>(new Set());
// Re-run stage modal state
const [rerunModalVideo, setRerunModalVideo] = useState<string | null>(null);
const [rerunStageSelect, setRerunStageSelect] = useState("stage5_synthesis");
const [rerunPromptOverride, setRerunPromptOverride] = useState("");
// Stale pages state
const [stalePagesCount, setStalePagesCount] = useState<number | null>(null);
const [bulkProgress, setBulkProgress] = useState<BulkProgress | null>(null);
const [bulkLog, setBulkLog] = useState<BulkLogEntry[]>([]);
const [changedIds, setChangedIds] = useState<Set<string>>(new Set());
@ -869,6 +961,32 @@ export default function AdminPipeline() {
});
}, []);
// Load stale page count
useEffect(() => {
fetchStalePages()
.then((res) => setStalePagesCount(res.stale_pages))
.catch(() => { /* silently fail */ });
}, []);
const handleBulkResynth = async () => {
if (!confirm("Re-run stage 5 synthesis on all completed videos with the current prompt?")) return;
try {
const res = await bulkResynthesize();
setActionMessage({
id: "__bulk__",
text: `Bulk re-synthesize dispatched: ${res.dispatched}/${res.total} videos`,
ok: true,
});
setStalePagesCount(null); // will refresh
} catch (err) {
setActionMessage({
id: "__bulk__",
text: err instanceof Error ? err.message : "Bulk re-synth failed",
ok: false,
});
}
};
// Deep-link: auto-expand and scroll to ?video=<id> on first load
useEffect(() => {
if (deepLinked.current || loading || videos.length === 0) return;
@ -941,6 +1059,34 @@ export default function AdminPipeline() {
}
};
const handleRerunStage = async (videoId: string) => {
setActionLoading(videoId);
setActionMessage(null);
try {
const res = await rerunStage(
videoId,
rerunStageSelect,
rerunPromptOverride.trim() || undefined,
);
setActionMessage({
id: videoId,
text: `Stage re-run dispatched: ${res.stage}${res.prompt_override ? " (with prompt override)" : ""}`,
ok: true,
});
setRerunModalVideo(null);
setRerunPromptOverride("");
setTimeout(() => void load(), 2000);
} catch (err) {
setActionMessage({
id: videoId,
text: err instanceof Error ? err.message : "Stage re-run failed",
ok: false,
});
} finally {
setActionLoading(null);
}
};
const toggleExpand = (id: string) => {
setExpandedId((prev) => (prev === id ? null : id));
};
@ -1033,6 +1179,15 @@ export default function AdminPipeline() {
</p>
</div>
<div className="admin-pipeline__header-right">
{stalePagesCount !== null && stalePagesCount > 0 && (
<button
className="btn btn--small btn--warning"
onClick={() => void handleBulkResynth()}
title={`${stalePagesCount} pages synthesized with an older prompt`}
>
{stalePagesCount} stale pages
</button>
)}
<DebugModeToggle debugMode={debugMode} onDebugModeChange={setDebugModeState} />
<WorkerStatus />
<label className="auto-refresh-toggle" title="Auto-refresh video list every 15 seconds">
@ -1235,6 +1390,14 @@ export default function AdminPipeline() {
>
{actionLoading === video.id ? "…" : "■ Revoke"}
</button>
<button
className="btn btn--small"
onClick={() => setRerunModalVideo(video.id)}
disabled={actionLoading === video.id}
title="Re-run a single pipeline stage"
>
Re-run Stage
</button>
</div>
</div>
@ -1251,6 +1414,7 @@ export default function AdminPipeline() {
<span>Updated: {formatDate(video.updated_at)}</span>
</div>
<RunList videoId={video.id} videoStatus={video.processing_status} />
<ChunkingInspector videoId={video.id} />
</div>
)}
</div>
@ -1271,6 +1435,59 @@ export default function AdminPipeline() {
</div>
</>
)}
{/* Re-run Stage Modal */}
{rerunModalVideo && (
<div className="modal-overlay" onClick={() => setRerunModalVideo(null)}>
<div className="modal-content" onClick={(e) => e.stopPropagation()}>
<h3>Re-run Single Stage</h3>
<p className="modal-subtitle">
Re-run a specific pipeline stage without re-running predecessors.
</p>
<div className="modal-field">
<label htmlFor="rerun-stage-select">Stage</label>
<select
id="rerun-stage-select"
value={rerunStageSelect}
onChange={(e) => setRerunStageSelect(e.target.value)}
>
<option value="stage2_segmentation">Stage 2: Segmentation</option>
<option value="stage3_extraction">Stage 3: Extraction</option>
<option value="stage4_classification">Stage 4: Classification</option>
<option value="stage5_synthesis">Stage 5: Synthesis</option>
<option value="stage6_embed_and_index">Stage 6: Embed & Index</option>
</select>
</div>
<div className="modal-field">
<label htmlFor="rerun-prompt-override">
Prompt Override <span className="modal-optional">(optional)</span>
</label>
<textarea
id="rerun-prompt-override"
value={rerunPromptOverride}
onChange={(e) => setRerunPromptOverride(e.target.value)}
placeholder="Paste a modified prompt here to test it for this run only. Leave empty to use the current on-disk prompt."
rows={6}
/>
</div>
<div className="modal-actions">
<button
className="btn btn--primary"
onClick={() => void handleRerunStage(rerunModalVideo)}
disabled={actionLoading === rerunModalVideo}
>
{actionLoading === rerunModalVideo ? "Dispatching..." : "Run Stage"}
</button>
<button
className="btn"
onClick={() => { setRerunModalVideo(null); setRerunPromptOverride(""); }}
>
Cancel
</button>
</div>
</div>
</div>
)}
</div>
);
}