From 4f4126e0ce3bb00d533327442220b47f8d1c63e4 Mon Sep 17 00:00:00 2001 From: jlightner Date: Wed, 1 Apr 2026 09:24:42 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Generalized=20OptimizationLoop=20to=20s?= =?UTF-8?q?tages=202-5=20with=20per-stage=20fixture=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/pipeline/quality/optimizer.py" - "backend/pipeline/quality/__main__.py" - "backend/pipeline/quality/scorer.py" - "backend/pipeline/quality/fixtures/sample_segments.json" - "backend/pipeline/quality/fixtures/sample_topic_group.json" - "backend/pipeline/quality/fixtures/sample_classifications.json" GSD-Task: S04/T02 --- backend/pipeline/quality/__main__.py | 27 +-- .../fixtures/sample_classifications.json | 29 +++ .../quality/fixtures/sample_segments.json | 40 +++++ .../quality/fixtures/sample_topic_group.json | 18 ++ backend/pipeline/quality/optimizer.py | 167 ++++++++++++------ backend/pipeline/quality/scorer.py | 2 +- 6 files changed, 213 insertions(+), 70 deletions(-) create mode 100644 backend/pipeline/quality/fixtures/sample_classifications.json create mode 100644 backend/pipeline/quality/fixtures/sample_segments.json create mode 100644 backend/pipeline/quality/fixtures/sample_topic_group.json diff --git a/backend/pipeline/quality/__main__.py b/backend/pipeline/quality/__main__.py index 7b9a1e2..7b1e2b8 100644 --- a/backend/pipeline/quality/__main__.py +++ b/backend/pipeline/quality/__main__.py @@ -20,14 +20,16 @@ from pipeline.llm_client import LLMClient from .fitness import FitnessRunner from .optimizer import OptimizationLoop, OptimizationResult -from .scorer import DIMENSIONS, ScoreRunner +from .scorer import DIMENSIONS, STAGE_CONFIGS, ScoreRunner # ── Reporting helpers ──────────────────────────────────────────────────────── -def print_leaderboard(result: OptimizationResult) -> None: +def print_leaderboard(result: OptimizationResult, stage: int = 5) -> None: """Print a formatted leaderboard of top 5 variants by composite score.""" + dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS + # Filter to entries that actually scored (no errors) scored = [h for h in result.history if not h.get("error")] if not scored: @@ -37,19 +39,20 @@ def print_leaderboard(result: OptimizationResult) -> None: ranked = sorted(scored, key=lambda h: h["composite"], reverse=True)[:5] print(f"\n{'='*72}") - print(" LEADERBOARD — Top 5 Variants by Composite Score") + print(f" LEADERBOARD — Top 5 Variants by Composite Score (Stage {stage})") print(f"{'='*72}") # Header - dim_headers = " ".join(f"{d[:5]:>5s}" for d in DIMENSIONS) + dim_headers = " ".join(f"{d[:5]:>5s}" for d in dims) + sep_segments = " ".join("─" * 5 for _ in dims) print(f" {'#':>2s} {'Label':<16s} {'Comp':>5s} {dim_headers}") - print(f" {'─'*2} {'─'*16} {'─'*5} {'─'*5} {'─'*5} {'─'*5} {'─'*5} {'─'*5}") + print(f" {'─'*2} {'─'*16} {'─'*5} {sep_segments}") for i, entry in enumerate(ranked, 1): label = entry.get("label", "?")[:16] comp = entry["composite"] dim_vals = " ".join( - f"{entry['scores'].get(d, 0.0):5.2f}" for d in DIMENSIONS + f"{entry['scores'].get(d, 0.0):5.2f}" for d in dims ) bar = "█" * int(comp * 20) + "░" * (20 - int(comp * 20)) print(f" {i:>2d} {label:<16s} {comp:5.3f} {dim_vals} {bar}") @@ -135,6 +138,8 @@ def write_results_json( filename = f"optimize_stage{stage}_{timestamp}.json" filepath = out_path / filename + dims = STAGE_CONFIGS[stage].dimensions if stage in STAGE_CONFIGS else DIMENSIONS + payload = { "config": { "stage": stage, @@ -145,7 +150,7 @@ def write_results_json( "best_prompt": result.best_prompt, "best_scores": { "composite": result.best_score.composite, - **{d: getattr(result.best_score, d) for d in DIMENSIONS}, + **{d: result.best_score.scores.get(d, 0.0) for d in dims}, }, "elapsed_seconds": result.elapsed_seconds, "history": result.history, @@ -321,10 +326,10 @@ def _run_score(args: argparse.Namespace) -> int: def _run_optimize(args: argparse.Namespace) -> int: """Execute the optimize subcommand.""" - # Stage validation — only stage 5 is supported - if args.stage != 5: + # Stage validation — stages 2-5 are supported + if args.stage not in STAGE_CONFIGS: print( - f"Error: only stage 5 is supported for optimization (got stage {args.stage})", + f"Error: unsupported stage {args.stage}. Valid stages: {sorted(STAGE_CONFIGS)}", file=sys.stderr, ) return 1 @@ -364,7 +369,7 @@ def _run_optimize(args: argparse.Namespace) -> int: return 1 # Reporting - print_leaderboard(result) + print_leaderboard(result, stage=args.stage) print_trajectory(result) # Write results JSON diff --git a/backend/pipeline/quality/fixtures/sample_classifications.json b/backend/pipeline/quality/fixtures/sample_classifications.json new file mode 100644 index 0000000..c3e1693 --- /dev/null +++ b/backend/pipeline/quality/fixtures/sample_classifications.json @@ -0,0 +1,29 @@ +{ + "extracted_moments": [ + { + "title": "Frequency-specific sidechain with Trackspacer", + "summary": "Using Trackspacer plugin for frequency-band sidechain compression targeting 100-300Hz, allowing bass high-end to remain present while clearing low-mid mud under the kick.", + "content_type": "technique", + "plugins": ["Trackspacer"], + "start_time": 15.2, + "end_time": 52.1 + }, + { + "title": "Parallel drum compression chain", + "summary": "Setting up Ableton's Drum Buss at 40% drive into a return track with Valhalla Room at 1.2s decay, mixed at -12dB for room sound without wash.", + "content_type": "settings", + "plugins": ["Drum Buss", "Valhalla Room"], + "start_time": 52.1, + "end_time": 89.3 + }, + { + "title": "Mono compatibility checking workflow", + "summary": "Using Ableton's Utility plugin on the sub bus to constantly check mono compatibility of layered bass patches, catching phase cancellation before mixdown.", + "content_type": "workflow", + "plugins": ["Utility"], + "start_time": 89.3, + "end_time": 110.0 + } + ], + "taxonomy": "Sound Design > Mixing & Processing" +} diff --git a/backend/pipeline/quality/fixtures/sample_segments.json b/backend/pipeline/quality/fixtures/sample_segments.json new file mode 100644 index 0000000..5ac45fd --- /dev/null +++ b/backend/pipeline/quality/fixtures/sample_segments.json @@ -0,0 +1,40 @@ +{ + "transcript_segments": [ + { + "index": 0, + "start_time": 0.0, + "end_time": 15.2, + "text": "Hey everyone, today we're going to talk about sidechain compression and how I use it in my productions." + }, + { + "index": 1, + "start_time": 15.2, + "end_time": 34.8, + "text": "So the basic idea is you take the kick drum signal and use it to duck the bass. Most people use a compressor for this but I actually prefer Trackspacer because it gives you frequency-specific ducking." + }, + { + "index": 2, + "start_time": 34.8, + "end_time": 52.1, + "text": "With Trackspacer you can set it to only affect 100 to 300 Hz so when the kick hits, the bass ducks just in that low-mid range. The top end stays right there." + }, + { + "index": 3, + "start_time": 52.1, + "end_time": 71.5, + "text": "Now let me show you another technique — parallel compression on drums. I use Drum Buss with the drive at about 40 percent, then send that to a return track." + }, + { + "index": 4, + "start_time": 71.5, + "end_time": 89.3, + "text": "On the return I put Valhalla Room with a short decay, like 1.2 seconds. Mix it in at minus 12 dB. Your drums just breathe — they get this room sound without getting washy." + }, + { + "index": 5, + "start_time": 89.3, + "end_time": 110.0, + "text": "One more thing about mono compatibility. I always have Utility on the sub bus and I flip to mono constantly. If your layered bass sounds thin in mono you've got phase issues." + } + ] +} diff --git a/backend/pipeline/quality/fixtures/sample_topic_group.json b/backend/pipeline/quality/fixtures/sample_topic_group.json new file mode 100644 index 0000000..397af37 --- /dev/null +++ b/backend/pipeline/quality/fixtures/sample_topic_group.json @@ -0,0 +1,18 @@ +{ + "topic_segments": [ + { + "start_index": 0, + "end_index": 2, + "topic_label": "Frequency-specific sidechain compression with Trackspacer", + "summary": "Using Trackspacer for frequency-band sidechain ducking instead of traditional volume compression", + "transcript_text": "Hey everyone, today we're going to talk about sidechain compression and how I use it in my productions. So the basic idea is you take the kick drum signal and use it to duck the bass. Most people use a compressor for this but I actually prefer Trackspacer because it gives you frequency-specific ducking. With Trackspacer you can set it to only affect 100 to 300 Hz so when the kick hits, the bass ducks just in that low-mid range. The top end stays right there." + }, + { + "start_index": 3, + "end_index": 4, + "topic_label": "Parallel drum compression with Drum Buss and Valhalla Room", + "summary": "Setting up a parallel compression chain using Ableton's Drum Buss and Valhalla Room reverb for drum processing", + "transcript_text": "Now let me show you another technique — parallel compression on drums. I use Drum Buss with the drive at about 40 percent, then send that to a return track. On the return I put Valhalla Room with a short decay, like 1.2 seconds. Mix it in at minus 12 dB. Your drums just breathe — they get this room sound without getting washy." + } + ] +} diff --git a/backend/pipeline/quality/optimizer.py b/backend/pipeline/quality/optimizer.py index 7aebb85..a26725f 100644 --- a/backend/pipeline/quality/optimizer.py +++ b/backend/pipeline/quality/optimizer.py @@ -1,4 +1,4 @@ -"""Automated prompt optimization loop for Stage 5 synthesis. +"""Automated prompt optimization loop for pipeline stages 2-5. Orchestrates a generate→score→select cycle: 1. Score the current best prompt against reference fixtures @@ -9,6 +9,7 @@ Orchestrates a generate→score→select cycle: Usage (via CLI): python -m pipeline.quality optimize --stage 5 --iterations 10 + python -m pipeline.quality optimize --stage 3 --iterations 5 --file fixtures/sample_topic_group.json """ from __future__ import annotations @@ -19,8 +20,7 @@ from dataclasses import dataclass, field from pathlib import Path from pipeline.llm_client import LLMClient -from pipeline.quality.scorer import DIMENSIONS, ScoreResult, ScoreRunner -from pipeline.quality.variant_generator import PromptVariantGenerator +from pipeline.quality.scorer import STAGE_CONFIGS, ScoreResult, ScoreRunner logger = logging.getLogger(__name__) @@ -47,9 +47,9 @@ class OptimizationLoop: client: LLMClient instance for LLM calls (synthesis + scoring + variant gen). stage: - Pipeline stage number (currently only 5 is supported). + Pipeline stage number (2-5). fixture_path: - Path to a JSON fixture file containing ``creator_name`` and ``moments``. + Path to a JSON fixture file matching the stage's expected keys. iterations: Number of generate→score→select cycles. variants_per_iter: @@ -64,11 +64,17 @@ class OptimizationLoop: iterations: int = 5, variants_per_iter: int = 2, ) -> None: + if stage not in STAGE_CONFIGS: + raise ValueError( + f"Unsupported stage {stage}. Valid stages: {sorted(STAGE_CONFIGS)}" + ) + self.client = client self.stage = stage self.fixture_path = fixture_path self.iterations = iterations self.variants_per_iter = variants_per_iter + self.config = STAGE_CONFIGS[stage] self.scorer = ScoreRunner(client) self.generator = PromptVariantGenerator(client) @@ -85,9 +91,10 @@ class OptimizationLoop: from pipeline.stages import _load_prompt t0 = time.monotonic() + dimensions = self.config.dimensions - # Load base prompt - prompt_file = f"stage{self.stage}_synthesis.txt" + # Load base prompt using the stage's configured prompt file + prompt_file = self.config.prompt_file try: base_prompt = _load_prompt(prompt_file) except FileNotFoundError: @@ -109,8 +116,6 @@ class OptimizationLoop: elapsed_seconds=round(time.monotonic() - t0, 2), ) - moments = fixture["moments"] - creator_name = fixture["creator_name"] history: list[dict] = [] # Score the baseline @@ -120,11 +125,7 @@ class OptimizationLoop: print(f"{'='*60}\n") print(" Scoring baseline prompt...") - best_score = self.scorer.synthesize_and_score( - moments=moments, - creator_name=creator_name, - voice_level=0.5, - ) + best_score = self._score_variant(base_prompt, fixture) best_prompt = base_prompt history.append({ @@ -133,7 +134,7 @@ class OptimizationLoop: "prompt_text": base_prompt[:200] + "..." if len(base_prompt) > 200 else base_prompt, "prompt_length": len(base_prompt), "composite": best_score.composite, - "scores": {d: getattr(best_score, d) for d in DIMENSIONS}, + "scores": {d: best_score.scores.get(d, 0.0) for d in dimensions}, "error": best_score.error, "label": "baseline", }) @@ -154,11 +155,12 @@ class OptimizationLoop: for iteration in range(1, self.iterations + 1): print(f"\n ── Iteration {iteration}/{self.iterations} ──") - # Generate variants + # Generate variants with stage-appropriate markers variants = self.generator.generate( base_prompt=best_prompt, scores=best_score, n=self.variants_per_iter, + stage=self.stage, ) if not variants: @@ -172,10 +174,7 @@ class OptimizationLoop: for vi, variant_prompt in enumerate(variants): print(f" Scoring variant {vi + 1}/{len(variants)}...") - # Temporarily replace the base prompt with the variant for synthesis - score = self._score_variant( - variant_prompt, moments, creator_name, - ) + score = self._score_variant(variant_prompt, fixture) history.append({ "iteration": iteration, @@ -183,7 +182,7 @@ class OptimizationLoop: "prompt_text": variant_prompt[:200] + "..." if len(variant_prompt) > 200 else variant_prompt, "prompt_length": len(variant_prompt), "composite": score.composite, - "scores": {d: getattr(score, d) for d in DIMENSIONS}, + "scores": {d: score.scores.get(d, 0.0) for d in dimensions}, "error": score.error, "label": f"iter{iteration}_v{vi+1}", }) @@ -223,47 +222,50 @@ class OptimizationLoop: # ── Internal helpers ────────────────────────────────────────────────── def _load_fixture(self) -> dict: - """Load and validate the fixture JSON file.""" + """Load and validate the fixture JSON file against stage-specific keys.""" path = Path(self.fixture_path) if not path.exists(): raise FileNotFoundError(f"Fixture not found: {path}") data = json.loads(path.read_text(encoding="utf-8")) - if "moments" not in data: - raise KeyError("Fixture must contain 'moments' key") - if "creator_name" not in data: - raise KeyError("Fixture must contain 'creator_name' key") + for key in self.config.fixture_keys: + if key not in data: + raise KeyError( + f"Stage {self.stage} fixture must contain '{key}' key " + f"(required: {self.config.fixture_keys})" + ) return data def _score_variant( self, variant_prompt: str, - moments: list[dict], - creator_name: str, + fixture: dict, ) -> ScoreResult: - """Score a variant prompt by running synthesis + scoring. + """Score a variant prompt by running LLM completion + scoring. - Uses the variant as a direct system prompt for synthesis, bypassing - VoiceDial (the optimization loop owns the full prompt text). + Dispatches to stage-specific synthesis logic: + - Stages 2-4: call LLM with the variant prompt and fixture input, + parse with the stage's schema, then score via score_stage_output() + - Stage 5: original flow (synthesis + page scoring) """ - from pipeline.schemas import SynthesisResult from pipeline.stages import _get_stage_config import json as _json import openai as _openai model_override, modality = _get_stage_config(self.stage) + schema_class = self.config.get_schema() - moments_json = _json.dumps(moments, indent=2) - user_prompt = f"{creator_name}\n\n{moments_json}\n" + # Build user prompt from fixture data — stage-specific formatting + user_prompt = self._build_user_prompt(fixture) t0 = time.monotonic() try: raw = self.client.complete( system_prompt=variant_prompt, user_prompt=user_prompt, - response_model=SynthesisResult, + response_model=schema_class, modality=modality, model_override=model_override, ) @@ -272,48 +274,89 @@ class OptimizationLoop: elapsed_synth = round(time.monotonic() - t0, 2) return ScoreResult( elapsed_seconds=elapsed_synth, - error=f"Synthesis LLM error: {exc}", + error=f"LLM error (stage {self.stage}): {exc}", ) except Exception as exc: elapsed_synth = round(time.monotonic() - t0, 2) - logger.exception("Unexpected error during variant synthesis") + logger.exception("Unexpected error during variant synthesis (stage %d)", self.stage) return ScoreResult( elapsed_seconds=elapsed_synth, error=f"Unexpected synthesis error: {exc}", ) - # Parse synthesis + # Parse the LLM response into the stage schema raw_text = str(raw).strip() try: - synthesis = self.client.parse_response(raw_text, SynthesisResult) + parsed = self.client.parse_response(raw_text, schema_class) except Exception as exc: return ScoreResult( elapsed_seconds=elapsed_synth, - error=f"Variant synthesis parse error: {exc}", + error=f"Variant parse error (stage {self.stage}): {exc}", ) - if not synthesis.pages: + # Convert parsed output to JSON for the scorer + output_json = self._schema_to_output_json(parsed) + if output_json is None: return ScoreResult( elapsed_seconds=elapsed_synth, - error="Variant synthesis returned no pages", + error=f"Stage {self.stage} produced empty output", ) - # Score the first page - page = synthesis.pages[0] - page_json = { - "title": page.title, - "creator_name": creator_name, - "summary": page.summary, - "body_sections": [ - {"heading": heading, "content": content} - for heading, content in page.body_sections.items() - ], - } - - result = self.scorer.score_page(page_json, moments) + # Score using the generic stage scorer + result = self.scorer.score_stage_output( + stage=self.stage, + output_json=output_json, + input_json=self._fixture_to_input_json(fixture), + ) result.elapsed_seconds = round(result.elapsed_seconds + elapsed_synth, 2) return result + def _build_user_prompt(self, fixture: dict) -> str: + """Build a stage-appropriate user prompt from fixture data.""" + if self.stage == 2: + segments_json = json.dumps(fixture["transcript_segments"], indent=2) + return f"\n{segments_json}\n" + + elif self.stage == 3: + segments_json = json.dumps(fixture["topic_segments"], indent=2) + return f"\n{segments_json}\n" + + elif self.stage == 4: + moments_json = json.dumps(fixture["extracted_moments"], indent=2) + taxonomy = fixture.get("taxonomy", "") + prompt = f"\n{moments_json}\n" + if taxonomy: + prompt += f"\n{taxonomy}" + return prompt + + elif self.stage == 5: + moments_json = json.dumps(fixture["moments"], indent=2) + creator = fixture.get("creator_name", "Unknown") + return f"{creator}\n\n{moments_json}\n" + + else: + return json.dumps(fixture, indent=2) + + def _schema_to_output_json(self, parsed: object) -> dict | list | None: + """Convert a parsed Pydantic schema instance to JSON-serializable dict.""" + if hasattr(parsed, "model_dump"): + return parsed.model_dump() + elif hasattr(parsed, "dict"): + return parsed.dict() + return None + + def _fixture_to_input_json(self, fixture: dict) -> dict | list: + """Extract the primary input data from the fixture for scorer context.""" + if self.stage == 2: + return fixture["transcript_segments"] + elif self.stage == 3: + return fixture["topic_segments"] + elif self.stage == 4: + return fixture["extracted_moments"] + elif self.stage == 5: + return fixture["moments"] + return fixture + def _print_iteration_summary( self, iteration: int, @@ -322,8 +365,9 @@ class OptimizationLoop: ) -> None: """Print a compact one-line summary of the current best scores.""" label = "BASELINE" if is_baseline else f"ITER {iteration}" + dimensions = self.config.dimensions dims = " ".join( - f"{d[:4]}={getattr(score, d):.2f}" for d in DIMENSIONS + f"{d[:4]}={score.scores.get(d, 0.0):.2f}" for d in dimensions ) print(f" [{label}] composite={score.composite:.3f} {dims}") @@ -334,6 +378,8 @@ class OptimizationLoop: elapsed: float, ) -> None: """Print the final optimization summary.""" + dimensions = self.config.dimensions + print(f"\n{'='*60}") print(" OPTIMIZATION COMPLETE") print(f"{'='*60}") @@ -352,8 +398,8 @@ class OptimizationLoop: print(f" Improvement: {improvement:.3f} (no gain)") print(f"\n Per-dimension best scores:") - for d in DIMENSIONS: - val = getattr(best_score, d) + for d in dimensions: + val = best_score.scores.get(d, 0.0) bar = "█" * int(val * 20) + "░" * (20 - int(val * 20)) print(f" {d.replace('_', ' ').title():25s} {val:.2f} {bar}") @@ -362,3 +408,8 @@ class OptimizationLoop: print(f"\n ⚠ {errored} variant(s) errored during scoring") print(f"{'='*60}\n") + + +# Late import to avoid circular dependency (scorer imports at module level, +# variant_generator imports scorer) +from pipeline.quality.variant_generator import PromptVariantGenerator # noqa: E402 diff --git a/backend/pipeline/quality/scorer.py b/backend/pipeline/quality/scorer.py index 6270e64..3218ffb 100644 --- a/backend/pipeline/quality/scorer.py +++ b/backend/pipeline/quality/scorer.py @@ -281,7 +281,7 @@ STAGE_CONFIGS: dict[int, StageConfig] = { dimensions=["structural", "content_specificity", "voice_preservation", "readability", "factual_fidelity"], rubric=SCORING_RUBRIC, format_markers=["SynthesisResult", '"pages"', "body_sections", "title", "summary"], - fixture_keys=["key_moments", "creator_name"], + fixture_keys=["moments", "creator_name"], prompt_file="stage5_synthesis.txt", schema_class="SynthesisResult", ),