# GSD-Task: S09/T01 — quality toolkit CLI entry point (~646 lines, Python)
# Related files:
#   backend/pipeline/quality/chat_scorer.py
#   backend/pipeline/quality/chat_eval.py
#   backend/pipeline/quality/fixtures/chat_test_suite.yaml
#   backend/pipeline/quality/__main__.py
"""FYN-LLM quality assurance toolkit.

Subcommands:

    fitness   — Run LLM fitness tests across four categories
    score     — Score a Stage 5 technique page across 5 quality dimensions
    optimize  — Automated prompt optimization loop with leaderboard output
    apply     — Apply a winning prompt from optimization results to the stage's prompt file
    chat_eval — Evaluate chat quality across a test suite of queries

Run with: python -m pipeline.quality <command>
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from config import get_settings
|
|
from pipeline.llm_client import LLMClient
|
|
|
|
from .chat_eval import ChatEvalRunner
|
|
from .chat_scorer import ChatScoreRunner
|
|
from .fitness import FitnessRunner
|
|
from .optimizer import OptimizationLoop, OptimizationResult
|
|
from .scorer import DIMENSIONS, STAGE_CONFIGS, ScoreRunner
|
|
|
|
|
|
# ── Reporting helpers ────────────────────────────────────────────────────────
|
|
|
|
|
|
def print_leaderboard(result: OptimizationResult, stage: int = 5) -> None:
    """Render the top-5 variant leaderboard (by composite score) to stdout."""
    # Dimension set depends on the stage; fall back to the global default.
    if stage in STAGE_CONFIGS:
        dims = STAGE_CONFIGS[stage].dimensions
    else:
        dims = DIMENSIONS

    # Only variants that scored without error are rankable.
    usable = [entry for entry in result.history if not entry.get("error")]
    if not usable:
        print("\n No successfully scored variants to rank.\n")
        return

    usable.sort(key=lambda entry: entry["composite"], reverse=True)
    top_five = usable[:5]

    rule = "=" * 72
    print(f"\n{rule}")
    print(f" LEADERBOARD — Top 5 Variants by Composite Score (Stage {stage})")
    print(rule)

    # Column headers: each dimension abbreviated to 5 chars, right-aligned.
    header_cells = " ".join(f"{d[:5]:>5s}" for d in dims)
    underline_cells = " ".join("─" * 5 for _ in dims)
    print(f" {'#':>2s} {'Label':<16s} {'Comp':>5s} {header_cells}")
    print(f" {'─' * 2} {'─' * 16} {'─' * 5} {underline_cells}")

    for rank, entry in enumerate(top_five, 1):
        name = entry.get("label", "?")[:16]
        composite = entry["composite"]
        per_dim = " ".join(f"{entry['scores'].get(d, 0.0):5.2f}" for d in dims)
        # 20-cell bar gauge: filled proportionally to the composite score.
        filled = int(composite * 20)
        gauge = "█" * filled + "░" * (20 - filled)
        print(f" {rank:>2d} {name:<16s} {composite:5.3f} {per_dim} {gauge}")

    print(f"{rule}\n")
|
|
|
|
|
|
def print_trajectory(result: OptimizationResult) -> None:
    """Print an ASCII chart of the best composite score per iteration.

    Only ``result.history`` is read: a list of per-variant dicts with
    ``iteration``, ``composite`` and an optional ``error`` key. Errored
    entries are excluded; at least two scored entries are required to chart.
    """
    scored = [h for h in result.history if not h.get("error")]
    if len(scored) < 2:
        print(" (Not enough data points for trajectory chart)\n")
        return

    # Best composite observed in each iteration.
    iter_best: dict[int, float] = {}
    for h in scored:
        it = h["iteration"]
        if it not in iter_best or h["composite"] > iter_best[it]:
            iter_best[it] = h["composite"]

    iterations = sorted(iter_best)
    values = [iter_best[it] for it in iterations]

    # Chart scale: pad the observed range slightly, clamped to [0, 1].
    chart_height = 15
    min_val = max(0.0, min(values) - 0.05)
    max_val = min(1.0, max(values) + 0.05)
    val_range = max_val - min_val
    if val_range < 0.01:
        # Near-flat data: widen to a fixed 0.1 window so the normalization
        # below never divides by ~zero.
        val_range = 0.1
        min_val = max(0.0, values[0] - 0.05)
        max_val = min_val + val_range

    print(f" {'─'*50}")
    print(" SCORE TRAJECTORY — Best Composite per Iteration")
    print(f" {'─'*50}")
    print()

    # Render rows top to bottom; a point fills every row at or below its
    # normalized height.
    for row in range(chart_height, -1, -1):
        threshold = min_val + (row / chart_height) * val_range
        # Y-axis label every 5 rows
        if row % 5 == 0:
            label = f"{threshold:.2f}"
        else:
            label = " "
        line = f" {label} │"
        # Fix: the index from enumerate() was never used — iterate directly.
        for val in values:
            normalized = (val - min_val) / val_range
            filled_rows = int(normalized * chart_height)
            if filled_rows >= row:
                line += " ● "
            else:
                line += " · "
        print(line)

    # X-axis (join instead of repeated += — avoids quadratic string build)
    print(f" ───── ┼{'───' * len(values)}")
    x_labels = " " + " " + "".join(f"{it:>2d} " for it in iterations)
    print(x_labels)
    print(" " + " iteration →")
    print()
|
|
|
|
|
|
def write_results_json(
    result: OptimizationResult,
    output_dir: str,
    stage: int,
    iterations: int,
    variants_per_iter: int,
    fixture_path: str,
) -> str:
    """Serialize an optimization run to a timestamped JSON file.

    Creates *output_dir* if needed and returns the written file's path as a
    string.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    target_file = target_dir / f"optimize_stage{stage}_{stamp}.json"

    # Stage-specific dimension list, falling back to the global default.
    dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS

    # "composite" first, then one entry per dimension (0.0 when missing).
    best_scores = {"composite": result.best_score.composite}
    for d in dims:
        best_scores[d] = result.best_score.scores.get(d, 0.0)

    payload = {
        "config": {
            "stage": stage,
            "iterations": iterations,
            "variants_per_iter": variants_per_iter,
            "fixture_path": fixture_path,
        },
        "best_prompt": result.best_prompt,
        "best_scores": best_scores,
        "elapsed_seconds": result.elapsed_seconds,
        "history": result.history,
    }

    target_file.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return str(target_file)
