From 0d82b2b4096bb3ec6a5040da613810795efb03da Mon Sep 17 00:00:00 2001
From: jlightner
Date: Wed, 1 Apr 2026 09:08:01 +0000
Subject: [PATCH] =?UTF-8?q?feat:=20Created=20PromptVariantGenerator=20(LLM?=
 =?UTF-8?q?-powered=20prompt=20mutation)=20and=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- "backend/pipeline/quality/variant_generator.py"
- "backend/pipeline/quality/optimizer.py"

GSD-Task: S03/T01
---
 backend/pipeline/quality/__init__.py          |  11 +
 backend/pipeline/quality/optimizer.py         | 385 ++++++++++++++++++
 backend/pipeline/quality/variant_generator.py | 204 ++++++++++
 pipeline                                      |   1 +
 4 files changed, 601 insertions(+)
 create mode 100644 backend/pipeline/quality/optimizer.py
 create mode 100644 backend/pipeline/quality/variant_generator.py
 create mode 120000 pipeline

diff --git a/backend/pipeline/quality/__init__.py b/backend/pipeline/quality/__init__.py
index e69de29..6474832 100644
--- a/backend/pipeline/quality/__init__.py
+++ b/backend/pipeline/quality/__init__.py
@@ -0,0 +1,11 @@
+"""FYN-LLM quality assurance toolkit."""
+
+import os
+import sys
+
+# Ensure backend/ is on sys.path so sibling modules (config, pipeline.llm_client)
+# resolve when running from the project root via symlink.
+_backend_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "..")
+_backend_abs = os.path.normpath(os.path.abspath(_backend_dir))
+if _backend_abs not in sys.path:
+    sys.path.insert(0, _backend_abs)
diff --git a/backend/pipeline/quality/optimizer.py b/backend/pipeline/quality/optimizer.py
new file mode 100644
index 0000000..7aebb85
--- /dev/null
+++ b/backend/pipeline/quality/optimizer.py
@@ -0,0 +1,385 @@
+"""Automated prompt optimization loop for Stage 5 synthesis.
+
+Orchestrates a generate→score→select cycle:
+1. Score the current best prompt against reference fixtures
+2. Generate N variants targeting weak dimensions
+3. Score each variant
+4. Keep the best scorer as the new baseline
+5. Repeat for K iterations
+
+Usage (via CLI):
+    python -m pipeline.quality optimize --stage 5 --iterations 10
+"""
+from __future__ import annotations
+
+import json
+import logging
+import time
+from dataclasses import dataclass, field
+from pathlib import Path
+
+from pipeline.llm_client import LLMClient
+from pipeline.quality.scorer import DIMENSIONS, ScoreResult, ScoreRunner
+from pipeline.quality.variant_generator import PromptVariantGenerator
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class OptimizationResult:
+    """Full result of an optimization run."""
+
+    best_prompt: str = ""
+    best_score: ScoreResult = field(default_factory=ScoreResult)
+    history: list[dict] = field(default_factory=list)
+    elapsed_seconds: float = 0.0
+
+
+class OptimizationLoop:
+    """Runs iterative prompt optimization for a pipeline stage.
+
+    Each iteration generates *variants_per_iter* prompt mutations,
+    scores each against reference fixture data, and keeps the
+    highest-composite-scoring variant as the new baseline.
+
+    Parameters
+    ----------
+    client:
+        LLMClient instance for LLM calls (synthesis + scoring + variant gen).
+    stage:
+        Pipeline stage number (currently only 5 is supported).
+    fixture_path:
+        Path to a JSON fixture file containing ``creator_name`` and ``moments``.
+    iterations:
+        Number of generate→score→select cycles.
+    variants_per_iter:
+        Number of variant prompts to generate per iteration.
+    """
+
+    def __init__(
+        self,
+        client: LLMClient,
+        stage: int,
+        fixture_path: str,
+        iterations: int = 5,
+        variants_per_iter: int = 2,
+    ) -> None:
+        self.client = client
+        self.stage = stage
+        self.fixture_path = fixture_path
+        self.iterations = iterations
+        self.variants_per_iter = variants_per_iter
+
+        self.scorer = ScoreRunner(client)
+        self.generator = PromptVariantGenerator(client)
+
+    def run(self) -> OptimizationResult:
+        """Execute the full optimization loop.
+
+        Returns
+        -------
+        OptimizationResult
+            Contains the best prompt, its scores, full iteration history,
+            and wall-clock elapsed time.
+        """
+        from pipeline.stages import _load_prompt
+
+        t0 = time.monotonic()
+
+        # Load base prompt
+        prompt_file = f"stage{self.stage}_synthesis.txt"
+        try:
+            base_prompt = _load_prompt(prompt_file)
+        except FileNotFoundError:
+            logger.error("Prompt file not found: %s", prompt_file)
+            return OptimizationResult(
+                best_prompt="",
+                best_score=ScoreResult(error=f"Prompt file not found: {prompt_file}"),
+                elapsed_seconds=round(time.monotonic() - t0, 2),
+            )
+
+        # Load fixture data. OSError covers a missing/unreadable file, ValueError
+        # covers malformed JSON or a bad encoding, and KeyError/TypeError come
+        # from the shape checks in _load_fixture.
+        try:
+            fixture = self._load_fixture()
+        except (OSError, ValueError, KeyError, TypeError) as exc:
+            logger.error("Failed to load fixture: %s", exc)
+            return OptimizationResult(
+                best_prompt=base_prompt,
+                best_score=ScoreResult(error=f"Fixture load error: {exc}"),
+                elapsed_seconds=round(time.monotonic() - t0, 2),
+            )
+
+        moments = fixture["moments"]
+        creator_name = fixture["creator_name"]
+        history: list[dict] = []
+
+        # Score the baseline
+        print(f"\n{'='*60}")
+        print(f" PROMPT OPTIMIZATION — Stage {self.stage}")
+        print(f" Iterations: {self.iterations}, Variants/iter: {self.variants_per_iter}")
+        print(f"{'='*60}\n")
+
+        print(" Scoring baseline prompt...")
+        # NOTE(review): the baseline is scored via synthesize_and_score with
+        # voice_level=0.5, while variants go through _score_variant, which
+        # bypasses VoiceDial — confirm the two composites are comparable.
+        best_score = self.scorer.synthesize_and_score(
+            moments=moments,
+            creator_name=creator_name,
+            voice_level=0.5,
+        )
+        best_prompt = base_prompt
+
+        history.append({
+            "iteration": 0,
+            "variant_index": 0,
+            "prompt_text": base_prompt[:200] + "..." if len(base_prompt) > 200 else base_prompt,
+            "prompt_length": len(base_prompt),
+            "composite": best_score.composite,
+            "scores": {d: getattr(best_score, d) for d in DIMENSIONS},
+            "error": best_score.error,
+            "label": "baseline",
+        })
+
+        if best_score.error:
+            print(f" ✗ Baseline scoring failed: {best_score.error}")
+            print(" Aborting optimization — fix the baseline first.\n")
+            return OptimizationResult(
+                best_prompt=best_prompt,
+                best_score=best_score,
+                history=history,
+                elapsed_seconds=round(time.monotonic() - t0, 2),
+            )
+
+        self._print_iteration_summary(0, best_score, is_baseline=True)
+
+        # Iterate
+        for iteration in range(1, self.iterations + 1):
+            print(f"\n ── Iteration {iteration}/{self.iterations} ──")
+
+            # Generate variants
+            variants = self.generator.generate(
+                base_prompt=best_prompt,
+                scores=best_score,
+                n=self.variants_per_iter,
+            )
+
+            if not variants:
+                print(" ⚠ No valid variants generated — skipping iteration")
+                continue
+
+            # Score each variant
+            iteration_best_score = best_score
+            iteration_best_prompt = best_prompt
+
+            for vi, variant_prompt in enumerate(variants):
+                print(f" Scoring variant {vi + 1}/{len(variants)}...")
+
+                # Synthesize with the variant as the system prompt, then score the page
+                score = self._score_variant(
+                    variant_prompt, moments, creator_name,
+                )
+
+                history.append({
+                    "iteration": iteration,
+                    "variant_index": vi + 1,
+                    "prompt_text": variant_prompt[:200] + "..." if len(variant_prompt) > 200 else variant_prompt,
+                    "prompt_length": len(variant_prompt),
+                    "composite": score.composite,
+                    "scores": {d: getattr(score, d) for d in DIMENSIONS},
+                    "error": score.error,
+                    "label": f"iter{iteration}_v{vi+1}",
+                })
+
+                if score.error:
+                    print(f" ✗ Variant {vi + 1} errored: {score.error}")
+                    continue
+
+                if score.composite > iteration_best_score.composite:
+                    # Remember the score being displaced so "was" stays correct
+                    # when a later variant beats an earlier improved one.
+                    previous_best = iteration_best_score.composite
+                    iteration_best_score = score
+                    iteration_best_prompt = variant_prompt
+                    print(f" ✓ New best: {score.composite:.3f} (was {previous_best:.3f})")
+                else:
+                    print(f" · Score {score.composite:.3f} ≤ current best {iteration_best_score.composite:.3f}")
+
+            # Update global best if this iteration improved
+            if iteration_best_score.composite > best_score.composite:
+                best_score = iteration_best_score
+                best_prompt = iteration_best_prompt
+                print(f" ★ Iteration {iteration} improved: {best_score.composite:.3f}")
+            else:
+                print(f" · No improvement in iteration {iteration}")
+
+            self._print_iteration_summary(iteration, best_score)
+
+        # Final report
+        elapsed = round(time.monotonic() - t0, 2)
+        self._print_final_report(best_score, history, elapsed)
+
+        return OptimizationResult(
+            best_prompt=best_prompt,
+            best_score=best_score,
+            history=history,
+            elapsed_seconds=elapsed,
+        )
+
+    # ── Internal helpers ──────────────────────────────────────────────────
+
+    def _load_fixture(self) -> dict:
+        """Load and validate the fixture JSON file.
+
+        Raises
+        ------
+        FileNotFoundError
+            If ``fixture_path`` does not exist.
+        TypeError
+            If the top-level JSON value is not an object.
+        KeyError
+            If ``moments`` or ``creator_name`` is missing.
+        """
+        path = Path(self.fixture_path)
+        if not path.exists():
+            raise FileNotFoundError(f"Fixture not found: {path}")
+        data = json.loads(path.read_text(encoding="utf-8"))
+
+        # A top-level list or string would pass or fail the ``in`` checks below
+        # for the wrong reason, so reject anything that is not an object first.
+        if not isinstance(data, dict):
+            raise TypeError("Fixture must be a JSON object")
+        if "moments" not in data:
+            raise KeyError("Fixture must contain 'moments' key")
+        if "creator_name" not in data:
+            raise KeyError("Fixture must contain 'creator_name' key")
+
+        return data
+
+    def _score_variant(
+        self,
+        variant_prompt: str,
+        moments: list[dict],
+        creator_name: str,
+    ) -> ScoreResult:
+        """Score a variant prompt by running synthesis + scoring.
+
+        Uses the variant as a direct system prompt for synthesis, bypassing
+        VoiceDial (the optimization loop owns the full prompt text).
+        """
+        from pipeline.schemas import SynthesisResult
+        from pipeline.stages import _get_stage_config
+
+        import openai as _openai
+
+        model_override, modality = _get_stage_config(self.stage)
+
+        moments_json = json.dumps(moments, indent=2)
+        user_prompt = f"{creator_name}\n\n{moments_json}\n"
+
+        t0 = time.monotonic()
+        try:
+            raw = self.client.complete(
+                system_prompt=variant_prompt,
+                user_prompt=user_prompt,
+                response_model=SynthesisResult,
+                modality=modality,
+                model_override=model_override,
+            )
+            elapsed_synth = round(time.monotonic() - t0, 2)
+        except (_openai.APIConnectionError, _openai.APITimeoutError) as exc:
+            elapsed_synth = round(time.monotonic() - t0, 2)
+            return ScoreResult(
+                elapsed_seconds=elapsed_synth,
+                error=f"Synthesis LLM error: {exc}",
+            )
+        except Exception as exc:
+            elapsed_synth = round(time.monotonic() - t0, 2)
+            logger.exception("Unexpected error during variant synthesis")
+            return ScoreResult(
+                elapsed_seconds=elapsed_synth,
+                error=f"Unexpected synthesis error: {exc}",
+            )
+
+        # Parse synthesis
+        raw_text = str(raw).strip()
+        try:
+            synthesis = self.client.parse_response(raw_text, SynthesisResult)
+        except Exception as exc:
+            return ScoreResult(
+                elapsed_seconds=elapsed_synth,
+                error=f"Variant synthesis parse error: {exc}",
+            )
+
+        if not synthesis.pages:
+            return ScoreResult(
+                elapsed_seconds=elapsed_synth,
+                error="Variant synthesis returned no pages",
+            )
+
+        # Score the first page
+        page = synthesis.pages[0]
+        page_json = {
+            "title": page.title,
+            "creator_name": creator_name,
+            "summary": page.summary,
+            "body_sections": [
+                {"heading": heading, "content": content}
+                for heading, content in page.body_sections.items()
+            ],
+        }
+
+        result = self.scorer.score_page(page_json, moments)
+        result.elapsed_seconds = round(result.elapsed_seconds + elapsed_synth, 2)
+        return result
+
+    def _print_iteration_summary(
+        self,
+        iteration: int,
+        score: ScoreResult,
+        is_baseline: bool = False,
+    ) -> None:
+        """Print a compact one-line summary of the current best scores."""
+        label = "BASELINE" if is_baseline else f"ITER {iteration}"
+        dims = " ".join(
+            f"{d[:4]}={getattr(score, d):.2f}" for d in DIMENSIONS
+        )
+        print(f" [{label}] composite={score.composite:.3f} {dims}")
+
+    def _print_final_report(
+        self,
+        best_score: ScoreResult,
+        history: list[dict],
+        elapsed: float,
+    ) -> None:
+        """Print the final optimization summary."""
+        print(f"\n{'='*60}")
+        print(" OPTIMIZATION COMPLETE")
+        print(f"{'='*60}")
+        print(f" Total time: {elapsed}s")
+        print(f" Iterations: {self.iterations}")
+        print(f" Variants scored: {len(history) - 1}")  # minus baseline
+
+        baseline_composite = history[0]["composite"] if history else 0.0
+        improvement = best_score.composite - baseline_composite
+
+        print(f"\n Baseline composite: {baseline_composite:.3f}")
+        print(f" Best composite: {best_score.composite:.3f}")
+        if improvement > 0:
+            print(f" Improvement: +{improvement:.3f}")
+        else:
+            print(f" Improvement: {improvement:.3f} (no gain)")
+
+        print(f"\n Per-dimension best scores:")
+        for d in DIMENSIONS:
+            val = getattr(best_score, d)
+            bar = "█" * int(val * 20) + "░" * (20 - int(val * 20))
+            print(f" {d.replace('_', ' ').title():25s} {val:.2f} {bar}")
+
+        errored = sum(1 for h in history if h.get("error"))
+        if errored:
+            print(f"\n ⚠ {errored} variant(s) errored during scoring")
+
+        print(f"{'='*60}\n")
diff --git a/backend/pipeline/quality/variant_generator.py b/backend/pipeline/quality/variant_generator.py
new file mode 100644
index 0000000..3a20adf
--- /dev/null
+++ b/backend/pipeline/quality/variant_generator.py
@@ -0,0 +1,204 @@
+"""LLM-powered prompt variant generator for automated optimization.
+
+Uses a meta-prompt to instruct the LLM to act as a prompt engineer,
+analyzing per-dimension scores and producing targeted prompt mutations
+that improve the weakest scoring dimensions while preserving the JSON
+output format required by downstream parsing.
+"""
+from __future__ import annotations
+
+import logging
+
+from pipeline.llm_client import LLMClient
+from pipeline.quality.scorer import DIMENSIONS, ScoreResult
+
+logger = logging.getLogger(__name__)
+
+
+# ── Meta-prompt for variant generation ────────────────────────────────────────
+
+VARIANT_META_PROMPT = """\
+You are an expert prompt engineer specializing in LLM-powered content synthesis.
+
+Your task: given a synthesis prompt and its quality evaluation scores, produce an
+improved variant of the prompt that targets the weakest-scoring dimensions while
+maintaining or improving the others.
+
+## Scoring Dimensions (each 0.0–1.0)
+
+- **structural** — Section naming, count (3-6), paragraph depth (2-5 per section)
+- **content_specificity** — Concrete details: frequencies, time values, ratios, plugin names, dB values
+- **voice_preservation** — Direct quotes preserved, opinions attributed to creator by name, personality retained
+- **readability** — Cohesive article flow, related info merged, no redundancy or contradiction
+- **factual_fidelity** — Every claim traceable to source material, no hallucinated specifics
+
+## Rules
+
+1. Focus your changes on the weakest 1-2 dimensions. Don't dilute the prompt by trying to fix everything.
+2. Add specific, actionable instructions — not vague encouragements.
+3. **CRITICAL: You MUST preserve the JSON output format section of the prompt EXACTLY as-is.**
+   The prompt contains instructions about outputting a JSON object with a specific schema
+   (SynthesisResult with "pages" containing title, summary, body_sections, etc.).
+   Do NOT modify, remove, or rephrase any part of the JSON format instructions.
+   Your changes should target the prose synthesis guidelines only.
+4. Keep the overall prompt length within 2x of the original. Don't bloat it.
+5. Make substantive changes — rewording a sentence or adding one adjective is not enough.
+
+## Output
+
+Return ONLY the full modified prompt text. No explanation, no markdown fences, no preamble.
+Just the complete prompt that could be used directly as a system prompt.
+"""
+
+
+# Format markers that must survive variant generation — if any of these
+# are present in the base prompt, the variant must also contain them.
+_FORMAT_MARKERS = ["SynthesisResult", '"pages"', "body_sections", "title", "summary"]
+
+
+class PromptVariantGenerator:
+    """Generates prompt variants by asking an LLM to act as a prompt engineer.
+
+    Given a base prompt and its evaluation scores, produces N mutated
+    variants targeting the weakest dimensions.
+    """
+
+    def __init__(self, client: LLMClient) -> None:
+        self.client = client
+
+    def generate(
+        self,
+        base_prompt: str,
+        scores: ScoreResult,
+        n: int = 2,
+    ) -> list[str]:
+        """Generate up to *n* valid prompt variants.
+
+        Each variant is produced by a separate LLM call with the meta-prompt.
+        Variants are validated: they must differ from the base (a length change
+        of ≥50 characters or ≥3 changed lines), must not repeat a variant already
+        accepted in this call, and must contain the JSON format instruction
+        markers found in the base.
+
+        Invalid variants are logged and skipped.
+
+        Parameters
+        ----------
+        base_prompt:
+            The current best synthesis prompt text.
+        scores:
+            ScoreResult from the most recent evaluation of *base_prompt*.
+        n:
+            Number of variants to attempt generating.
+
+        Returns
+        -------
+        list[str]
+            Valid variant prompt strings (may be fewer than *n*).
+        """
+        user_prompt = self._build_user_prompt(base_prompt, scores)
+        # Identify which format markers are actually present in the base
+        required_markers = [m for m in _FORMAT_MARKERS if m in base_prompt]
+
+        variants: list[str] = []
+        for i in range(n):
+            logger.info("Generating variant %d/%d...", i + 1, n)
+            try:
+                raw = self.client.complete(
+                    system_prompt=VARIANT_META_PROMPT,
+                    user_prompt=user_prompt,
+                    response_model=None,  # free-form text, not JSON
+                    modality="chat",
+                )
+                variant = str(raw).strip()
+            except Exception:
+                logger.exception("LLM error generating variant %d/%d", i + 1, n)
+                continue
+
+            # Validate the variant
+            if not self._validate(variant, base_prompt, required_markers, i + 1):
+                continue
+
+            # Repeated calls can return identical text; scoring it twice would
+            # only burn LLM calls without adding a candidate.
+            if variant in variants:
+                logger.warning(
+                    "Variant %d duplicates an earlier variant — skipping", i + 1
+                )
+                continue
+
+            variants.append(variant)
+            logger.info("Variant %d/%d accepted (%d chars)", i + 1, n, len(variant))
+
+        logger.info(
+            "Generated %d valid variant(s) out of %d attempts", len(variants), n
+        )
+        return variants
+
+    # ── Internal helpers ──────────────────────────────────────────────────
+
+    def _build_user_prompt(self, base_prompt: str, scores: ScoreResult) -> str:
+        """Build the user message describing the current prompt and its scores."""
+        # Build per-dimension score lines, sorted worst-first
+        dim_lines: list[str] = []
+        dim_scores = [(d, getattr(scores, d, 0.0)) for d in DIMENSIONS]
+        dim_scores.sort(key=lambda x: x[1])
+
+        for dim, val in dim_scores:
+            justification = scores.justifications.get(dim, "")
+            label = dim.replace("_", " ").title()
+            line = f" {label}: {val:.2f}"
+            if justification:
+                line += f" — {justification}"
+            dim_lines.append(line)
+
+        weakest = dim_scores[0][0].replace("_", " ").title()
+        second_weakest = dim_scores[1][0].replace("_", " ").title() if len(dim_scores) > 1 else weakest
+
+        return (
+            f"## Current Prompt\n\n{base_prompt}\n\n"
+            f"## Evaluation Scores (sorted weakest → strongest)\n\n"
+            + "\n".join(dim_lines)
+            + f"\n\n Composite: {scores.composite:.3f}\n\n"
+            f"## Priority\n\n"
+            f"The weakest dimensions are **{weakest}** and **{second_weakest}**. "
+            f"Focus your prompt modifications on improving these.\n\n"
+            f"Return the full modified prompt now."
+        )
+
+    def _validate(
+        self,
+        variant: str,
+        base_prompt: str,
+        required_markers: list[str],
+        index: int,
+    ) -> bool:
+        """Check a variant meets minimum quality gates."""
+        if not variant:
+            logger.warning("Variant %d is empty — skipping", index)
+            return False
+
+        # Must differ meaningfully from base
+        diff = abs(len(variant) - len(base_prompt))
+        # Also check actual content difference via set-symmetric-difference of lines
+        base_lines = set(base_prompt.splitlines())
+        variant_lines = set(variant.splitlines())
+        changed_lines = len(base_lines.symmetric_difference(variant_lines))
+
+        if diff < 50 and changed_lines < 3:
+            logger.warning(
+                "Variant %d too similar to base (len_diff=%d, changed_lines=%d) — skipping",
+                index, diff, changed_lines,
+            )
+            return False
+
+        # Must preserve format markers
+        missing = [m for m in required_markers if m not in variant]
+        if missing:
+            logger.warning(
+                "Variant %d missing format markers %s — skipping",
+                index, missing,
+            )
+            return False
+
+        return True
diff --git a/pipeline b/pipeline
new file mode 120000
index 0000000..1369d83
--- /dev/null
+++ b/pipeline
@@ -0,0 +1 @@
+backend/pipeline
\ No newline at end of file