"""Offline prompt test harness for Chrysopedia synthesis.
Loads a fixture JSON (exported by export_fixture.py) and a prompt file,
calls the LLM, and outputs the synthesized result. No Docker, no database,
no Redis, no Celery — just prompt + fixture + LLM endpoint.
Usage:
python -m pipeline.test_harness run \\
--fixture fixtures/real_video_xyz.json \\
--prompt prompts/stage5_synthesis.txt \\
--output /tmp/result.json
# Run all categories in a fixture:
python -m pipeline.test_harness run --fixture fixtures/video.json
# Run a specific category only:
python -m pipeline.test_harness run --fixture fixtures/video.json --category "Sound Design"
Exit codes: 0=success, 1=LLM error, 2=parse error, 3=fixture error
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import NamedTuple
from pydantic import ValidationError
from config import get_settings
from pipeline.citation_utils import validate_citations
from pipeline.llm_client import LLMClient, estimate_max_tokens
from pipeline.schemas import SynthesizedPage, SynthesisResult
# ── Lightweight stand-in for KeyMoment ORM model ───────────────────────────
class _MockContentType:
    """Mimics KeyMomentContentType enum with a .value property.

    The harness only ever reads ``content_type.value`` (see
    build_moments_text), so a plain attribute is a sufficient stand-in
    for the real enum member.
    """

    def __init__(self, value: str) -> None:
        # Mirror how an Enum member exposes its payload via .value.
        self.value = value
class MockKeyMoment(NamedTuple):
    """Lightweight stand-in for the ORM KeyMoment.

    Has the same attributes that _build_moments_text() accesses:
    title, summary, content_type, start_time, end_time, plugins, raw_transcript.
    """

    title: str
    summary: str
    content_type: object  # _MockContentType; only .value is ever read
    start_time: float  # seconds (rendered with one decimal place)
    end_time: float  # seconds (rendered with one decimal place)
    plugins: list[str]
    raw_transcript: str  # may be empty; truncated to 300 chars when rendered
def _log(tag: str, msg: str, level: str = "INFO") -> None:
"""Write structured log line to stderr."""
print(f"[HARNESS] [{level}] {tag}: {msg}", file=sys.stderr)
# ── Moment text builder (mirrors stages.py _build_moments_text) ────────────
def build_moments_text(
    moment_group: list[tuple[MockKeyMoment, dict]],
    category: str,
    start_index: int = 0,
) -> tuple[str, set[str]]:
    """Build the moments prompt text — matches _build_moments_text in stages.py.

    Args:
        moment_group: (moment, classification-info dict) pairs to render.
        category: Topic category label repeated in every entry.
        start_index: Offset added to the bracketed citation indices so
            callers can continue numbering after an existing block of
            moments (e.g. compose mode). Defaults to 0, preserving the
            original behavior.

    Returns:
        Tuple of (joined prompt text, union of all topic tags seen).
    """
    moments_lines = []
    all_tags: set[str] = set()
    for i, (m, cls_info) in enumerate(moment_group, start=start_index):
        tags = cls_info.get("topic_tags", [])
        all_tags.update(tags)
        moments_lines.append(
            f"[{i}] Title: {m.title}\n"
            f" Summary: {m.summary}\n"
            f" Content type: {m.content_type.value}\n"
            f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
            f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
            f" Category: {category}\n"
            f" Tags: {', '.join(tags) if tags else 'none'}\n"
            # Transcript is capped at 300 chars to keep prompt size bounded.
            f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
        )
    return "\n\n".join(moments_lines), all_tags
# ── Fixture loading ────────────────────────────────────────────────────────
@dataclass
class FixtureData:
    """Parsed fixture with moments grouped by category."""

    creator_name: str  # creator display name injected into prompts
    video_id: str
    content_type: str  # fixture-level content type, e.g. "tutorial"
    filename: str  # original media filename recorded in the fixture
    # Groups: category -> list of (MockKeyMoment, cls_info_dict)
    groups: dict[str, list[tuple[MockKeyMoment, dict]]]
    total_moments: int  # count across all categories
def load_fixture(path: str) -> FixtureData:
    """Load and parse a fixture JSON file into grouped moments.

    Args:
        path: Path to a fixture JSON exported by export_fixture.py.

    Returns:
        FixtureData with each moment wrapped in a MockKeyMoment and
        grouped by its topic category.

    Raises:
        FileNotFoundError: If the fixture file does not exist.
        ValueError: If the fixture contains no moments.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    fixture_path = Path(path)
    if not fixture_path.exists():
        raise FileNotFoundError(f"Fixture not found: {path}")
    raw = fixture_path.read_text(encoding="utf-8")
    size_kb = len(raw.encode("utf-8")) / 1024
    data = json.loads(raw)
    moments_raw = data.get("moments", [])
    if not moments_raw:
        raise ValueError(f"Fixture has no moments: {path}")
    _log("FIXTURE", f"Loading: {path} ({size_kb:.1f} KB, {len(moments_raw)} moments)")
    # Build MockKeyMoment objects and group by category
    groups: dict[str, list[tuple[MockKeyMoment, dict]]] = defaultdict(list)
    for m in moments_raw:
        # Classification may be nested under "classification" or inline at
        # the top level; the nested form wins when both are present.
        cls = m.get("classification", {})
        category = cls.get("topic_category", m.get("topic_category", "Uncategorized"))
        tags = cls.get("topic_tags", m.get("topic_tags", []))
        mock = MockKeyMoment(
            # Fall back to a truncated summary when no title is present.
            title=m.get("title", m.get("summary", "")[:80]),
            summary=m.get("summary", ""),
            content_type=_MockContentType(m.get("content_type", "technique")),
            start_time=m.get("start_time", 0.0),
            end_time=m.get("end_time", 0.0),
            plugins=m.get("plugins", []),
            raw_transcript=m.get("raw_transcript", m.get("transcript_excerpt", "")),
        )
        cls_info = {"topic_category": category, "topic_tags": tags}
        groups[category].append((mock, cls_info))
    # Log breakdown — counts is non-empty here because moments_raw is non-empty,
    # so min()/max() below cannot raise.
    cat_counts = {cat: len(moms) for cat, moms in groups.items()}
    counts = list(cat_counts.values())
    _log(
        "FIXTURE",
        f"Breakdown: {len(groups)} categories, "
        f"moments per category: min={min(counts)}, max={max(counts)}, "
        f"avg={sum(counts)/len(counts):.1f}",
    )
    # Largest categories first
    for cat, count in sorted(cat_counts.items(), key=lambda x: -x[1]):
        _log("FIXTURE", f" {cat}: {count} moments")
    return FixtureData(
        creator_name=data.get("creator_name", "Unknown"),
        video_id=data.get("video_id", "unknown"),
        content_type=data.get("content_type", "tutorial"),
        filename=data.get("filename", "unknown"),
        # Plain dict so downstream .get() lookups don't auto-create entries.
        groups=dict(groups),
        total_moments=len(moments_raw),
    )
