"""Offline prompt test harness for Chrysopedia synthesis. Loads a fixture JSON (exported by export_fixture.py) and a prompt file, calls the LLM, and outputs the synthesized result. No Docker, no database, no Redis, no Celery — just prompt + fixture + LLM endpoint. Usage: python -m pipeline.test_harness \\ --fixture fixtures/real_video_xyz.json \\ --prompt prompts/stage5_synthesis.txt \\ --output /tmp/result.json # Run all categories in a fixture: python -m pipeline.test_harness --fixture fixtures/video.json # Run a specific category only: python -m pipeline.test_harness --fixture fixtures/video.json --category "Sound Design" Exit codes: 0=success, 1=LLM error, 2=parse error, 3=fixture error """ from __future__ import annotations import argparse import json import sys import time from collections import Counter, defaultdict from dataclasses import dataclass from pathlib import Path from typing import NamedTuple from pydantic import ValidationError from config import get_settings from pipeline.llm_client import LLMClient, estimate_max_tokens from pipeline.schemas import SynthesisResult # ── Lightweight stand-in for KeyMoment ORM model ─────────────────────────── class _MockContentType: """Mimics KeyMomentContentType enum with a .value property.""" def __init__(self, value: str) -> None: self.value = value class MockKeyMoment(NamedTuple): """Lightweight stand-in for the ORM KeyMoment. Has the same attributes that _build_moments_text() accesses: title, summary, content_type, start_time, end_time, plugins, raw_transcript. """ title: str summary: str content_type: object # _MockContentType start_time: float end_time: float plugins: list[str] raw_transcript: str def _log(tag: str, msg: str, level: str = "INFO") -> None: """Write structured log line to stderr.""" print(f"[HARNESS] [{level}] {tag}: {msg}", file=sys.stderr) # ── Moment text builder (mirrors stages.py _build_moments_text) ──────────── def build_moments_text( moment_group: list[tuple[MockKeyMoment, dict]], category: str, ) -> tuple[str, set[str]]: """Build the moments prompt text — matches _build_moments_text in stages.py.""" moments_lines = [] all_tags: set[str] = set() for i, (m, cls_info) in enumerate(moment_group): tags = cls_info.get("topic_tags", []) all_tags.update(tags) moments_lines.append( f"[{i}] Title: {m.title}\n" f" Summary: {m.summary}\n" f" Content type: {m.content_type.value}\n" f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n" f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n" f" Category: {category}\n" f" Tags: {', '.join(tags) if tags else 'none'}\n" f" Transcript excerpt: {(m.raw_transcript or '')[:300]}" ) return "\n\n".join(moments_lines), all_tags # ── Fixture loading ──────────────────────────────────────────────────────── @dataclass class FixtureData: """Parsed fixture with moments grouped by category.""" creator_name: str video_id: str content_type: str filename: str # Groups: category -> list of (MockKeyMoment, cls_info_dict) groups: dict[str, list[tuple[MockKeyMoment, dict]]] total_moments: int def load_fixture(path: str) -> FixtureData: """Load and parse a fixture JSON file into grouped moments.""" fixture_path = Path(path) if not fixture_path.exists(): raise FileNotFoundError(f"Fixture not found: {path}") raw = fixture_path.read_text(encoding="utf-8") size_kb = len(raw.encode("utf-8")) / 1024 data = json.loads(raw) moments_raw = data.get("moments", []) if not moments_raw: raise ValueError(f"Fixture has no moments: {path}") _log("FIXTURE", f"Loading: {path} ({size_kb:.1f} KB, {len(moments_raw)} moments)") # Build MockKeyMoment objects and group by category groups: dict[str, list[tuple[MockKeyMoment, dict]]] = defaultdict(list) for m in moments_raw: cls = m.get("classification", {}) category = cls.get("topic_category", m.get("topic_category", "Uncategorized")) tags = cls.get("topic_tags", m.get("topic_tags", [])) mock = MockKeyMoment( title=m.get("title", m.get("summary", "")[:80]), summary=m.get("summary", ""), content_type=_MockContentType(m.get("content_type", "technique")), start_time=m.get("start_time", 0.0), end_time=m.get("end_time", 0.0), plugins=m.get("plugins", []), raw_transcript=m.get("raw_transcript", m.get("transcript_excerpt", "")), ) cls_info = {"topic_category": category, "topic_tags": tags} groups[category].append((mock, cls_info)) # Log breakdown cat_counts = {cat: len(moms) for cat, moms in groups.items()} counts = list(cat_counts.values()) _log( "FIXTURE", f"Breakdown: {len(groups)} categories, " f"moments per category: min={min(counts)}, max={max(counts)}, " f"avg={sum(counts)/len(counts):.1f}", ) for cat, count in sorted(cat_counts.items(), key=lambda x: -x[1]): _log("FIXTURE", f" {cat}: {count} moments") return FixtureData( creator_name=data.get("creator_name", "Unknown"), video_id=data.get("video_id", "unknown"), content_type=data.get("content_type", "tutorial"), filename=data.get("filename", "unknown"), groups=dict(groups), total_moments=len(moments_raw), ) # ── Synthesis runner ─────────────────────────────────────────────────────── def run_synthesis( fixture: FixtureData, prompt_path: str, category_filter: str | None = None, model_override: str | None = None, modality: str | None = None, ) -> tuple[list[dict], int]: """Run synthesis on fixture data, returns (pages, exit_code). Returns all synthesized pages as dicts plus an exit code. """ # Load prompt prompt_file = Path(prompt_path) if not prompt_file.exists(): _log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR") return [], 3 system_prompt = prompt_file.read_text(encoding="utf-8") _log("PROMPT", f"Loading: {prompt_path} ({len(system_prompt)} chars)") # Setup LLM settings = get_settings() llm = LLMClient(settings) stage_model = model_override or settings.llm_stage5_model or settings.llm_model stage_modality = modality or settings.llm_stage5_modality or "thinking" hard_limit = settings.llm_max_tokens_hard_limit _log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}") # Filter categories if requested categories = fixture.groups if category_filter: if category_filter not in categories: _log("ERROR", f"Category '{category_filter}' not found. Available: {list(categories.keys())}", level="ERROR") return [], 3 categories = {category_filter: categories[category_filter]} all_pages: list[dict] = [] total_prompt_tokens = 0 total_completion_tokens = 0 total_duration_ms = 0 exit_code = 0 for cat_idx, (category, moment_group) in enumerate(categories.items(), 1): _log("SYNTH", f"Category {cat_idx}/{len(categories)}: '{category}' ({len(moment_group)} moments)") # Build user prompt (same format as stages.py _synthesize_chunk) moments_text, all_tags = build_moments_text(moment_group, category) user_prompt = f"{fixture.creator_name}\n\n{moments_text}\n" estimated_tokens = estimate_max_tokens( system_prompt, user_prompt, stage="stage5_synthesis", hard_limit=hard_limit, ) _log( "SYNTH", f" Building prompt: {len(moment_group)} moments, " f"max_tokens={estimated_tokens}, tags={sorted(all_tags)[:5]}{'...' if len(all_tags) > 5 else ''}", ) # Call LLM call_start = time.monotonic() _log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}") try: raw = llm.complete( system_prompt, user_prompt, response_model=SynthesisResult, modality=stage_modality, model_override=stage_model, max_tokens=estimated_tokens, ) except Exception as exc: _log("ERROR", f" LLM call failed: {exc}", level="ERROR") exit_code = 1 continue call_duration_ms = int((time.monotonic() - call_start) * 1000) prompt_tokens = getattr(raw, "prompt_tokens", None) or 0 completion_tokens = getattr(raw, "completion_tokens", None) or 0 finish_reason = getattr(raw, "finish_reason", "unknown") total_prompt_tokens += prompt_tokens total_completion_tokens += completion_tokens total_duration_ms += call_duration_ms _log( "LLM", f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, " f"{call_duration_ms}ms, finish_reason={finish_reason}", ) if finish_reason == "length": _log( "WARN", " finish_reason=length — output likely truncated! " "Consider reducing fixture size or increasing max_tokens.", level="WARN", ) # Parse response try: result = SynthesisResult.model_validate_json(str(raw)) except (ValidationError, json.JSONDecodeError) as exc: _log("ERROR", f" Parse failed: {exc}", level="ERROR") _log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR") exit_code = 2 continue # Log per-page summary _log("SYNTH", f" Parsed: {len(result.pages)} pages synthesized") total_words = 0 for page in result.pages: sections = page.body_sections or {} word_count = sum(len(str(v).split()) for v in sections.values()) total_words += word_count _log( "PAGE", f" '{page.title}' ({page.slug}): " f"{len(sections)} sections, {word_count} words, " f"{len(page.moment_indices)} moments linked, " f"quality={page.source_quality}", ) all_pages.append(page.model_dump()) # Summary _log("SUMMARY", f"Total: {len(all_pages)} pages across {len(categories)} categories") _log("SUMMARY", f"Tokens: {total_prompt_tokens} prompt + {total_completion_tokens} completion = {total_prompt_tokens + total_completion_tokens} total") _log("SUMMARY", f"Duration: {total_duration_ms}ms ({total_duration_ms / 1000:.1f}s)") return all_pages, exit_code # ── Promote: deploy a prompt to production ───────────────────────────────── _STAGE_PROMPT_MAP = { 2: "stage2_segmentation.txt", 3: "stage3_extraction.txt", 4: "stage4_classification.txt", 5: "stage5_synthesis.txt", } def promote_prompt(prompt_path: str, stage: int, reason: str, commit: bool = False) -> int: """Copy a winning prompt to the canonical path and create a backup. The worker reads prompts from disk at runtime — no restart needed. """ import hashlib import shutil if stage not in _STAGE_PROMPT_MAP: _log("ERROR", f"Invalid stage {stage}. Valid: {sorted(_STAGE_PROMPT_MAP)}", level="ERROR") return 1 settings = get_settings() template_name = _STAGE_PROMPT_MAP[stage] canonical = Path(settings.prompts_path) / template_name source = Path(prompt_path) if not source.exists(): _log("ERROR", f"Source prompt not found: {prompt_path}", level="ERROR") return 1 new_prompt = source.read_text(encoding="utf-8") new_hash = hashlib.sha256(new_prompt.encode()).hexdigest()[:12] # Backup current prompt old_prompt = "" old_hash = "none" if canonical.exists(): old_prompt = canonical.read_text(encoding="utf-8") old_hash = hashlib.sha256(old_prompt.encode()).hexdigest()[:12] if old_prompt.strip() == new_prompt.strip(): _log("PROMOTE", "No change — new prompt is identical to current prompt") return 0 archive_dir = Path(settings.prompts_path) / "archive" archive_dir.mkdir(parents=True, exist_ok=True) ts = time.strftime("%Y%m%d_%H%M%S", time.gmtime()) backup = archive_dir / f"{template_name.replace('.txt', '')}_{ts}.txt" shutil.copy2(canonical, backup) _log("PROMOTE", f"Backed up current prompt: {old_hash} -> {backup}") # Write new prompt canonical.write_text(new_prompt, encoding="utf-8") old_lines = old_prompt.strip().splitlines() new_lines = new_prompt.strip().splitlines() _log("PROMOTE", f"Installed new prompt: {new_hash} ({len(new_prompt)} chars, {len(new_lines)} lines)") _log("PROMOTE", f"Previous: {old_hash} ({len(old_prompt)} chars, {len(old_lines)} lines)") _log("PROMOTE", f"Reason: {reason}") _log("PROMOTE", "Worker reads prompts from disk at runtime — no restart needed") if commit: import subprocess try: subprocess.run( ["git", "add", str(canonical)], cwd=str(canonical.parent.parent), check=True, capture_output=True, ) msg = f"prompt: promote stage{stage} — {reason}" subprocess.run( ["git", "commit", "-m", msg], cwd=str(canonical.parent.parent), check=True, capture_output=True, ) _log("PROMOTE", f"Git commit created: {msg}") except subprocess.CalledProcessError as exc: _log("PROMOTE", f"Git commit failed: {exc}", level="WARN") return 0 # ── CLI ──────────────────────────────────────────────────────────────────── def main() -> int: parser = argparse.ArgumentParser( prog="pipeline.test_harness", description="Offline prompt test harness for Chrysopedia synthesis", ) sub = parser.add_subparsers(dest="command") # -- run subcommand (default behavior) -- run_parser = sub.add_parser("run", help="Run synthesis against a fixture") run_parser.add_argument("--fixture", "-f", type=str, required=True, help="Fixture JSON file") run_parser.add_argument("--prompt", "-p", type=str, default=None, help="Prompt file (default: stage5_synthesis.txt)") run_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path") run_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category") run_parser.add_argument("--model", type=str, default=None, help="Override LLM model") run_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"]) # -- promote subcommand -- promo_parser = sub.add_parser("promote", help="Deploy a winning prompt to production") promo_parser.add_argument("--prompt", "-p", type=str, required=True, help="Path to the winning prompt file") promo_parser.add_argument("--stage", "-s", type=int, default=5, help="Stage number (default: 5)") promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted") promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit") args = parser.parse_args() # If no subcommand, check for --fixture for backward compat if args.command is None: # Support running without subcommand for backward compat parser.print_help() return 1 if args.command == "promote": return promote_prompt(args.prompt, args.stage, args.reason, args.commit) # -- run command -- prompt_path = args.prompt if prompt_path is None: settings = get_settings() prompt_path = str(Path(settings.prompts_path) / "stage5_synthesis.txt") overall_start = time.monotonic() try: fixture = load_fixture(args.fixture) except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc: _log("ERROR", f"Fixture error: {exc}", level="ERROR") return 3 pages, exit_code = run_synthesis( fixture=fixture, prompt_path=prompt_path, category_filter=args.category, model_override=args.model, modality=args.modality, ) if not pages and exit_code != 0: return exit_code output = { "fixture_source": args.fixture, "prompt_source": prompt_path, "creator_name": fixture.creator_name, "video_id": fixture.video_id, "category_filter": args.category, "pages": pages, "metadata": { "page_count": len(pages), "total_words": sum( sum(len(str(v).split()) for v in p.get("body_sections", {}).values()) for p in pages ), "elapsed_seconds": round(time.monotonic() - overall_start, 1), }, } output_json = json.dumps(output, indent=2, ensure_ascii=False) if args.output: Path(args.output).parent.mkdir(parents=True, exist_ok=True) Path(args.output).write_text(output_json, encoding="utf-8") _log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)") else: print(output_json) _log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)") total_elapsed = time.monotonic() - overall_start _log("DONE", f"Completed in {total_elapsed:.1f}s (exit_code={exit_code})") return exit_code if __name__ == "__main__": sys.exit(main())