diff --git a/backend/pipeline/quality/__main__.py b/backend/pipeline/quality/__main__.py
index 7b9a1e2..7b1e2b8 100644
--- a/backend/pipeline/quality/__main__.py
+++ b/backend/pipeline/quality/__main__.py
@@ -20,14 +20,16 @@ from pipeline.llm_client import LLMClient
from .fitness import FitnessRunner
from .optimizer import OptimizationLoop, OptimizationResult
-from .scorer import DIMENSIONS, ScoreRunner
+from .scorer import DIMENSIONS, STAGE_CONFIGS, ScoreRunner
# ── Reporting helpers ────────────────────────────────────────────────────────
-def print_leaderboard(result: OptimizationResult) -> None:
+def print_leaderboard(result: OptimizationResult, stage: int = 5) -> None:
"""Print a formatted leaderboard of top 5 variants by composite score."""
+ dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS
+
# Filter to entries that actually scored (no errors)
scored = [h for h in result.history if not h.get("error")]
if not scored:
@@ -37,19 +39,20 @@ def print_leaderboard(result: OptimizationResult) -> None:
ranked = sorted(scored, key=lambda h: h["composite"], reverse=True)[:5]
print(f"\n{'='*72}")
- print(" LEADERBOARD — Top 5 Variants by Composite Score")
+ print(f" LEADERBOARD — Top 5 Variants by Composite Score (Stage {stage})")
print(f"{'='*72}")
# Header
- dim_headers = " ".join(f"{d[:5]:>5s}" for d in DIMENSIONS)
+ dim_headers = " ".join(f"{d[:5]:>5s}" for d in dims)
+ sep_segments = " ".join("─" * 5 for _ in dims)
print(f" {'#':>2s} {'Label':<16s} {'Comp':>5s} {dim_headers}")
- print(f" {'─'*2} {'─'*16} {'─'*5} {'─'*5} {'─'*5} {'─'*5} {'─'*5} {'─'*5}")
+ print(f" {'─'*2} {'─'*16} {'─'*5} {sep_segments}")
for i, entry in enumerate(ranked, 1):
label = entry.get("label", "?")[:16]
comp = entry["composite"]
dim_vals = " ".join(
- f"{entry['scores'].get(d, 0.0):5.2f}" for d in DIMENSIONS
+ f"{entry['scores'].get(d, 0.0):5.2f}" for d in dims
)
bar = "█" * int(comp * 20) + "░" * (20 - int(comp * 20))
print(f" {i:>2d} {label:<16s} {comp:5.3f} {dim_vals} {bar}")
@@ -135,6 +138,8 @@ def write_results_json(
filename = f"optimize_stage{stage}_{timestamp}.json"
filepath = out_path / filename
+ dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS
+
payload = {
"config": {
"stage": stage,
@@ -145,7 +150,7 @@ def write_results_json(
"best_prompt": result.best_prompt,
"best_scores": {
"composite": result.best_score.composite,
- **{d: getattr(result.best_score, d) for d in DIMENSIONS},
+ **{d: result.best_score.scores.get(d, 0.0) for d in dims},
},
"elapsed_seconds": result.elapsed_seconds,
"history": result.history,
@@ -321,10 +326,10 @@ def _run_score(args: argparse.Namespace) -> int:
def _run_optimize(args: argparse.Namespace) -> int:
"""Execute the optimize subcommand."""
- # Stage validation — only stage 5 is supported
- if args.stage != 5:
+ # Stage validation — stages 2-5 are supported
+ if args.stage not in STAGE_CONFIGS:
print(
- f"Error: only stage 5 is supported for optimization (got stage {args.stage})",
+ f"Error: unsupported stage {args.stage}. Valid stages: {sorted(STAGE_CONFIGS)}",
file=sys.stderr,
)
return 1
@@ -364,7 +369,7 @@ def _run_optimize(args: argparse.Namespace) -> int:
return 1
# Reporting
- print_leaderboard(result)
+ print_leaderboard(result, stage=args.stage)
print_trajectory(result)
# Write results JSON
diff --git a/backend/pipeline/quality/fixtures/sample_classifications.json b/backend/pipeline/quality/fixtures/sample_classifications.json
new file mode 100644
index 0000000..c3e1693
--- /dev/null
+++ b/backend/pipeline/quality/fixtures/sample_classifications.json
@@ -0,0 +1,29 @@
+{
+ "extracted_moments": [
+ {
+ "title": "Frequency-specific sidechain with Trackspacer",
+ "summary": "Using Trackspacer plugin for frequency-band sidechain compression targeting 100-300Hz, allowing bass high-end to remain present while clearing low-mid mud under the kick.",
+ "content_type": "technique",
+ "plugins": ["Trackspacer"],
+ "start_time": 15.2,
+ "end_time": 52.1
+ },
+ {
+ "title": "Parallel drum compression chain",
+ "summary": "Setting up Ableton's Drum Buss at 40% drive into a return track with Valhalla Room at 1.2s decay, mixed at -12dB for room sound without wash.",
+ "content_type": "settings",
+ "plugins": ["Drum Buss", "Valhalla Room"],
+ "start_time": 52.1,
+ "end_time": 89.3
+ },
+ {
+ "title": "Mono compatibility checking workflow",
+ "summary": "Using Ableton's Utility plugin on the sub bus to constantly check mono compatibility of layered bass patches, catching phase cancellation before mixdown.",
+ "content_type": "workflow",
+ "plugins": ["Utility"],
+ "start_time": 89.3,
+ "end_time": 110.0
+ }
+ ],
+ "taxonomy": "Sound Design > Mixing & Processing"
+}
diff --git a/backend/pipeline/quality/fixtures/sample_segments.json b/backend/pipeline/quality/fixtures/sample_segments.json
new file mode 100644
index 0000000..5ac45fd
--- /dev/null
+++ b/backend/pipeline/quality/fixtures/sample_segments.json
@@ -0,0 +1,40 @@
+{
+ "transcript_segments": [
+ {
+ "index": 0,
+ "start_time": 0.0,
+ "end_time": 15.2,
+ "text": "Hey everyone, today we're going to talk about sidechain compression and how I use it in my productions."
+ },
+ {
+ "index": 1,
+ "start_time": 15.2,
+ "end_time": 34.8,
+ "text": "So the basic idea is you take the kick drum signal and use it to duck the bass. Most people use a compressor for this but I actually prefer Trackspacer because it gives you frequency-specific ducking."
+ },
+ {
+ "index": 2,
+ "start_time": 34.8,
+ "end_time": 52.1,
+ "text": "With Trackspacer you can set it to only affect 100 to 300 Hz so when the kick hits, the bass ducks just in that low-mid range. The top end stays right there."
+ },
+ {
+ "index": 3,
+ "start_time": 52.1,
+ "end_time": 71.5,
+ "text": "Now let me show you another technique — parallel compression on drums. I use Drum Buss with the drive at about 40 percent, then send that to a return track."
+ },
+ {
+ "index": 4,
+ "start_time": 71.5,
+ "end_time": 89.3,
+ "text": "On the return I put Valhalla Room with a short decay, like 1.2 seconds. Mix it in at minus 12 dB. Your drums just breathe — they get this room sound without getting washy."
+ },
+ {
+ "index": 5,
+ "start_time": 89.3,
+ "end_time": 110.0,
+ "text": "One more thing about mono compatibility. I always have Utility on the sub bus and I flip to mono constantly. If your layered bass sounds thin in mono you've got phase issues."
+ }
+ ]
+}
diff --git a/backend/pipeline/quality/fixtures/sample_topic_group.json b/backend/pipeline/quality/fixtures/sample_topic_group.json
new file mode 100644
index 0000000..397af37
--- /dev/null
+++ b/backend/pipeline/quality/fixtures/sample_topic_group.json
@@ -0,0 +1,18 @@
+{
+ "topic_segments": [
+ {
+ "start_index": 0,
+ "end_index": 2,
+ "topic_label": "Frequency-specific sidechain compression with Trackspacer",
+ "summary": "Using Trackspacer for frequency-band sidechain ducking instead of traditional volume compression",
+ "transcript_text": "Hey everyone, today we're going to talk about sidechain compression and how I use it in my productions. So the basic idea is you take the kick drum signal and use it to duck the bass. Most people use a compressor for this but I actually prefer Trackspacer because it gives you frequency-specific ducking. With Trackspacer you can set it to only affect 100 to 300 Hz so when the kick hits, the bass ducks just in that low-mid range. The top end stays right there."
+ },
+ {
+ "start_index": 3,
+ "end_index": 4,
+ "topic_label": "Parallel drum compression with Drum Buss and Valhalla Room",
+ "summary": "Setting up a parallel compression chain using Ableton's Drum Buss and Valhalla Room reverb for drum processing",
+ "transcript_text": "Now let me show you another technique — parallel compression on drums. I use Drum Buss with the drive at about 40 percent, then send that to a return track. On the return I put Valhalla Room with a short decay, like 1.2 seconds. Mix it in at minus 12 dB. Your drums just breathe — they get this room sound without getting washy."
+ }
+ ]
+}
diff --git a/backend/pipeline/quality/optimizer.py b/backend/pipeline/quality/optimizer.py
index 7aebb85..a26725f 100644
--- a/backend/pipeline/quality/optimizer.py
+++ b/backend/pipeline/quality/optimizer.py
@@ -1,4 +1,4 @@
-"""Automated prompt optimization loop for Stage 5 synthesis.
+"""Automated prompt optimization loop for pipeline stages 2-5.
Orchestrates a generate→score→select cycle:
1. Score the current best prompt against reference fixtures
@@ -9,6 +9,7 @@ Orchestrates a generate→score→select cycle:
Usage (via CLI):
python -m pipeline.quality optimize --stage 5 --iterations 10
+ python -m pipeline.quality optimize --stage 3 --iterations 5 --file fixtures/sample_topic_group.json
"""
from __future__ import annotations
@@ -19,8 +20,7 @@ from dataclasses import dataclass, field
from pathlib import Path
from pipeline.llm_client import LLMClient
-from pipeline.quality.scorer import DIMENSIONS, ScoreResult, ScoreRunner
-from pipeline.quality.variant_generator import PromptVariantGenerator
+from pipeline.quality.scorer import STAGE_CONFIGS, ScoreResult, ScoreRunner
logger = logging.getLogger(__name__)
@@ -47,9 +47,9 @@ class OptimizationLoop:
client:
LLMClient instance for LLM calls (synthesis + scoring + variant gen).
stage:
- Pipeline stage number (currently only 5 is supported).
+ Pipeline stage number (2-5).
fixture_path:
- Path to a JSON fixture file containing ``creator_name`` and ``moments``.
+ Path to a JSON fixture file matching the stage's expected keys.
iterations:
Number of generate→score→select cycles.
variants_per_iter:
@@ -64,11 +64,17 @@ class OptimizationLoop:
iterations: int = 5,
variants_per_iter: int = 2,
) -> None:
+ if stage not in STAGE_CONFIGS:
+ raise ValueError(
+ f"Unsupported stage {stage}. Valid stages: {sorted(STAGE_CONFIGS)}"
+ )
+
self.client = client
self.stage = stage
self.fixture_path = fixture_path
self.iterations = iterations
self.variants_per_iter = variants_per_iter
+ self.config = STAGE_CONFIGS[stage]
self.scorer = ScoreRunner(client)
self.generator = PromptVariantGenerator(client)
@@ -85,9 +91,10 @@ class OptimizationLoop:
from pipeline.stages import _load_prompt
t0 = time.monotonic()
+ dimensions = self.config.dimensions
- # Load base prompt
- prompt_file = f"stage{self.stage}_synthesis.txt"
+ # Load base prompt using the stage's configured prompt file
+ prompt_file = self.config.prompt_file
try:
base_prompt = _load_prompt(prompt_file)
except FileNotFoundError:
@@ -109,8 +116,6 @@ class OptimizationLoop:
elapsed_seconds=round(time.monotonic() - t0, 2),
)
- moments = fixture["moments"]
- creator_name = fixture["creator_name"]
history: list[dict] = []
# Score the baseline
@@ -120,11 +125,7 @@ class OptimizationLoop:
print(f"{'='*60}\n")
print(" Scoring baseline prompt...")
- best_score = self.scorer.synthesize_and_score(
- moments=moments,
- creator_name=creator_name,
- voice_level=0.5,
- )
+ best_score = self._score_variant(base_prompt, fixture)
best_prompt = base_prompt
history.append({
@@ -133,7 +134,7 @@ class OptimizationLoop:
"prompt_text": base_prompt[:200] + "..." if len(base_prompt) > 200 else base_prompt,
"prompt_length": len(base_prompt),
"composite": best_score.composite,
- "scores": {d: getattr(best_score, d) for d in DIMENSIONS},
+ "scores": {d: best_score.scores.get(d, 0.0) for d in dimensions},
"error": best_score.error,
"label": "baseline",
})
@@ -154,11 +155,12 @@ class OptimizationLoop:
for iteration in range(1, self.iterations + 1):
print(f"\n ── Iteration {iteration}/{self.iterations} ──")
- # Generate variants
+ # Generate variants with stage-appropriate markers
variants = self.generator.generate(
base_prompt=best_prompt,
scores=best_score,
n=self.variants_per_iter,
+ stage=self.stage,
)
if not variants:
@@ -172,10 +174,7 @@ class OptimizationLoop:
for vi, variant_prompt in enumerate(variants):
print(f" Scoring variant {vi + 1}/{len(variants)}...")
- # Temporarily replace the base prompt with the variant for synthesis
- score = self._score_variant(
- variant_prompt, moments, creator_name,
- )
+ score = self._score_variant(variant_prompt, fixture)
history.append({
"iteration": iteration,
@@ -183,7 +182,7 @@ class OptimizationLoop:
"prompt_text": variant_prompt[:200] + "..." if len(variant_prompt) > 200 else variant_prompt,
"prompt_length": len(variant_prompt),
"composite": score.composite,
- "scores": {d: getattr(score, d) for d in DIMENSIONS},
+ "scores": {d: score.scores.get(d, 0.0) for d in dimensions},
"error": score.error,
"label": f"iter{iteration}_v{vi+1}",
})
@@ -223,47 +222,50 @@ class OptimizationLoop:
# ── Internal helpers ──────────────────────────────────────────────────
def _load_fixture(self) -> dict:
- """Load and validate the fixture JSON file."""
+ """Load and validate the fixture JSON file against stage-specific keys."""
path = Path(self.fixture_path)
if not path.exists():
raise FileNotFoundError(f"Fixture not found: {path}")
data = json.loads(path.read_text(encoding="utf-8"))
- if "moments" not in data:
- raise KeyError("Fixture must contain 'moments' key")
- if "creator_name" not in data:
- raise KeyError("Fixture must contain 'creator_name' key")
+ for key in self.config.fixture_keys:
+ if key not in data:
+ raise KeyError(
+ f"Stage {self.stage} fixture must contain '{key}' key "
+ f"(required: {self.config.fixture_keys})"
+ )
return data
def _score_variant(
self,
variant_prompt: str,
- moments: list[dict],
- creator_name: str,
+ fixture: dict,
) -> ScoreResult:
- """Score a variant prompt by running synthesis + scoring.
+ """Score a variant prompt by running LLM completion + scoring.
- Uses the variant as a direct system prompt for synthesis, bypassing
- VoiceDial (the optimization loop owns the full prompt text).
+ Dispatches to stage-specific synthesis logic:
+ - Stages 2-4: call LLM with the variant prompt and fixture input,
+ parse with the stage's schema, then score via score_stage_output()
+ - Stage 5: original flow (synthesis + page scoring)
"""
- from pipeline.schemas import SynthesisResult
from pipeline.stages import _get_stage_config
import json as _json
import openai as _openai
model_override, modality = _get_stage_config(self.stage)
+ schema_class = self.config.get_schema()
- moments_json = _json.dumps(moments, indent=2)
- user_prompt = f"{creator_name}\n\n{moments_json}\n"
+ # Build user prompt from fixture data — stage-specific formatting
+ user_prompt = self._build_user_prompt(fixture)
t0 = time.monotonic()
try:
raw = self.client.complete(
system_prompt=variant_prompt,
user_prompt=user_prompt,
- response_model=SynthesisResult,
+ response_model=schema_class,
modality=modality,
model_override=model_override,
)
@@ -272,48 +274,89 @@ class OptimizationLoop:
elapsed_synth = round(time.monotonic() - t0, 2)
return ScoreResult(
elapsed_seconds=elapsed_synth,
- error=f"Synthesis LLM error: {exc}",
+ error=f"LLM error (stage {self.stage}): {exc}",
)
except Exception as exc:
elapsed_synth = round(time.monotonic() - t0, 2)
- logger.exception("Unexpected error during variant synthesis")
+ logger.exception("Unexpected error during variant synthesis (stage %d)", self.stage)
return ScoreResult(
elapsed_seconds=elapsed_synth,
error=f"Unexpected synthesis error: {exc}",
)
- # Parse synthesis
+ # Parse the LLM response into the stage schema
raw_text = str(raw).strip()
try:
- synthesis = self.client.parse_response(raw_text, SynthesisResult)
+ parsed = self.client.parse_response(raw_text, schema_class)
except Exception as exc:
return ScoreResult(
elapsed_seconds=elapsed_synth,
- error=f"Variant synthesis parse error: {exc}",
+ error=f"Variant parse error (stage {self.stage}): {exc}",
)
- if not synthesis.pages:
+ # Convert parsed output to JSON for the scorer
+ output_json = self._schema_to_output_json(parsed)
+ if output_json is None:
return ScoreResult(
elapsed_seconds=elapsed_synth,
- error="Variant synthesis returned no pages",
+ error=f"Stage {self.stage} produced empty output",
)
- # Score the first page
- page = synthesis.pages[0]
- page_json = {
- "title": page.title,
- "creator_name": creator_name,
- "summary": page.summary,
- "body_sections": [
- {"heading": heading, "content": content}
- for heading, content in page.body_sections.items()
- ],
- }
-
- result = self.scorer.score_page(page_json, moments)
+ # Score using the generic stage scorer
+ result = self.scorer.score_stage_output(
+ stage=self.stage,
+ output_json=output_json,
+ input_json=self._fixture_to_input_json(fixture),
+ )
result.elapsed_seconds = round(result.elapsed_seconds + elapsed_synth, 2)
return result
+ def _build_user_prompt(self, fixture: dict) -> str:
+ """Build a stage-appropriate user prompt from fixture data."""
+ if self.stage == 2:
+ segments_json = json.dumps(fixture["transcript_segments"], indent=2)
+ return f"\n{segments_json}\n"
+
+ elif self.stage == 3:
+ segments_json = json.dumps(fixture["topic_segments"], indent=2)
+ return f"\n{segments_json}\n"
+
+ elif self.stage == 4:
+ moments_json = json.dumps(fixture["extracted_moments"], indent=2)
+ taxonomy = fixture.get("taxonomy", "")
+ prompt = f"\n{moments_json}\n"
+ if taxonomy:
+ prompt += f"\n{taxonomy}"
+ return prompt
+
+ elif self.stage == 5:
+ moments_json = json.dumps(fixture["moments"], indent=2)
+ creator = fixture.get("creator_name", "Unknown")
+ return f"{creator}\n\n{moments_json}\n"
+
+ else:
+ return json.dumps(fixture, indent=2)
+
+ def _schema_to_output_json(self, parsed: object) -> dict | list | None:
+ """Convert a parsed Pydantic schema instance to JSON-serializable dict."""
+ if hasattr(parsed, "model_dump"):
+ return parsed.model_dump()
+ elif hasattr(parsed, "dict"):
+ return parsed.dict()
+ return None
+
+ def _fixture_to_input_json(self, fixture: dict) -> dict | list:
+ """Extract the primary input data from the fixture for scorer context."""
+ if self.stage == 2:
+ return fixture["transcript_segments"]
+ elif self.stage == 3:
+ return fixture["topic_segments"]
+ elif self.stage == 4:
+ return fixture["extracted_moments"]
+ elif self.stage == 5:
+ return fixture["moments"]
+ return fixture
+
def _print_iteration_summary(
self,
iteration: int,
@@ -322,8 +365,9 @@ class OptimizationLoop:
) -> None:
"""Print a compact one-line summary of the current best scores."""
label = "BASELINE" if is_baseline else f"ITER {iteration}"
+ dimensions = self.config.dimensions
dims = " ".join(
- f"{d[:4]}={getattr(score, d):.2f}" for d in DIMENSIONS
+ f"{d[:4]}={score.scores.get(d, 0.0):.2f}" for d in dimensions
)
print(f" [{label}] composite={score.composite:.3f} {dims}")
@@ -334,6 +378,8 @@ class OptimizationLoop:
elapsed: float,
) -> None:
"""Print the final optimization summary."""
+ dimensions = self.config.dimensions
+
print(f"\n{'='*60}")
print(" OPTIMIZATION COMPLETE")
print(f"{'='*60}")
@@ -352,8 +398,8 @@ class OptimizationLoop:
print(f" Improvement: {improvement:.3f} (no gain)")
print(f"\n Per-dimension best scores:")
- for d in DIMENSIONS:
- val = getattr(best_score, d)
+ for d in dimensions:
+ val = best_score.scores.get(d, 0.0)
bar = "█" * int(val * 20) + "░" * (20 - int(val * 20))
print(f" {d.replace('_', ' ').title():25s} {val:.2f} {bar}")
@@ -362,3 +408,8 @@ class OptimizationLoop:
print(f"\n ⚠ {errored} variant(s) errored during scoring")
print(f"{'='*60}\n")
+
+
+# Late import to avoid circular dependency (scorer imports at module level,
+# variant_generator imports scorer)
+from pipeline.quality.variant_generator import PromptVariantGenerator # noqa: E402
diff --git a/backend/pipeline/quality/scorer.py b/backend/pipeline/quality/scorer.py
index 6270e64..3218ffb 100644
--- a/backend/pipeline/quality/scorer.py
+++ b/backend/pipeline/quality/scorer.py
@@ -281,7 +281,7 @@ STAGE_CONFIGS: dict[int, StageConfig] = {
dimensions=["structural", "content_specificity", "voice_preservation", "readability", "factual_fidelity"],
rubric=SCORING_RUBRIC,
format_markers=["SynthesisResult", '"pages"', "body_sections", "title", "summary"],
- fixture_keys=["key_moments", "creator_name"],
+ fixture_keys=["moments", "creator_name"],
prompt_file="stage5_synthesis.txt",
schema_class="SynthesisResult",
),