# ── Synthesis runner ───────────────────────────────────────────────────────
def run_synthesis(
    fixture: FixtureData,
    prompt_path: str,
    category_filter: str | None = None,
    model_override: str | None = None,
    modality: str | None = None,
) -> tuple[list[dict], int]:
    """Run synthesis on fixture data, returns (pages, exit_code).

    Calls the LLM once per category group, validates each response against
    SynthesisResult, and logs per-page/citation metrics.

    Args:
        fixture: Parsed fixture with moments grouped by category.
        prompt_path: Path to the system prompt file.
        category_filter: If set, synthesize only this category.
        model_override: If set, overrides the configured stage-5 model.
        modality: If set, overrides the configured modality.

    Returns:
        All synthesized pages as dicts plus an exit code (0=success,
        1=LLM error, 2=parse error, 3=prompt/category error). Per-category
        failures are skipped, not fatal; the code reflects the LAST failure
        kind observed.
    """
    # Load prompt
    prompt_file = Path(prompt_path)
    if not prompt_file.exists():
        _log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR")
        return [], 3
    system_prompt = prompt_file.read_text(encoding="utf-8")
    _log("PROMPT", f"Loading: {prompt_path} ({len(system_prompt)} chars)")
    # Setup LLM — precedence: CLI override > stage-5 setting > global default.
    settings = get_settings()
    llm = LLMClient(settings)
    stage_model = model_override or settings.llm_stage5_model or settings.llm_model
    stage_modality = modality or settings.llm_stage5_modality or "thinking"
    hard_limit = settings.llm_max_tokens_hard_limit
    _log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}")
    # Filter categories if requested
    categories = fixture.groups
    if category_filter:
        if category_filter not in categories:
            _log("ERROR", f"Category '{category_filter}' not found. Available: {list(categories.keys())}", level="ERROR")
            return [], 3
        categories = {category_filter: categories[category_filter]}
    all_pages: list[dict] = []
    total_prompt_tokens = 0
    total_completion_tokens = 0
    total_duration_ms = 0
    exit_code = 0  # sticky across iterations; a later failure kind overwrites an earlier one
    for cat_idx, (category, moment_group) in enumerate(categories.items(), 1):
        _log("SYNTH", f"Category {cat_idx}/{len(categories)}: '{category}' ({len(moment_group)} moments)")
        # Build user prompt (same format as stages.py _synthesize_chunk)
        moments_text, all_tags = build_moments_text(moment_group, category)
        user_prompt = f"{fixture.creator_name}\n\n{moments_text}\n"
        estimated_tokens = estimate_max_tokens(
            system_prompt, user_prompt,
            stage="stage5_synthesis",
            hard_limit=hard_limit,
        )
        _log(
            "SYNTH",
            f" Building prompt: {len(moment_group)} moments, "
            f"max_tokens={estimated_tokens}, tags={sorted(all_tags)[:5]}{'...' if len(all_tags) > 5 else ''}",
        )
        # Call LLM
        call_start = time.monotonic()
        _log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}")
        try:
            raw = llm.complete(
                system_prompt,
                user_prompt,
                response_model=SynthesisResult,
                modality=stage_modality,
                model_override=stage_model,
                max_tokens=estimated_tokens,
            )
        except Exception as exc:
            # Per-category failure: record the error and keep going.
            _log("ERROR", f" LLM call failed: {exc}", level="ERROR")
            exit_code = 1
            continue
        call_duration_ms = int((time.monotonic() - call_start) * 1000)
        # Accounting attributes may be absent on the response object; default to 0.
        prompt_tokens = getattr(raw, "prompt_tokens", None) or 0
        completion_tokens = getattr(raw, "completion_tokens", None) or 0
        finish_reason = getattr(raw, "finish_reason", "unknown")
        total_prompt_tokens += prompt_tokens
        total_completion_tokens += completion_tokens
        total_duration_ms += call_duration_ms
        _log(
            "LLM",
            f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, "
            f"{call_duration_ms}ms, finish_reason={finish_reason}",
        )
        if finish_reason == "length":
            _log(
                "WARN",
                " finish_reason=length — output likely truncated! "
                "Consider reducing fixture size or increasing max_tokens.",
                level="WARN",
            )
        # Parse response
        try:
            result = SynthesisResult.model_validate_json(str(raw))
        except (ValidationError, json.JSONDecodeError) as exc:
            _log("ERROR", f" Parse failed: {exc}", level="ERROR")
            _log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR")
            exit_code = 2
            continue
        # Log per-page summary
        _log("SYNTH", f" Parsed: {len(result.pages)} pages synthesized")
        # NOTE(review): total_words is accumulated but never reported — confirm
        # whether a summary line was intended.
        total_words = 0
        for page in result.pages:
            sections = page.body_sections or []
            section_count = len(sections)
            subsection_count = sum(len(s.subsections) for s in sections)
            word_count = sum(
                len(s.content.split()) + sum(len(sub.content.split()) for sub in s.subsections)
                for s in sections
            )
            total_words += word_count
            _log(
                "PAGE",
                f" '{page.title}' ({page.slug}): "
                f"{section_count} sections ({subsection_count} subsections), "
                f"{word_count} words, "
                f"{len(page.moment_indices)} moments linked, "
                f"quality={page.source_quality}",
            )
            # Citation coverage reporting
            cit = validate_citations(sections, len(page.moment_indices))
            _log(
                "CITE",
                f" Citations: {cit['total_citations']}/{len(page.moment_indices)} moments cited "
                f"({cit['coverage_pct']}% coverage)"
                + (f", invalid indices: {cit['invalid_indices']}" if cit['invalid_indices'] else "")
                + (f", uncited: {cit['uncited_moments']}" if cit['uncited_moments'] else ""),
            )
            all_pages.append(page.model_dump())
    # Summary
    _log("SUMMARY", f"Total: {len(all_pages)} pages across {len(categories)} categories")
    _log("SUMMARY", f"Tokens: {total_prompt_tokens} prompt + {total_completion_tokens} completion = {total_prompt_tokens + total_completion_tokens} total")
    _log("SUMMARY", f"Duration: {total_duration_ms}ms ({total_duration_ms / 1000:.1f}s)")
    return all_pages, exit_code
