"""FYN-LLM quality assurance toolkit. Subcommands: fitness — Run LLM fitness tests across four categories score — Score a Stage 5 technique page across 5 quality dimensions optimize — Automated prompt optimization loop with leaderboard output Run with: python -m pipeline.quality """ from __future__ import annotations import argparse import json import sys from datetime import datetime, timezone from pathlib import Path from config import get_settings from pipeline.llm_client import LLMClient from .fitness import FitnessRunner from .optimizer import OptimizationLoop, OptimizationResult from .scorer import DIMENSIONS, STAGE_CONFIGS, ScoreRunner # ── Reporting helpers ──────────────────────────────────────────────────────── def print_leaderboard(result: OptimizationResult, stage: int = 5) -> None: """Print a formatted leaderboard of top 5 variants by composite score.""" dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS # Filter to entries that actually scored (no errors) scored = [h for h in result.history if not h.get("error")] if not scored: print("\n No successfully scored variants to rank.\n") return ranked = sorted(scored, key=lambda h: h["composite"], reverse=True)[:5] print(f"\n{'='*72}") print(f" LEADERBOARD — Top 5 Variants by Composite Score (Stage {stage})") print(f"{'='*72}") # Header dim_headers = " ".join(f"{d[:5]:>5s}" for d in dims) sep_segments = " ".join("─" * 5 for _ in dims) print(f" {'#':>2s} {'Label':<16s} {'Comp':>5s} {dim_headers}") print(f" {'─'*2} {'─'*16} {'─'*5} {sep_segments}") for i, entry in enumerate(ranked, 1): label = entry.get("label", "?")[:16] comp = entry["composite"] dim_vals = " ".join( f"{entry['scores'].get(d, 0.0):5.2f}" for d in dims ) bar = "█" * int(comp * 20) + "░" * (20 - int(comp * 20)) print(f" {i:>2d} {label:<16s} {comp:5.3f} {dim_vals} {bar}") print(f"{'='*72}\n") def print_trajectory(result: OptimizationResult) -> None: """Print an ASCII chart of composite score across iterations.""" scored = [h for h in result.history if not h.get("error")] if len(scored) < 2: print(" (Not enough data points for trajectory chart)\n") return # Get the best composite per iteration iter_best: dict[int, float] = {} for h in scored: it = h["iteration"] if it not in iter_best or h["composite"] > iter_best[it]: iter_best[it] = h["composite"] iterations = sorted(iter_best.keys()) values = [iter_best[it] for it in iterations] # Chart dimensions chart_height = 15 min_val = max(0.0, min(values) - 0.05) max_val = min(1.0, max(values) + 0.05) val_range = max_val - min_val if val_range < 0.01: val_range = 0.1 min_val = max(0.0, values[0] - 0.05) max_val = min_val + val_range print(f" {'─'*50}") print(" SCORE TRAJECTORY — Best Composite per Iteration") print(f" {'─'*50}") print() # Render rows top to bottom for row in range(chart_height, -1, -1): threshold = min_val + (row / chart_height) * val_range # Y-axis label every 5 rows if row % 5 == 0: label = f"{threshold:.2f}" else: label = " " line = f" {label} │" for vi, val in enumerate(values): normalized = (val - min_val) / val_range filled_rows = int(normalized * chart_height) if filled_rows >= row: line += " ● " else: line += " · " print(line) # X-axis print(f" ───── ┼{'───' * len(values)}") x_labels = " " + " " for it in iterations: x_labels += f"{it:>2d} " print(x_labels) print(" " + " iteration →") print() def write_results_json( result: OptimizationResult, output_dir: str, stage: int, iterations: int, variants_per_iter: int, fixture_path: str, ) -> str: """Write optimization results to a timestamped JSON file. Returns the path.""" out_path = Path(output_dir) out_path.mkdir(parents=True, exist_ok=True) timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") filename = f"optimize_stage{stage}_{timestamp}.json" filepath = out_path / filename dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS payload = { "config": { "stage": stage, "iterations": iterations, "variants_per_iter": variants_per_iter, "fixture_path": fixture_path, }, "best_prompt": result.best_prompt, "best_scores": { "composite": result.best_score.composite, **{d: result.best_score.scores.get(d, 0.0) for d in dims}, }, "elapsed_seconds": result.elapsed_seconds, "history": result.history, } filepath.write_text(json.dumps(payload, indent=2), encoding="utf-8") return str(filepath) # ── CLI ────────────────────────────────────────────────────────────────────── def main() -> int: parser = argparse.ArgumentParser( prog="pipeline.quality", description="FYN-LLM quality assurance toolkit", ) sub = parser.add_subparsers(dest="command") # -- fitness subcommand -- sub.add_parser("fitness", help="Run LLM fitness tests across four categories") # -- score subcommand -- score_parser = sub.add_parser( "score", help="Score a Stage 5 technique page across 5 quality dimensions", ) source_group = score_parser.add_mutually_exclusive_group(required=True) source_group.add_argument( "--file", type=str, help="Path to a moments JSON file (creator_name, moments array)", ) source_group.add_argument( "--slug", type=str, help="Technique slug to load from the database", ) score_parser.add_argument( "--voice-level", type=float, default=None, help="Voice preservation dial (0.0=clinical, 1.0=maximum voice). Triggers re-synthesis before scoring.", ) # -- optimize subcommand -- opt_parser = sub.add_parser( "optimize", help="Automated prompt optimization loop with leaderboard output", ) opt_parser.add_argument( "--stage", type=int, default=5, help="Pipeline stage to optimize (default: 5)", ) opt_parser.add_argument( "--iterations", type=int, default=10, help="Number of optimization iterations (default: 10)", ) opt_parser.add_argument( "--variants-per-iter", type=int, default=2, help="Variants generated per iteration (default: 2)", ) opt_parser.add_argument( "--file", type=str, required=True, help="Path to moments JSON fixture file", ) opt_parser.add_argument( "--output-dir", type=str, default="backend/pipeline/quality/results/", help="Directory to write result JSON (default: backend/pipeline/quality/results/)", ) args = parser.parse_args() if args.command is None: parser.print_help() return 1 if args.command == "fitness": settings = get_settings() client = LLMClient(settings) runner = FitnessRunner(client) return runner.run_all() if args.command == "score": return _run_score(args) if args.command == "optimize": return _run_optimize(args) return 0 def _run_score(args: argparse.Namespace) -> int: """Execute the score subcommand.""" # -- Load source data -- if args.slug: print("DB loading not yet implemented", file=sys.stderr) return 1 try: with open(args.file) as f: data = json.load(f) except FileNotFoundError: print(f"File not found: {args.file}", file=sys.stderr) return 1 except json.JSONDecodeError as exc: print(f"Invalid JSON in {args.file}: {exc}", file=sys.stderr) return 1 moments = data.get("moments", []) creator_name = data.get("creator_name", "Unknown") if not moments: print("No moments found in input file", file=sys.stderr) return 1 settings = get_settings() client = LLMClient(settings) runner = ScoreRunner(client) # -- Voice-level mode: re-synthesize then score -- if args.voice_level is not None: voice_level = args.voice_level if not (0.0 <= voice_level <= 1.0): print("--voice-level must be between 0.0 and 1.0", file=sys.stderr) return 1 print(f"\nRe-synthesizing + scoring for '{creator_name}' ({len(moments)} moments, voice_level={voice_level})...") result = runner.synthesize_and_score(moments, creator_name, voice_level) if result.error: runner.print_report(result) return 1 runner.print_report(result) return 0 # -- Standard mode: build page stub from moments, score directly -- page_json = { "title": f"{creator_name} — Technique Page", "creator_name": creator_name, "summary": f"Technique page synthesized from {len(moments)} key moments.", "body_sections": [ { "heading": m.get("topic_tags", ["Technique"])[0] if m.get("topic_tags") else "Technique", "content": m.get("summary", "") + "\n\n" + m.get("transcript_excerpt", ""), } for m in moments ], } print(f"\nScoring page for '{creator_name}' ({len(moments)} moments)...") result = runner.score_page(page_json, moments) if result.error: runner.print_report(result) return 1 runner.print_report(result) return 0 def _run_optimize(args: argparse.Namespace) -> int: """Execute the optimize subcommand.""" # Stage validation — stages 2-5 are supported if args.stage not in STAGE_CONFIGS: print( f"Error: unsupported stage {args.stage}. Valid stages: {sorted(STAGE_CONFIGS)}", file=sys.stderr, ) return 1 # Validate fixture file exists fixture = Path(args.file) if not fixture.exists(): print(f"Error: fixture file not found: {args.file}", file=sys.stderr) return 1 # Ensure output dir Path(args.output_dir).mkdir(parents=True, exist_ok=True) settings = get_settings() client = LLMClient(settings) loop = OptimizationLoop( client=client, stage=args.stage, fixture_path=args.file, iterations=args.iterations, variants_per_iter=args.variants_per_iter, ) try: result = loop.run() except KeyboardInterrupt: print("\n Optimization interrupted by user.", file=sys.stderr) return 130 except Exception as exc: print(f"\nError: optimization failed: {exc}", file=sys.stderr) return 1 # If the loop returned an error on baseline, report and exit if result.best_score.error and not result.history: print(f"\nError: {result.best_score.error}", file=sys.stderr) return 1 # Reporting print_leaderboard(result, stage=args.stage) print_trajectory(result) # Write results JSON try: json_path = write_results_json( result=result, output_dir=args.output_dir, stage=args.stage, iterations=args.iterations, variants_per_iter=args.variants_per_iter, fixture_path=args.file, ) print(f" Results written to: {json_path}") except OSError as exc: print(f" Warning: failed to write results JSON: {exc}", file=sys.stderr) return 0 if __name__ == "__main__": sys.exit(main())