|
|
|
|
|
|
# ── CLI ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Construct the top-level parser with all five subcommands registered.

    Extracted from main() so each subcommand's arguments are declared
    contiguously (previously the `apply` subparser's arguments were
    interleaved in the middle of `optimize`'s argument registration).
    Subparser registration order — and therefore help output — is unchanged.
    """
    parser = argparse.ArgumentParser(
        prog="pipeline.quality",
        description="FYN-LLM quality assurance toolkit",
    )
    sub = parser.add_subparsers(dest="command")

    # -- fitness subcommand --
    sub.add_parser("fitness", help="Run LLM fitness tests across four categories")

    # -- score subcommand --
    score_parser = sub.add_parser(
        "score",
        help="Score a Stage 5 technique page across 5 quality dimensions",
    )
    source_group = score_parser.add_mutually_exclusive_group(required=True)
    source_group.add_argument(
        "--file",
        type=str,
        help="Path to a moments JSON file (creator_name, moments array)",
    )
    source_group.add_argument(
        "--slug",
        type=str,
        help="Technique slug to load from the database",
    )
    score_parser.add_argument(
        "--voice-level",
        type=float,
        default=None,
        help="Voice preservation dial (0.0=clinical, 1.0=maximum voice). Triggers re-synthesis before scoring.",
    )

    # -- optimize subcommand --
    opt_parser = sub.add_parser(
        "optimize",
        help="Automated prompt optimization loop with leaderboard output",
    )
    opt_parser.add_argument(
        "--stage",
        type=int,
        default=5,
        help="Pipeline stage to optimize (default: 5)",
    )
    opt_parser.add_argument(
        "--iterations",
        type=int,
        default=10,
        help="Number of optimization iterations (default: 10)",
    )
    opt_parser.add_argument(
        "--variants-per-iter",
        type=int,
        default=2,
        help="Variants generated per iteration (default: 2)",
    )
    opt_source = opt_parser.add_mutually_exclusive_group(required=True)
    opt_source.add_argument(
        "--file",
        type=str,
        help="Path to moments JSON fixture file",
    )
    opt_source.add_argument(
        "--video-id",
        type=str,
        help="Video UUID — exports fixture from DB automatically (requires DATABASE_URL, REDIS_URL)",
    )
    opt_parser.add_argument(
        "--output-dir",
        type=str,
        default="backend/pipeline/quality/results/",
        help="Directory to write result JSON (default: backend/pipeline/quality/results/)",
    )
    opt_parser.add_argument(
        "--apply",
        action="store_true",
        default=False,
        help="Write the winning prompt back to the stage's prompt file (backs up the original first)",
    )

    # -- apply subcommand --
    apply_parser = sub.add_parser(
        "apply",
        help="Apply a winning prompt from optimization results to the stage's prompt file",
    )
    apply_parser.add_argument(
        "results_file",
        type=str,
        help="Path to an optimization results JSON file",
    )
    apply_parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="Show what would change without writing",
    )

    # -- chat_eval subcommand --
    chat_parser = sub.add_parser(
        "chat_eval",
        help="Evaluate chat quality across a test suite of queries",
    )
    chat_parser.add_argument(
        "--suite",
        type=str,
        required=True,
        help="Path to a chat test suite YAML/JSON file",
    )
    chat_parser.add_argument(
        "--base-url",
        type=str,
        default="http://localhost:8096",
        help="Chat API base URL (default: http://localhost:8096)",
    )
    chat_parser.add_argument(
        "--output",
        type=str,
        default="backend/pipeline/quality/results/",
        help="Output path for results JSON (default: backend/pipeline/quality/results/)",
    )
    chat_parser.add_argument(
        "--timeout",
        type=float,
        default=120.0,
        help="Request timeout in seconds (default: 120)",
    )

    return parser


def main() -> int:
    """CLI entry point: parse argv and dispatch to the subcommand handler.

    Returns:
        A process exit code (0 on success, non-zero on failure).
    """
    parser = _build_parser()
    args = parser.parse_args()

    if args.command is None:
        # No subcommand given: show usage and fail.
        parser.print_help()
        return 1

    if args.command == "fitness":
        settings = get_settings()
        client = LLMClient(settings)
        runner = FitnessRunner(client)
        return runner.run_all()

    if args.command == "score":
        return _run_score(args)

    if args.command == "optimize":
        return _run_optimize(args)

    if args.command == "apply":
        return _run_apply(args)

    if args.command == "chat_eval":
        return _run_chat_eval(args)

    return 0  # unreachable: argparse rejects unknown subcommands
|
|
|
|
|
|
def _run_score(args: argparse.Namespace) -> int:
    """Execute the score subcommand."""
    # Database-backed loading (--slug) is a stub for now.
    if args.slug:
        print("DB loading not yet implemented", file=sys.stderr)
        return 1

    # -- Load source data --
    try:
        with open(args.file) as fh:
            data = json.load(fh)
    except FileNotFoundError:
        print(f"File not found: {args.file}", file=sys.stderr)
        return 1
    except json.JSONDecodeError as exc:
        print(f"Invalid JSON in {args.file}: {exc}", file=sys.stderr)
        return 1

    moments = data.get("moments", [])
    creator_name = data.get("creator_name", "Unknown")
    if not moments:
        print("No moments found in input file", file=sys.stderr)
        return 1

    settings = get_settings()
    runner = ScoreRunner(LLMClient(settings))

    # -- Voice-level mode: re-synthesize then score --
    if args.voice_level is not None:
        voice_level = args.voice_level
        if not (0.0 <= voice_level <= 1.0):
            print("--voice-level must be between 0.0 and 1.0", file=sys.stderr)
            return 1

        print(f"\nRe-synthesizing + scoring for '{creator_name}' ({len(moments)} moments, voice_level={voice_level})...")
        result = runner.synthesize_and_score(moments, creator_name, voice_level)
        runner.print_report(result)
        return 1 if result.error else 0

    # -- Standard mode: build page stub from moments, score directly --
    sections = []
    for m in moments:
        tags = m.get("topic_tags")
        sections.append(
            {
                "heading": tags[0] if tags else "Technique",
                "content": m.get("summary", "") + "\n\n" + m.get("transcript_excerpt", ""),
            }
        )

    page_json = {
        "title": f"{creator_name} — Technique Page",
        "creator_name": creator_name,
        "summary": f"Technique page synthesized from {len(moments)} key moments.",
        "body_sections": sections,
    }

    print(f"\nScoring page for '{creator_name}' ({len(moments)} moments)...")
    result = runner.score_page(page_json, moments)
    runner.print_report(result)
    return 1 if result.error else 0
|
|
|
|
|
|
def _run_optimize(args: argparse.Namespace) -> int:
    """Execute the optimize subcommand.

    Resolves a moments fixture (from --file, or by exporting --video-id from
    the database), runs the optimization loop, prints leaderboard and
    trajectory reports, writes a results JSON, and — when --apply was given
    and the winner beat the baseline — writes the winning prompt back.

    Returns a process exit code (0 success, 1 failure, 130 on Ctrl-C).
    """
    # Stage validation — stages 2-5 are supported
    if args.stage not in STAGE_CONFIGS:
        print(
            f"Error: unsupported stage {args.stage}. Valid stages: {sorted(STAGE_CONFIGS)}",
            file=sys.stderr,
        )
        return 1

    # Resolve fixture: either from --file or auto-export from --video-id
    fixture_path: str
    if args.file:
        fixture_path = args.file
    else:
        # Auto-export from database
        print(f"\n[OPTIMIZE] Exporting fixture from video_id={args.video_id}...", file=sys.stderr)
        import tempfile
        from pipeline.export_fixture import export_fixture

        settings = get_settings()
        # delete=False: the exported fixture must outlive this handle — its
        # path is recorded in the results JSON below. NOTE(review): the temp
        # file is never removed afterwards; confirm that is intentional.
        tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="optimize_fixture_", delete=False)
        tmp.close()
        exit_code = export_fixture(
            database_url=settings.database_url,
            redis_url=settings.redis_url,
            video_id=args.video_id,
            output_path=tmp.name,
        )
        if exit_code != 0:
            print(f"Error: fixture export failed (exit code {exit_code})", file=sys.stderr)
            return 1
        fixture_path = tmp.name
        print(f"[OPTIMIZE] Fixture exported to: {fixture_path}", file=sys.stderr)

    fixture = Path(fixture_path)
    if not fixture.exists():
        print(f"Error: fixture file not found: {fixture_path}", file=sys.stderr)
        return 1

    # Ensure output dir
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)

    settings = get_settings()
    client = LLMClient(settings)

    loop = OptimizationLoop(
        client=client,
        stage=args.stage,
        fixture_path=fixture_path,
        iterations=args.iterations,
        variants_per_iter=args.variants_per_iter,
        output_dir=args.output_dir,
    )

    try:
        result = loop.run()
    except KeyboardInterrupt:
        print("\n Optimization interrupted by user.", file=sys.stderr)
        # 130 is the conventional exit code for SIGINT (128 + 2).
        return 130
    except Exception as exc:
        # Broad catch at the CLI boundary: report and exit non-zero.
        print(f"\nError: optimization failed: {exc}", file=sys.stderr)
        return 1

    # If the loop returned an error on baseline, report and exit
    if result.best_score.error and not result.history:
        print(f"\nError: {result.best_score.error}", file=sys.stderr)
        return 1

    # Reporting
    print_leaderboard(result, stage=args.stage)
    print_trajectory(result)

    # Write results JSON
    try:
        json_path = write_results_json(
            result=result,
            output_dir=args.output_dir,
            stage=args.stage,
            iterations=args.iterations,
            variants_per_iter=args.variants_per_iter,
            fixture_path=fixture_path,
        )
        print(f" Results written to: {json_path}")
    except OSError as exc:
        # Reporting already happened; a failed write is a warning, not fatal.
        print(f" Warning: failed to write results JSON: {exc}", file=sys.stderr)

    # Apply winning prompt if requested
    if args.apply:
        # Baseline composite = first non-errored entry labeled "baseline";
        # 0.0 if none, so any successful variant counts as an improvement.
        baseline_composite = 0.0
        for h in result.history:
            if h.get("label") == "baseline" and not h.get("error"):
                baseline_composite = h["composite"]
                break

        if result.best_score.composite <= baseline_composite:
            print("\n --apply: Best prompt did not beat baseline — skipping apply.")
        elif result.best_score.error:
            print("\n --apply: Best result has an error — skipping apply.")
        else:
            print("\n --apply: Winning prompt beat baseline — applying...")
            success, msg = apply_prompt(args.stage, result.best_prompt)
            print(f" {msg}")
            if not success:
                return 1

    return 0
|
|
|
|
|
|
def apply_prompt(stage: int, new_prompt: str, dry_run: bool = False) -> tuple[bool, str]:
    """Apply a new prompt to a stage's prompt file. Returns (success, message).

    Creates a timestamped backup of the original before overwriting. When
    *dry_run* is True, prints a diff summary and modifies nothing.
    """
    if stage not in STAGE_CONFIGS:
        return False, f"Unsupported stage {stage}. Valid: {sorted(STAGE_CONFIGS)}"

    config = STAGE_CONFIGS[stage]
    settings = get_settings()
    prompt_path = Path(settings.prompts_path) / config.prompt_file

    if not prompt_path.exists():
        return False, f"Prompt file not found: {prompt_path}"

    original = prompt_path.read_text(encoding="utf-8")

    # Nothing to do if only surrounding whitespace differs.
    if original.strip() == new_prompt.strip():
        return True, "No change — winning prompt is identical to current prompt."

    # Show diff summary
    orig_lines = original.strip().splitlines()
    new_lines = new_prompt.strip().splitlines()
    print(f"\n Prompt file: {prompt_path}")
    print(f" Original: {len(orig_lines)} lines, {len(original)} chars")
    print(f" New: {len(new_lines)} lines, {len(new_prompt)} chars")

    # Simple line-level diff summary (skip the +++/--- file headers)
    import difflib
    diff = list(difflib.unified_diff(orig_lines, new_lines, lineterm="", n=2))
    added = sum(1 for line in diff if line.startswith("+") and not line.startswith("+++"))
    removed = sum(1 for line in diff if line.startswith("-") and not line.startswith("---"))
    print(f" Changes: +{added} lines, -{removed} lines")

    if dry_run:
        print("\n [DRY RUN] Would write to:", prompt_path)
        if len(diff) <= 40:
            print()
            for line in diff:
                print(f" {line}")
        else:
            print(f"\n (diff is {len(diff)} lines — showing first 30)")
            for line in diff[:30]:
                print(f" {line}")
            print(" ...")
        return True, "Dry run — no files modified."

    # Backup original. Fix: append the timestamp to the full filename —
    # with_suffix() would *replace* the prompt file's extension, losing it
    # in the backup name (e.g. "stage5.txt" -> "stage5.<ts>.bak").
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    backup_path = prompt_path.with_name(f"{prompt_path.name}.{timestamp}.bak")
    backup_path.write_text(original, encoding="utf-8")
    print(f" Backup: {backup_path}")

    # Write new prompt
    prompt_path.write_text(new_prompt, encoding="utf-8")
    print(f" ✓ Written: {prompt_path}")

    return True, f"Prompt applied. Backup at {backup_path}"
|
|
|
|
|
|
def _run_apply(args: argparse.Namespace) -> int:
    """Execute the apply subcommand — read a results JSON and apply the winning prompt."""
    results_path = Path(args.results_file)
    if not results_path.exists():
        print(f"Error: results file not found: {args.results_file}", file=sys.stderr)
        return 1

    try:
        payload = json.loads(results_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        print(f"Error: invalid JSON in {args.results_file}: {exc}", file=sys.stderr)
        return 1

    # Pull the fields apply_prompt() needs out of the results payload.
    stage = payload.get("config", {}).get("stage")
    if not stage:
        print("Error: results JSON missing config.stage", file=sys.stderr)
        return 1

    winning_prompt = payload.get("best_prompt", "")
    if not winning_prompt:
        print("Error: results JSON missing best_prompt or it's empty", file=sys.stderr)
        return 1

    composite = payload.get("best_scores", {}).get("composite", 0)
    print(f"\n Applying results from: {results_path}")
    print(f" Stage: {stage}")
    print(f" Best composite score: {composite:.3f}")

    ok, message = apply_prompt(stage, winning_prompt, dry_run=args.dry_run)
    print(f"\n {message}")
    return 0 if ok else 1
|
|
|
|
|
|
def _run_chat_eval(args: argparse.Namespace) -> int:
    """Execute the chat_eval subcommand — evaluate chat quality across a test suite."""
    suite_path = Path(args.suite)
    if not suite_path.exists():
        print(f"Error: suite file not found: {args.suite}", file=sys.stderr)
        return 1

    # Load test cases
    try:
        cases = ChatEvalRunner.load_suite(suite_path)
    except Exception as exc:
        print(f"Error loading test suite: {exc}", file=sys.stderr)
        return 1
    if not cases:
        print("Error: test suite contains no queries", file=sys.stderr)
        return 1

    print(f"\n Chat Evaluation: {len(cases)} queries from {suite_path}")
    print(f" Endpoint: {args.base_url}")

    # Build scorer and runner
    settings = get_settings()
    runner = ChatEvalRunner(
        scorer=ChatScoreRunner(LLMClient(settings)),
        base_url=args.base_url,
        timeout=args.timeout,
    )

    # Execute, then print the summary report
    results = runner.run_suite(cases)
    runner.print_summary(results)

    # Persist results (best-effort — a failed write is only a warning)
    try:
        json_path = runner.write_results(results, args.output)
        print(f" Results written to: {json_path}")
    except OSError as exc:
        print(f" Warning: failed to write results: {exc}", file=sys.stderr)

    # Exit code: 0 if at least one scored, 1 if all errored
    any_scored = any(
        r.score and not r.score.error and not r.request_error for r in results
    )
    return 0 if any_scored else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate the CLI's return value as the process exit code.
    sys.exit(main())
|