# ── Compose: merge new moments into existing page ──────────────────────────
def _count_page_words(page_dict: dict) -> int:
"""Count total words in a page's body sections."""
return sum(
len(s.get("content", "").split())
+ sum(len(sub.get("content", "").split()) for sub in s.get("subsections", []))
for s in page_dict.get("body_sections", [])
)
def build_compose_prompt(
    existing_page: dict,
    existing_moments: list[tuple[MockKeyMoment, dict]],
    new_moments: list[tuple[MockKeyMoment, dict]],
    creator_name: str,
) -> str:
    """Build the user prompt for composition (merging new moments into an existing page).

    Existing moments keep indices [0]-[N-1].
    New moments get indices [N]-[N+M-1].
    Uses build_moments_text() for formatting, with index offsets applied for new moments.

    Args:
        existing_page: SynthesizedPage dict the new moments are merged into.
        existing_moments: (moment, cls_info) pairs the page was built from.
        new_moments: (moment, cls_info) pairs to merge in.
        creator_name: Creator display name appended at the end of the prompt.

    Returns:
        The assembled user prompt string.
    """
    category = existing_page.get("topic_category", "Uncategorized")
    # Format existing moments [0]-[N-1]
    existing_text, _ = build_moments_text(existing_moments, category)
    # Format new moments with offset indices [N]-[N+M-1]
    # (this duplicates the entry layout of build_moments_text, shifting the
    # bracketed index by n so citations stay unambiguous)
    n = len(existing_moments)
    new_lines = []
    for i, (m, cls_info) in enumerate(new_moments):
        tags = cls_info.get("topic_tags", [])
        new_lines.append(
            f"[{n + i}] Title: {m.title}\n"
            f" Summary: {m.summary}\n"
            f" Content type: {m.content_type.value}\n"
            f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
            f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
            f" Category: {category}\n"
            f" Tags: {', '.join(tags) if tags else 'none'}\n"
            f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
        )
    new_text = "\n\n".join(new_lines)
    page_json = json.dumps(existing_page, indent=2, ensure_ascii=False)
    return (
        f"\n{page_json}\n\n"
        f"\n{existing_text}\n\n"
        f"\n{new_text}\n\n"
        f"{creator_name}"
    )
