feat: Created PromptVariantGenerator (LLM-powered prompt mutation) and…

- "backend/pipeline/quality/variant_generator.py"
- "backend/pipeline/quality/optimizer.py"

GSD-Task: S03/T01
This commit is contained in:
jlightner 2026-04-01 09:08:01 +00:00
parent 0086573af5
commit 0d82b2b409
4 changed files with 570 additions and 0 deletions

View file

@ -0,0 +1,11 @@
"""FYN-LLM quality assurance toolkit."""
import os
import sys
# Make backend/ importable: sibling modules (config, pipeline.llm_client) must
# resolve even when this package is reached through the project-root symlink,
# so the real (symlink-resolved) location of this file is used as the anchor.
_backend_dir = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    os.pardir,
    os.pardir,
)
_backend_abs = os.path.normpath(os.path.abspath(_backend_dir))
# Prepend only once so repeated imports do not grow sys.path.
if sys.path.count(_backend_abs) == 0:
    sys.path.insert(0, _backend_abs)

View file

@ -0,0 +1,364 @@
"""Automated prompt optimization loop for Stage 5 synthesis.
Orchestrates a generate → score → select cycle:
1. Score the current best prompt against reference fixtures
2. Generate N variants targeting weak dimensions
3. Score each variant
4. Keep the best scorer as the new baseline
5. Repeat for K iterations
Usage (via CLI):
python -m pipeline.quality optimize --stage 5 --iterations 10
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from pipeline.llm_client import LLMClient
from pipeline.quality.scorer import DIMENSIONS, ScoreResult, ScoreRunner
from pipeline.quality.variant_generator import PromptVariantGenerator
logger = logging.getLogger(__name__)
@dataclass
class OptimizationResult:
    """Outcome of one complete prompt-optimization run."""

    # Text of the best prompt found; stays "" when no prompt was selected.
    best_prompt: str = ""
    # Score record belonging to ``best_prompt``.
    best_score: ScoreResult = field(default_factory=ScoreResult)
    # One dict per scored prompt, in the order they were evaluated.
    history: list[dict] = field(default_factory=list)
    # Wall-clock duration of the run, in seconds.
    elapsed_seconds: float = 0.0
class OptimizationLoop:
    """Runs iterative prompt optimization for a pipeline stage.

    Each iteration generates *variants_per_iter* prompt mutations,
    scores each against reference fixture data, and keeps the
    highest-composite-scoring variant as the new baseline.

    Parameters
    ----------
    client:
        LLMClient instance for LLM calls (synthesis + scoring + variant gen).
    stage:
        Pipeline stage number (currently only 5 is supported).
    fixture_path:
        Path to a JSON fixture file containing ``creator_name`` and ``moments``.
    iterations:
        Number of generate → score → select cycles.
    variants_per_iter:
        Number of variant prompts to generate per iteration.
    """

    # Number of prompt characters kept in each history entry's preview.
    _PREVIEW_CHARS = 200
    # Width (in characters) of the per-dimension bar in the final report.
    _BAR_WIDTH = 20

    def __init__(
        self,
        client: LLMClient,
        stage: int,
        fixture_path: str,
        iterations: int = 5,
        variants_per_iter: int = 2,
    ) -> None:
        self.client = client
        self.stage = stage
        self.fixture_path = fixture_path
        self.iterations = iterations
        self.variants_per_iter = variants_per_iter
        self.scorer = ScoreRunner(client)
        self.generator = PromptVariantGenerator(client)

    def run(self) -> OptimizationResult:
        """Execute the full optimization loop.

        Returns
        -------
        OptimizationResult
            Contains the best prompt, its scores, full iteration history,
            and wall-clock elapsed time. When the prompt file or the fixture
            cannot be loaded, or the baseline fails to score, the result is
            returned early with ``best_score.error`` set.
        """
        from pipeline.stages import _load_prompt

        t0 = time.monotonic()

        # Load base prompt
        prompt_file = f"stage{self.stage}_synthesis.txt"
        try:
            base_prompt = _load_prompt(prompt_file)
        except FileNotFoundError:
            logger.error("Prompt file not found: %s", prompt_file)
            return OptimizationResult(
                best_prompt="",
                best_score=ScoreResult(error=f"Prompt file not found: {prompt_file}"),
                elapsed_seconds=round(time.monotonic() - t0, 2),
            )

        # Load fixture data
        try:
            fixture = self._load_fixture()
        except (FileNotFoundError, json.JSONDecodeError, KeyError) as exc:
            logger.error("Failed to load fixture: %s", exc)
            return OptimizationResult(
                best_prompt=base_prompt,
                best_score=ScoreResult(error=f"Fixture load error: {exc}"),
                elapsed_seconds=round(time.monotonic() - t0, 2),
            )

        moments = fixture["moments"]
        creator_name = fixture["creator_name"]
        history: list[dict] = []

        # Score the baseline
        print(f"\n{'='*60}")
        print(f" PROMPT OPTIMIZATION — Stage {self.stage}")
        print(f" Iterations: {self.iterations}, Variants/iter: {self.variants_per_iter}")
        print(f"{'='*60}\n")
        print(" Scoring baseline prompt...")

        # NOTE(review): the baseline goes through ScoreRunner.synthesize_and_score
        # (which is not handed base_prompt), while variants go through
        # _score_variant — confirm both paths score comparably.
        best_score = self.scorer.synthesize_and_score(
            moments=moments,
            creator_name=creator_name,
            voice_level=0.5,
        )
        best_prompt = base_prompt
        history.append(self._history_entry(0, 0, base_prompt, best_score, "baseline"))

        if best_score.error:
            print(f" ✗ Baseline scoring failed: {best_score.error}")
            print(" Aborting optimization — fix the baseline first.\n")
            return OptimizationResult(
                best_prompt=best_prompt,
                best_score=best_score,
                history=history,
                elapsed_seconds=round(time.monotonic() - t0, 2),
            )

        self._print_iteration_summary(0, best_score, is_baseline=True)

        # Iterate
        for iteration in range(1, self.iterations + 1):
            print(f"\n ── Iteration {iteration}/{self.iterations} ──")

            # Generate variants from the current best prompt and its scores
            variants = self.generator.generate(
                base_prompt=best_prompt,
                scores=best_score,
                n=self.variants_per_iter,
            )
            if not variants:
                print(" ⚠ No valid variants generated — skipping iteration")
                continue

            # Score each variant, tracking the best one seen in this iteration
            iteration_best_score = best_score
            iteration_best_prompt = best_prompt
            for number, variant_prompt in enumerate(variants, start=1):
                print(f" Scoring variant {number}/{len(variants)}...")
                score = self._score_variant(variant_prompt, moments, creator_name)
                history.append(
                    self._history_entry(
                        iteration,
                        number,
                        variant_prompt,
                        score,
                        f"iter{iteration}_v{number}",
                    )
                )

                if score.error:
                    print(f" ✗ Variant {number} errored: {score.error}")
                    continue

                if score.composite > iteration_best_score.composite:
                    # Report the score that was actually displaced; an earlier
                    # variant in this iteration may already have beaten the
                    # global best.
                    previous = iteration_best_score.composite
                    iteration_best_score = score
                    iteration_best_prompt = variant_prompt
                    print(f" ✓ New best: {score.composite:.3f} (was {previous:.3f})")
                else:
                    print(f" · Score {score.composite:.3f} ≤ current best {iteration_best_score.composite:.3f}")

            # Update global best if this iteration improved
            if iteration_best_score.composite > best_score.composite:
                best_score = iteration_best_score
                best_prompt = iteration_best_prompt
                print(f" ★ Iteration {iteration} improved: {best_score.composite:.3f}")
            else:
                print(f" · No improvement in iteration {iteration}")

            self._print_iteration_summary(iteration, best_score)

        # Final report
        elapsed = round(time.monotonic() - t0, 2)
        self._print_final_report(best_score, history, elapsed)
        return OptimizationResult(
            best_prompt=best_prompt,
            best_score=best_score,
            history=history,
            elapsed_seconds=elapsed,
        )

    # ── Internal helpers ──────────────────────────────────────────────────

    @classmethod
    def _history_entry(
        cls,
        iteration: int,
        variant_index: int,
        prompt: str,
        score: ScoreResult,
        label: str,
    ) -> dict:
        """Build one history record for a scored prompt.

        The prompt text is stored as a preview truncated to
        ``_PREVIEW_CHARS`` characters (with a trailing ``...``); its full
        length is recorded separately.
        """
        limit = cls._PREVIEW_CHARS
        preview = prompt[:limit] + "..." if len(prompt) > limit else prompt
        return {
            "iteration": iteration,
            "variant_index": variant_index,
            "prompt_text": preview,
            "prompt_length": len(prompt),
            "composite": score.composite,
            "scores": {d: getattr(score, d) for d in DIMENSIONS},
            "error": score.error,
            "label": label,
        }

    def _load_fixture(self) -> dict:
        """Load and validate the fixture JSON file.

        Returns
        -------
        dict
            The parsed fixture, guaranteed to contain ``moments`` and
            ``creator_name`` keys.

        Raises
        ------
        FileNotFoundError
            If ``fixture_path`` is not an existing regular file (a directory
            counts as "not found" so callers see one exception type).
        json.JSONDecodeError
            If the file is not valid JSON.
        KeyError
            If the top-level JSON value is not an object or a required key
            is missing.
        """
        path = Path(self.fixture_path)
        if not path.is_file():
            raise FileNotFoundError(f"Fixture not found: {path}")
        data = json.loads(path.read_text(encoding="utf-8"))
        # A list or string would pass a bare ``in`` test and only fail later
        # when indexed by key, outside the caller's exception handling.
        if not isinstance(data, dict):
            raise KeyError("Fixture must be a JSON object")
        if "moments" not in data:
            raise KeyError("Fixture must contain 'moments' key")
        if "creator_name" not in data:
            raise KeyError("Fixture must contain 'creator_name' key")
        return data

    def _score_variant(
        self,
        variant_prompt: str,
        moments: list[dict],
        creator_name: str,
    ) -> ScoreResult:
        """Score a variant prompt by running synthesis + scoring.

        Uses the variant as a direct system prompt for synthesis, bypassing
        VoiceDial (the optimization loop owns the full prompt text).

        Returns
        -------
        ScoreResult
            The scorer's result for the first synthesized page, with the
            synthesis time added to ``elapsed_seconds``; or a result with
            ``error`` set when synthesis fails, cannot be parsed, or yields
            no pages.
        """
        from pipeline.schemas import SynthesisResult
        from pipeline.stages import _get_stage_config
        import openai as _openai

        model_override, modality = _get_stage_config(self.stage)
        moments_json = json.dumps(moments, indent=2)
        user_prompt = f"<creator>{creator_name}</creator>\n<moments>\n{moments_json}\n</moments>"

        t0 = time.monotonic()
        try:
            raw = self.client.complete(
                system_prompt=variant_prompt,
                user_prompt=user_prompt,
                response_model=SynthesisResult,
                modality=modality,
                model_override=model_override,
            )
            elapsed_synth = round(time.monotonic() - t0, 2)
        except (_openai.APIConnectionError, _openai.APITimeoutError) as exc:
            elapsed_synth = round(time.monotonic() - t0, 2)
            return ScoreResult(
                elapsed_seconds=elapsed_synth,
                error=f"Synthesis LLM error: {exc}",
            )
        except Exception as exc:
            # Best-effort boundary: one failing variant must not abort the run.
            elapsed_synth = round(time.monotonic() - t0, 2)
            logger.exception("Unexpected error during variant synthesis")
            return ScoreResult(
                elapsed_seconds=elapsed_synth,
                error=f"Unexpected synthesis error: {exc}",
            )

        # Parse synthesis
        raw_text = str(raw).strip()
        try:
            synthesis = self.client.parse_response(raw_text, SynthesisResult)
        except Exception as exc:
            return ScoreResult(
                elapsed_seconds=elapsed_synth,
                error=f"Variant synthesis parse error: {exc}",
            )

        if not synthesis.pages:
            return ScoreResult(
                elapsed_seconds=elapsed_synth,
                error="Variant synthesis returned no pages",
            )

        # Score the first page
        page = synthesis.pages[0]
        page_json = {
            "title": page.title,
            "creator_name": creator_name,
            "summary": page.summary,
            "body_sections": [
                {"heading": heading, "content": content}
                for heading, content in page.body_sections.items()
            ],
        }
        result = self.scorer.score_page(page_json, moments)
        result.elapsed_seconds = round(result.elapsed_seconds + elapsed_synth, 2)
        return result

    def _print_iteration_summary(
        self,
        iteration: int,
        score: ScoreResult,
        is_baseline: bool = False,
    ) -> None:
        """Print a compact one-line summary of the current best scores."""
        label = "BASELINE" if is_baseline else f"ITER {iteration}"
        dims = " ".join(
            f"{d[:4]}={getattr(score, d):.2f}" for d in DIMENSIONS
        )
        print(f" [{label}] composite={score.composite:.3f} {dims}")

    def _print_final_report(
        self,
        best_score: ScoreResult,
        history: list[dict],
        elapsed: float,
    ) -> None:
        """Print the final optimization summary.

        Parameters
        ----------
        best_score:
            Scores of the best prompt found.
        history:
            All history entries; the first one is treated as the baseline.
        elapsed:
            Total wall-clock time in seconds.
        """
        print(f"\n{'='*60}")
        print(" OPTIMIZATION COMPLETE")
        print(f"{'='*60}")
        print(f" Total time: {elapsed}s")
        print(f" Iterations: {self.iterations}")
        # Minus the baseline entry; never negative for an empty history.
        print(f" Variants scored: {max(len(history) - 1, 0)}")

        baseline_composite = history[0]["composite"] if history else 0.0
        improvement = best_score.composite - baseline_composite
        print(f"\n Baseline composite: {baseline_composite:.3f}")
        print(f" Best composite: {best_score.composite:.3f}")
        if improvement > 0:
            print(f" Improvement: +{improvement:.3f}")
        else:
            print(f" Improvement: {improvement:.3f} (no gain)")

        print("\n Per-dimension best scores:")
        width = self._BAR_WIDTH
        for d in DIMENSIONS:
            val = getattr(best_score, d)
            # Clamp so an out-of-range score cannot produce a malformed bar.
            filled = max(0, min(width, int(val * width)))
            bar = "#" * filled + "-" * (width - filled)
            print(f" {d.replace('_', ' ').title():25s} {val:.2f} {bar}")

        errored = sum(1 for h in history if h.get("error"))
        if errored:
            print(f"\n{errored} variant(s) errored during scoring")
        print(f"{'='*60}\n")

View file

@ -0,0 +1,194 @@
"""LLM-powered prompt variant generator for automated optimization.
Uses a meta-prompt to instruct the LLM to act as a prompt engineer,
analyzing per-dimension scores and producing targeted prompt mutations
that improve the weakest scoring dimensions while preserving the JSON
output format required by downstream parsing.
"""
from __future__ import annotations
import logging
from pipeline.llm_client import LLMClient
from pipeline.quality.scorer import DIMENSIONS, ScoreResult
logger = logging.getLogger(__name__)
# ── Meta-prompt for variant generation ────────────────────────────────────────
# System prompt sent verbatim to the LLM on every variant-generation call.
# The score range and the dimension descriptions carry explicit separators so
# the model reads "0.0-1.0" as a range and each description as a gloss on the
# dimension name that precedes it.
VARIANT_META_PROMPT = """\
You are an expert prompt engineer specializing in LLM-powered content synthesis.
Your task: given a synthesis prompt and its quality evaluation scores, produce an
improved variant of the prompt that targets the weakest-scoring dimensions while
maintaining or improving the others.
## Scoring Dimensions (each 0.0-1.0)
- **structural** — Section naming, count (3-6), paragraph depth (2-5 per section)
- **content_specificity** — Concrete details: frequencies, time values, ratios, plugin names, dB values
- **voice_preservation** — Direct quotes preserved, opinions attributed to creator by name, personality retained
- **readability** — Cohesive article flow, related info merged, no redundancy or contradiction
- **factual_fidelity** — Every claim traceable to source material, no hallucinated specifics
## Rules
1. Focus your changes on the weakest 1-2 dimensions. Don't dilute the prompt by trying to fix everything.
2. Add specific, actionable instructions — not vague encouragements.
3. **CRITICAL: You MUST preserve the JSON output format section of the prompt EXACTLY as-is.**
The prompt contains instructions about outputting a JSON object with a specific schema
(SynthesisResult with "pages" containing title, summary, body_sections, etc.).
Do NOT modify, remove, or rephrase any part of the JSON format instructions.
Your changes should target the prose synthesis guidelines only.
4. Keep the overall prompt length within 2x of the original. Don't bloat it.
5. Make substantive changes — rewording a sentence or adding one adjective is not enough.
## Output
Return ONLY the full modified prompt text. No explanation, no markdown fences, no preamble.
Just the complete prompt that could be used directly as a system prompt.
"""
# Format markers that must survive variant generation — if any of these
# are present in the base prompt, the variant must also contain them.
_FORMAT_MARKERS = ["SynthesisResult", '"pages"', "body_sections", "title", "summary"]
class PromptVariantGenerator:
    """Generates prompt variants by asking an LLM to act as a prompt engineer.

    Given a base prompt and its evaluation scores, produces N mutated
    variants targeting the weakest dimensions.
    """

    def __init__(self, client: LLMClient) -> None:
        self.client = client

    def generate(
        self,
        base_prompt: str,
        scores: ScoreResult,
        n: int = 2,
    ) -> list[str]:
        """Generate up to *n* valid prompt variants.

        Each variant is produced by a separate LLM call with the meta-prompt.
        Variants are validated: a variant is rejected when it is empty, when
        it is too similar to the base (length differs by fewer than 50
        characters *and* fewer than 3 lines differ), or when it drops a JSON
        format marker that the base prompt contains. Rejected variants and
        failed LLM calls are logged and skipped.

        Parameters
        ----------
        base_prompt:
            The current best synthesis prompt text.
        scores:
            ScoreResult from the most recent evaluation of *base_prompt*.
        n:
            Number of variants to attempt generating.

        Returns
        -------
        list[str]
            Valid variant prompt strings (may be fewer than *n*).
        """
        user_prompt = self._build_user_prompt(base_prompt, scores)
        # Only markers actually present in the base can be required of a variant.
        required_markers = [m for m in _FORMAT_MARKERS if m in base_prompt]

        variants: list[str] = []
        for attempt in range(1, n + 1):
            logger.info("Generating variant %d/%d...", attempt, n)
            try:
                raw = self.client.complete(
                    system_prompt=VARIANT_META_PROMPT,
                    user_prompt=user_prompt,
                    response_model=None,  # free-form text, not JSON
                    modality="chat",
                )
                variant = str(raw).strip()
            except Exception:
                # Best-effort: one failed call must not abort the remaining attempts.
                logger.exception("LLM error generating variant %d/%d", attempt, n)
                continue

            if not self._validate(variant, base_prompt, required_markers, attempt):
                continue

            variants.append(variant)
            logger.info("Variant %d/%d accepted (%d chars)", attempt, n, len(variant))

        logger.info(
            "Generated %d valid variant(s) out of %d attempts", len(variants), n
        )
        return variants

    # ── Internal helpers ──────────────────────────────────────────────────

    def _build_user_prompt(self, base_prompt: str, scores: ScoreResult) -> str:
        """Build the user message describing the current prompt and its scores.

        Dimensions are listed weakest-first, each with its score and, when
        available, the scorer's justification; the two weakest dimensions are
        then named as the priority.
        """
        # Tolerate a score object without justifications.
        justifications = getattr(scores, "justifications", None) or {}

        # Per-dimension scores, sorted worst-first (stable for ties).
        ranked = sorted(
            ((dim, getattr(scores, dim, 0.0)) for dim in DIMENSIONS),
            key=lambda pair: pair[1],
        )

        dim_lines: list[str] = []
        for dim, val in ranked:
            label = dim.replace("_", " ").title()
            line = f" {label}: {val:.2f}"
            justification = justifications.get(dim, "")
            if justification:
                # Separate the number from the free text so they stay readable.
                line += f" — {justification}"
            dim_lines.append(line)

        weakest = ranked[0][0].replace("_", " ").title()
        second_weakest = (
            ranked[1][0].replace("_", " ").title() if len(ranked) > 1 else weakest
        )

        return (
            f"## Current Prompt\n\n{base_prompt}\n\n"
            f"## Evaluation Scores (sorted weakest → strongest)\n\n"
            + "\n".join(dim_lines)
            + f"\n\n Composite: {scores.composite:.3f}\n\n"
            f"## Priority\n\n"
            f"The weakest dimensions are **{weakest}** and **{second_weakest}**. "
            f"Focus your prompt modifications on improving these.\n\n"
            f"Return the full modified prompt now."
        )

    def _validate(
        self,
        variant: str,
        base_prompt: str,
        required_markers: list[str],
        index: int,
    ) -> bool:
        """Check a variant meets minimum quality gates.

        Returns ``True`` when the variant is non-empty, differs meaningfully
        from *base_prompt*, and contains every marker in *required_markers*;
        otherwise logs the reason and returns ``False``. *index* is the
        1-based attempt number used in log messages.
        """
        if not variant:
            logger.warning("Variant %d is empty — skipping", index)
            return False

        # Must differ meaningfully from base: either a clear length change or
        # several changed lines (set symmetric difference of the line sets).
        diff = abs(len(variant) - len(base_prompt))
        changed_lines = len(
            set(base_prompt.splitlines()) ^ set(variant.splitlines())
        )
        if diff < 50 and changed_lines < 3:
            logger.warning(
                "Variant %d too similar to base (len_diff=%d, changed_lines=%d) — skipping",
                index, diff, changed_lines,
            )
            return False

        # Must preserve format markers
        missing = [m for m in required_markers if m not in variant]
        if missing:
            logger.warning(
                "Variant %d missing format markers %s — skipping",
                index, missing,
            )
            return False

        return True

1
pipeline Symbolic link
View file

@ -0,0 +1 @@
backend/pipeline