def run_compose(
    existing_page_path: str,
    existing_fixture_path: str,
    new_fixture_path: str,
    prompt_path: str,
    category_filter: str | None = None,
    model_override: str | None = None,
    modality: str | None = None,
) -> tuple[list[dict], int]:
    """Run composition: merge new fixture moments into an existing page.

    Returns (pages, exit_code) — same shape as run_synthesis().

    Args:
        existing_page_path: Harness output (with a "pages" list) or a single
            raw SynthesizedPage JSON file.
        existing_fixture_path: The fixture the page was originally built from,
            so existing citations [0]-[N-1] keep their meaning.
        new_fixture_path: Fixture with the new moments to merge in.
        prompt_path: Compose system prompt file.
        category_filter: If set, only compose pages in this category.
        model_override: If set, overrides the configured stage-5 model.
        modality: If set, overrides the configured modality.

    Returns:
        (pages, exit_code): merged pages as dicts, plus an exit code
        (0=success, 1=LLM error, 2=parse error, 3=input/config error).
        Per-page failures are skipped; the code reflects the last failure.
    """
    # Load existing page JSON
    existing_page_file = Path(existing_page_path)
    if not existing_page_file.exists():
        _log("ERROR", f"Existing page not found: {existing_page_path}", level="ERROR")
        return [], 3
    try:
        existing_raw = json.loads(existing_page_file.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        _log("ERROR", f"Invalid JSON in existing page: {exc}", level="ERROR")
        return [], 3
    # The existing page file might be a harness output (with .pages[]) or a raw SynthesizedPage
    if "pages" in existing_raw and isinstance(existing_raw["pages"], list):
        page_dicts = existing_raw["pages"]
        _log("COMPOSE", f"Loaded harness output with {len(page_dicts)} pages")
    elif "title" in existing_raw and "body_sections" in existing_raw:
        page_dicts = [existing_raw]
        _log("COMPOSE", "Loaded single SynthesizedPage")
    else:
        _log("ERROR", "Existing page JSON must be a SynthesizedPage or harness output with 'pages' key", level="ERROR")
        return [], 3
    # Validate each page against SynthesizedPage
    validated_pages: list[dict] = []
    for pd in page_dicts:
        try:
            SynthesizedPage.model_validate(pd)
            validated_pages.append(pd)
        except ValidationError as exc:
            # Invalid pages are skipped, not fatal — compose what we can.
            _log("WARN", f"Skipping invalid page '{pd.get('title', '?')}': {exc}", level="WARN")
    if not validated_pages:
        _log("ERROR", "No valid SynthesizedPage found in existing page file", level="ERROR")
        return [], 3
    # Apply category filter
    if category_filter:
        validated_pages = [p for p in validated_pages if p.get("topic_category") == category_filter]
        if not validated_pages:
            _log("ERROR", f"No pages match category '{category_filter}'", level="ERROR")
            return [], 3
    # Load existing moments fixture (the original moments the page was built from)
    try:
        existing_fixture = load_fixture(existing_fixture_path)
    except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
        _log("ERROR", f"Existing fixture error: {exc}", level="ERROR")
        return [], 3
    # Load new moments fixture
    try:
        new_fixture = load_fixture(new_fixture_path)
    except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
        _log("ERROR", f"New fixture error: {exc}", level="ERROR")
        return [], 3
    # Load prompt
    prompt_file = Path(prompt_path)
    if not prompt_file.exists():
        _log("ERROR", f"Prompt file not found: {prompt_path}", level="ERROR")
        return [], 3
    system_prompt = prompt_file.read_text(encoding="utf-8")
    _log("PROMPT", f"Loading compose prompt: {prompt_path} ({len(system_prompt)} chars)")
    # Setup LLM — same precedence as run_synthesis:
    # CLI override > stage-5 setting > global default.
    settings = get_settings()
    llm = LLMClient(settings)
    stage_model = model_override or settings.llm_stage5_model or settings.llm_model
    stage_modality = modality or settings.llm_stage5_modality or "thinking"
    hard_limit = settings.llm_max_tokens_hard_limit
    _log("LLM", f"Model: {stage_model}, modality: {stage_modality}, hard_limit: {hard_limit}")
    all_pages: list[dict] = []
    exit_code = 0
    for page_idx, existing_page in enumerate(validated_pages, 1):
        page_category = existing_page.get("topic_category", "Uncategorized")
        page_title = existing_page.get("title", "Untitled")
        _log("COMPOSE", f"Page {page_idx}/{len(validated_pages)}: '{page_title}' ({page_category})")
        # Get existing moments for this page's category
        existing_moments = existing_fixture.groups.get(page_category, [])
        if not existing_moments:
            _log("WARN", f" No existing moments found for category '{page_category}' — skipping", level="WARN")
            continue
        # Get new moments for this page's category
        new_moments = new_fixture.groups.get(page_category, [])
        if not new_moments:
            # Nothing to merge: pass the existing page through unchanged.
            _log("WARN", f" No new moments for category '{page_category}' — nothing to compose", level="WARN")
            all_pages.append(existing_page)
            continue
        n_existing = len(existing_moments)
        n_new = len(new_moments)
        total_moments = n_existing + n_new
        # Before metrics (used again below when logging the after-state)
        before_words = _count_page_words(existing_page)
        before_sections = len(existing_page.get("body_sections", []))
        _log(
            "COMPOSE",
            f" Existing: {n_existing} moments, {before_sections} sections, {before_words} words | "
            f"New: {n_new} moments | Total citation space: [0]-[{total_moments - 1}]",
        )
        # Build compose prompt
        user_prompt = build_compose_prompt(
            existing_page=existing_page,
            existing_moments=existing_moments,
            new_moments=new_moments,
            creator_name=existing_fixture.creator_name,
        )
        estimated_tokens = estimate_max_tokens(
            system_prompt, user_prompt,
            stage="stage5_synthesis",
            hard_limit=hard_limit,
        )
        _log("COMPOSE", f" Prompt built: {len(user_prompt)} chars, max_tokens={estimated_tokens}")
        # Call LLM
        call_start = time.monotonic()
        _log("LLM", f" Calling: model={stage_model}, max_tokens={estimated_tokens}, modality={stage_modality}")
        try:
            raw = llm.complete(
                system_prompt,
                user_prompt,
                response_model=SynthesisResult,
                modality=stage_modality,
                model_override=stage_model,
                max_tokens=estimated_tokens,
            )
        except Exception as exc:
            # Per-page failure: record the error and continue with the next page.
            _log("ERROR", f" LLM call failed: {exc}", level="ERROR")
            exit_code = 1
            continue
        call_duration_ms = int((time.monotonic() - call_start) * 1000)
        # Accounting attributes may be absent on the response object; default to 0.
        prompt_tokens = getattr(raw, "prompt_tokens", None) or 0
        completion_tokens = getattr(raw, "completion_tokens", None) or 0
        finish_reason = getattr(raw, "finish_reason", "unknown")
        _log(
            "LLM",
            f" Response: {prompt_tokens} prompt + {completion_tokens} completion tokens, "
            f"{call_duration_ms}ms, finish_reason={finish_reason}",
        )
        if finish_reason == "length":
            _log("WARN", " finish_reason=length — output likely truncated!", level="WARN")
        # Parse response
        try:
            result = SynthesisResult.model_validate_json(str(raw))
        except (ValidationError, json.JSONDecodeError) as exc:
            _log("ERROR", f" Parse failed: {exc}", level="ERROR")
            _log("ERROR", f" Raw response (first 2000 chars): {str(raw)[:2000]}", level="ERROR")
            exit_code = 2
            continue
        # Log compose-specific metrics
        for page in result.pages:
            page_dict = page.model_dump()
            after_words = _count_page_words(page_dict)
            after_sections = len(page.body_sections or [])
            # Identify new sections (headings not in the original)
            existing_headings = {s.get("heading", "") for s in existing_page.get("body_sections", [])}
            new_section_headings = [
                s.heading for s in (page.body_sections or []) if s.heading not in existing_headings
            ]
            _log(
                "COMPOSE",
                f" Result: '{page.title}' — "
                f"words {before_words}→{after_words} ({after_words - before_words:+d}), "
                f"sections {before_sections}→{after_sections} ({after_sections - before_sections:+d})"
                + (f", new sections: {new_section_headings}" if new_section_headings else ""),
            )
            # Citation validation with unified moment count
            cit = validate_citations(page.body_sections or [], total_moments)
            _log(
                "CITE",
                f" Citations: {cit['total_citations']}/{total_moments} moments cited "
                f"({cit['coverage_pct']}% coverage)"
                + (f", invalid indices: {cit['invalid_indices']}" if cit['invalid_indices'] else "")
                + (f", uncited: {cit['uncited_moments']}" if cit['uncited_moments'] else ""),
            )
            all_pages.append(page_dict)
    _log("SUMMARY", f"Compose complete: {len(all_pages)} pages")
    return all_pages, exit_code
# ── Promote: deploy a prompt to production ─────────────────────────────────
# Maps pipeline stage number -> canonical prompt filename (resolved under
# settings.prompts_path by promote_prompt).
_STAGE_PROMPT_MAP = {
    2: "stage2_segmentation.txt",
    3: "stage3_extraction.txt",
    4: "stage4_classification.txt",
    5: "stage5_synthesis.txt",
}
def promote_prompt(prompt_path: str, stage: int, reason: str, commit: bool = False) -> int:
    """Copy a winning prompt to the canonical path and create a backup.

    The worker reads prompts from disk at runtime — no restart needed.

    Args:
        prompt_path: Path of the winning prompt file to install.
        stage: Pipeline stage number (must be a key of _STAGE_PROMPT_MAP).
        reason: Human-readable justification; logged and used in the git
            commit message when ``commit`` is True.
        commit: When True, also stage and commit the canonical prompt via git.

    Returns:
        0 on success (including the no-op "identical prompt" case), 1 on error.
    """
    import hashlib
    import shutil
    if stage not in _STAGE_PROMPT_MAP:
        _log("ERROR", f"Invalid stage {stage}. Valid: {sorted(_STAGE_PROMPT_MAP)}", level="ERROR")
        return 1
    settings = get_settings()
    template_name = _STAGE_PROMPT_MAP[stage]
    canonical = Path(settings.prompts_path) / template_name
    source = Path(prompt_path)
    if not source.exists():
        _log("ERROR", f"Source prompt not found: {prompt_path}", level="ERROR")
        return 1
    new_prompt = source.read_text(encoding="utf-8")
    new_hash = hashlib.sha256(new_prompt.encode()).hexdigest()[:12]
    # Backup the current prompt — only when one exists. Previously the
    # archive copy ran unconditionally, so a first-time promotion (no
    # canonical file yet) crashed in shutil.copy2 with FileNotFoundError.
    old_prompt = ""
    old_hash = "none"
    if canonical.exists():
        old_prompt = canonical.read_text(encoding="utf-8")
        old_hash = hashlib.sha256(old_prompt.encode()).hexdigest()[:12]
        if old_prompt.strip() == new_prompt.strip():
            _log("PROMOTE", "No change — new prompt is identical to current prompt")
            return 0
        archive_dir = Path(settings.prompts_path) / "archive"
        archive_dir.mkdir(parents=True, exist_ok=True)
        # UTC timestamp keeps archive names sortable and unambiguous.
        ts = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
        backup = archive_dir / f"{template_name.replace('.txt', '')}_{ts}.txt"
        shutil.copy2(canonical, backup)
        _log("PROMOTE", f"Backed up current prompt: {old_hash} -> {backup}")
    # Write new prompt (create the prompts dir on a first-time promotion)
    canonical.parent.mkdir(parents=True, exist_ok=True)
    canonical.write_text(new_prompt, encoding="utf-8")
    old_lines = old_prompt.strip().splitlines()
    new_lines = new_prompt.strip().splitlines()
    _log("PROMOTE", f"Installed new prompt: {new_hash} ({len(new_prompt)} chars, {len(new_lines)} lines)")
    _log("PROMOTE", f"Previous: {old_hash} ({len(old_prompt)} chars, {len(old_lines)} lines)")
    _log("PROMOTE", f"Reason: {reason}")
    _log("PROMOTE", "Worker reads prompts from disk at runtime — no restart needed")
    if commit:
        import subprocess
        try:
            subprocess.run(
                ["git", "add", str(canonical)],
                cwd=str(canonical.parent.parent),
                check=True, capture_output=True,
            )
            msg = f"prompt: promote stage{stage} — {reason}"
            subprocess.run(
                ["git", "commit", "-m", msg],
                cwd=str(canonical.parent.parent),
                check=True, capture_output=True,
            )
            _log("PROMOTE", f"Git commit created: {msg}")
        except subprocess.CalledProcessError as exc:
            # Best-effort: a failed commit does not undo the promotion.
            _log("PROMOTE", f"Git commit failed: {exc}", level="WARN")
    return 0
# ── CLI ────────────────────────────────────────────────────────────────────
def main() -> int:
    """CLI entry point.

    Dispatches to run_synthesis (``run``), run_compose (``compose``), or
    promote_prompt (``promote``) and returns the process exit code
    (0=success, 1=LLM error, 2=parse error, 3=fixture/input error).
    """
    parser = argparse.ArgumentParser(
        prog="pipeline.test_harness",
        description="Offline prompt test harness for Chrysopedia synthesis",
    )
    sub = parser.add_subparsers(dest="command")
    # -- run subcommand (default behavior) --
    run_parser = sub.add_parser("run", help="Run synthesis against a fixture")
    run_parser.add_argument("--fixture", "-f", type=str, required=True, help="Fixture JSON file")
    run_parser.add_argument("--prompt", "-p", type=str, default=None, help="Prompt file (default: stage5_synthesis.txt)")
    run_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path")
    run_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category")
    run_parser.add_argument("--model", type=str, default=None, help="Override LLM model")
    run_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"])
    # -- promote subcommand --
    promo_parser = sub.add_parser("promote", help="Deploy a winning prompt to production")
    promo_parser.add_argument("--prompt", "-p", type=str, required=True, help="Path to the winning prompt file")
    promo_parser.add_argument("--stage", "-s", type=int, default=5, help="Stage number (default: 5)")
    promo_parser.add_argument("--reason", "-r", type=str, required=True, help="Why this prompt is being promoted")
    promo_parser.add_argument("--commit", action="store_true", help="Also create a git commit")
    # -- compose subcommand --
    compose_parser = sub.add_parser("compose", help="Merge new moments into an existing page")
    compose_parser.add_argument("--existing-page", type=str, required=True, help="Existing page JSON (harness output or raw SynthesizedPage)")
    compose_parser.add_argument("--fixture", "-f", type=str, required=True, help="New moments fixture JSON")
    compose_parser.add_argument("--existing-fixture", type=str, required=True, help="Original moments fixture JSON (for citation context)")
    compose_parser.add_argument("--prompt", "-p", type=str, default=None, help="Compose prompt file (default: stage5_compose.txt)")
    compose_parser.add_argument("--output", "-o", type=str, default=None, help="Output file path")
    compose_parser.add_argument("--category", "-c", type=str, default=None, help="Filter to a specific category")
    compose_parser.add_argument("--model", type=str, default=None, help="Override LLM model")
    compose_parser.add_argument("--modality", type=str, default=None, choices=["chat", "thinking"])
    args = parser.parse_args()
    if args.command is None:
        # NOTE(review): no implicit default subcommand is implemented —
        # invoking without one prints help and exits 1; confirm intended.
        parser.print_help()
        return 1
    if args.command == "promote":
        return promote_prompt(args.prompt, args.stage, args.reason, args.commit)
    if args.command == "compose":
        # Resolve default compose prompt
        prompt_path = args.prompt
        if prompt_path is None:
            settings = get_settings()
            prompt_path = str(Path(settings.prompts_path) / "stage5_compose.txt")
        overall_start = time.monotonic()
        pages, exit_code = run_compose(
            existing_page_path=args.existing_page,
            existing_fixture_path=args.existing_fixture,
            new_fixture_path=args.fixture,
            prompt_path=prompt_path,
            category_filter=args.category,
            model_override=args.model,
            modality=args.modality,
        )
        # Bail out only on total failure; partial success still writes output
        # (the non-zero exit code is still returned below).
        if not pages and exit_code != 0:
            return exit_code
        output = {
            "existing_page_source": args.existing_page,
            "existing_fixture_source": args.existing_fixture,
            "new_fixture_source": args.fixture,
            "prompt_source": prompt_path,
            "category_filter": args.category,
            "pages": pages,
            "metadata": {
                "page_count": len(pages),
                "total_words": sum(_count_page_words(p) for p in pages),
                "elapsed_seconds": round(time.monotonic() - overall_start, 1),
            },
        }
        output_json = json.dumps(output, indent=2, ensure_ascii=False)
        if args.output:
            Path(args.output).parent.mkdir(parents=True, exist_ok=True)
            Path(args.output).write_text(output_json, encoding="utf-8")
            _log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)")
        else:
            # Result goes to stdout; all logging is on stderr, so output
            # stays pipeable.
            print(output_json)
            _log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)")
        total_elapsed = time.monotonic() - overall_start
        _log("DONE", f"Compose completed in {total_elapsed:.1f}s (exit_code={exit_code})")
        return exit_code
    # -- run command --
    prompt_path = args.prompt
    if prompt_path is None:
        settings = get_settings()
        prompt_path = str(Path(settings.prompts_path) / "stage5_synthesis.txt")
    overall_start = time.monotonic()
    try:
        fixture = load_fixture(args.fixture)
    except (FileNotFoundError, ValueError, json.JSONDecodeError) as exc:
        _log("ERROR", f"Fixture error: {exc}", level="ERROR")
        return 3
    pages, exit_code = run_synthesis(
        fixture=fixture,
        prompt_path=prompt_path,
        category_filter=args.category,
        model_override=args.model,
        modality=args.modality,
    )
    # Bail out only on total failure; partial success still writes output.
    if not pages and exit_code != 0:
        return exit_code
    output = {
        "fixture_source": args.fixture,
        "prompt_source": prompt_path,
        "creator_name": fixture.creator_name,
        "video_id": fixture.video_id,
        "category_filter": args.category,
        "pages": pages,
        "metadata": {
            "page_count": len(pages),
            "total_words": sum(
                sum(
                    len(s.get("content", "").split())
                    + sum(len(sub.get("content", "").split()) for sub in s.get("subsections", []))
                    for s in p.get("body_sections", [])
                )
                for p in pages
            ),
            "elapsed_seconds": round(time.monotonic() - overall_start, 1),
        },
    }
    output_json = json.dumps(output, indent=2, ensure_ascii=False)
    if args.output:
        Path(args.output).parent.mkdir(parents=True, exist_ok=True)
        Path(args.output).write_text(output_json, encoding="utf-8")
        _log("OUTPUT", f"Written to: {args.output} ({len(output_json) / 1024:.1f} KB)")
    else:
        print(output_json)
        _log("OUTPUT", f"Printed to stdout ({len(output_json) / 1024:.1f} KB)")
    total_elapsed = time.monotonic() - overall_start
    _log("DONE", f"Completed in {total_elapsed:.1f}s (exit_code={exit_code})")
    return exit_code
if __name__ == "__main__":
    # Propagate the harness exit code to the shell (see module docstring).
    sys.exit(main())