promptlooper/backend/routers/export.py
John Lightner e42117c8ee MAESTRO: Implement export router with JSON, .env, YAML, and markdown report endpoints
Four fully authenticated endpoints at /api/export/experiments/{id}/:
- /best: Returns best config as JSON with weighted score and metadata
- /env: Flattened KEY=VALUE format with metadata comments
- /yaml: Simple YAML serialization (no external dependency)
- /report: Full markdown report with config space, top N configs,
  score distributions, token usage, and timing stats

34 tests in test_export.py covering all endpoints, auth, 404s, and helpers.
Updated test_routers.py to expect 401 (auth required) instead of 501 (stub).
2026-04-07 03:30:45 -05:00

387 lines
13 KiB
Python

"""Export router — export experiment results in various formats."""
import json
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from sqlalchemy.orm import Session, joinedload
from auth import get_current_user
from main import get_db
from models import Experiment, Run, RunStatus, Score, StageResult, User
# Router instance for the export endpoints; the handlers below attach to it
# through their ``@router.get(...)`` decorators.
router = APIRouter()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_experiment_or_404(db: Session, experiment_id: uuid.UUID) -> Experiment:
    """Look up a single experiment by id, aborting the request if it is absent.

    Args:
        db: Session used to run the query.
        experiment_id: Id compared against ``Experiment.id``.

    Returns:
        The first matching ``Experiment`` row.

    Raises:
        HTTPException: 404 with detail "Experiment not found" when the query
            yields no row.
    """
    match = db.query(Experiment).filter(Experiment.id == experiment_id).first()
    if match is not None:
        return match
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Experiment not found")
def _get_scoring_weights(experiment: Experiment) -> dict[str, float]:
    """Return the per-scorer weights configured on *experiment*.

    The weights are read from ``experiment.scoring_config["weights"]``.

    Args:
        experiment: Object exposing a ``scoring_config`` attribute.

    Returns:
        The configured weights mapping, or an empty dict when the scoring
        config is missing, is not a dict, has no ``"weights"`` entry, or that
        entry is not itself a dict (for example an explicit null).
    """
    scoring_config = experiment.scoring_config
    if not isinstance(scoring_config, dict):
        return {}
    weights = scoring_config.get("weights")
    # A present-but-null (or otherwise malformed) entry must not leak out:
    # callers treat the result as a mapping and call ``.get`` on it.
    return weights if isinstance(weights, dict) else {}
def _compute_weighted_score(scores: list[Score], weights: dict[str, float]) -> float:
    """Collapse a run's individual scores into one number.

    Scores are first keyed by ``scorer_name``; if a name occurs more than
    once, the last entry wins.

    Args:
        scores: Objects exposing ``scorer_name`` and ``value``.
        weights: Mapping of scorer name to weight; may be empty.

    Returns:
        ``0.0`` when there are no scores. Otherwise the weighted average over
        the scorers that have a weight, provided those weights sum to more
        than zero; in every other case the plain mean of all score values.
    """
    if not scores:
        return 0.0

    by_scorer = {entry.scorer_name: entry.value for entry in scores}

    if weights:
        # Only scorers that actually have a configured weight take part.
        applicable = {name: weights[name] for name in by_scorer if name in weights}
        weight_sum = sum(applicable.values())
        if weight_sum > 0:
            weighted_total = sum(by_scorer[name] * w for name, w in applicable.items())
            return weighted_total / weight_sum

    # No usable weights: every scorer counts equally.
    return sum(by_scorer.values()) / len(by_scorer)
def _get_best_run(db: Session, experiment: Experiment) -> Run | None:
    """Return the completed run with the highest weighted score.

    Args:
        db: Session used to load the experiment's completed runs together
            with their scores.
        experiment: Experiment whose runs are ranked; its scoring weights are
            obtained via ``_get_scoring_weights``.

    Returns:
        The best-scoring completed run, or ``None`` when no completed run has
        any scores. On ties the run encountered first is kept.
    """
    weights = _get_scoring_weights(experiment)
    completed_runs = (
        db.query(Run)
        .options(joinedload(Run.scores))
        .filter(Run.experiment_id == experiment.id, Run.status == RunStatus.completed)
        .all()
    )

    best_run: Run | None = None
    # Start from negative infinity rather than a fixed sentinel such as -1.0:
    # scorers may legitimately yield values at or below -1, and such runs must
    # still be eligible.
    best_score = float("-inf")
    for run in completed_runs:
        if not run.scores:
            continue
        weighted = _compute_weighted_score(run.scores, weights)
        # ``best_run is None`` guarantees the first scored run is always
        # accepted, whatever its score is.
        if best_run is None or weighted > best_score:
            best_score = weighted
            best_run = run
    return best_run
def _build_best_config_payload(experiment: Experiment, run: Run, weights: dict[str, float]) -> dict:
    """Assemble the export dictionary describing *run* as the best run.

    Args:
        experiment: Supplies ``name`` and ``id``.
        run: Supplies ``scores``, ``id``, ``config_hash`` and ``config``.
        weights: Scorer weights passed through to ``_compute_weighted_score``.

    Returns:
        A dict with experiment metadata, the current UTC time in ISO format
        under ``"exported_at"``, the weighted score, the per-scorer values,
        and the run's id, config hash and config.
    """
    per_scorer: dict[str, float] = {}
    for score in run.scores:
        per_scorer[score.scorer_name] = score.value

    payload: dict = {
        "experiment_name": experiment.name,
        "experiment_id": str(experiment.id),
        "exported_at": datetime.now(timezone.utc).isoformat(),
        "weighted_score": _compute_weighted_score(run.scores, weights),
        "scores": per_scorer,
        "run_id": str(run.id),
        "config_hash": run.config_hash,
        "config": run.config,
    }
    return payload
# ---------------------------------------------------------------------------
# Export Best Config — JSON
# ---------------------------------------------------------------------------
# NOTE: the handler docstring is kept verbatim; FastAPI publishes it as the
# operation description in the generated OpenAPI schema.
@router.get("/experiments/{experiment_id}/best")
def export_best(
    experiment_id: uuid.UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Best config as JSON."""
    # Responds 404 if the experiment is unknown, and again if it has no
    # completed run carrying scores; otherwise returns the payload dict built
    # by ``_build_best_config_payload``.
    experiment = _get_experiment_or_404(db, experiment_id)
    winner = _get_best_run(db, experiment)
    if winner is not None:
        return _build_best_config_payload(
            experiment, winner, _get_scoring_weights(experiment)
        )
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="No completed runs with scores found",
    )
# ---------------------------------------------------------------------------
# Export Best Config — .env
# ---------------------------------------------------------------------------
def _flatten_dict(d: dict, prefix: str = "") -> dict[str, str]:
    """Flatten a nested dict into upper-case ``KEY -> value`` string pairs.

    Nested dict keys are joined with ``_`` (``{"a": {"b": 1}}`` becomes
    ``{"A_B": "1"}``). Lists are serialized with ``json.dumps``; every other
    value is converted with ``str``.

    Args:
        d: Mapping to flatten.
        prefix: String prepended to every key at this level; used by the
            recursion to carry the parent path.

    Returns:
        A flat dict of upper-cased keys to string values. If two paths
        flatten to the same key, the later one overwrites the earlier.
    """
    flat: dict[str, str] = {}
    for k, v in d.items():
        # Always build the key through string formatting so that non-string
        # keys (e.g. ints) work at the top level just as they do when nested.
        key = f"{prefix}{k}".upper()
        if isinstance(v, dict):
            flat.update(_flatten_dict(v, f"{key}_"))
        elif isinstance(v, list):
            flat[key] = json.dumps(v)
        else:
            flat[key] = str(v)
    return flat
def _format_env_value(value: str) -> str:
    """Return *value* in a form that is safe after ``KEY=`` in a .env file.

    Simple values are returned unchanged. Values that would otherwise corrupt
    the file — embedded line breaks, leading/trailing whitespace, a leading
    quote or ``#``, or an inline `` #`` — are wrapped in double quotes with
    backslash, double quote, newline and carriage return escaped.
    """
    needs_quoting = (
        value != value.strip()
        or value[:1] in ('"', "'", "#")
        or " #" in value
        or "\n" in value
        or "\r" in value
    )
    if not needs_quoting:
        return value
    escaped = (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


# NOTE: the handler docstring is kept verbatim; FastAPI publishes it as the
# operation description in the generated OpenAPI schema.
@router.get("/experiments/{experiment_id}/env")
def export_env(
    experiment_id: uuid.UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Best config as .env snippet."""
    # Responds 404 if the experiment is unknown or has no completed run with
    # scores. Otherwise returns a text/plain body: four comment lines of
    # metadata, a blank line, then one KEY=value line per flattened config
    # entry, sorted by key.
    experiment = _get_experiment_or_404(db, experiment_id)
    best_run = _get_best_run(db, experiment)
    if best_run is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No completed runs with scores found",
        )
    weights = _get_scoring_weights(experiment)
    payload = _build_best_config_payload(experiment, best_run, weights)

    lines = [
        f"# PromptLooper — Best config for: {experiment.name}",
        f"# Exported: {payload['exported_at']}",
        f"# Weighted score: {payload['weighted_score']:.4f}",
        f"# Run ID: {payload['run_id']}",
        "",
    ]
    # Presumably ``run.config`` is always a dict; fall back to an empty one so
    # a null config yields a header-only file instead of an error.
    flat = _flatten_dict(payload["config"] or {})
    # Values are quoted/escaped on demand so that e.g. a multi-line prompt
    # cannot spill onto following lines and be read as extra variables.
    lines.extend(
        f"{key}={_format_env_value(value)}" for key, value in sorted(flat.items())
    )
    content = "\n".join(lines) + "\n"
    return Response(content=content, media_type="text/plain")
# ---------------------------------------------------------------------------
# Export Best Config — YAML
# ---------------------------------------------------------------------------
# Characters that carry syntactic meaning when they start a plain YAML scalar.
_YAML_LEADING_INDICATORS = frozenset("-?:,[]{}#&*!|>'\"%@`")

# Plain words that YAML parsers resolve to booleans/nulls/special floats
# instead of strings.
_YAML_RESERVED_WORDS = frozenset(
    {"true", "false", "null", "yes", "no", "on", "off", "~", ".inf", "+.inf", ".nan"}
)


def _yaml_looks_numeric(text: str) -> bool:
    """Return True if *text* parses as a Python float or prefixed integer."""
    for parse in (float, lambda s: int(s, 0)):
        try:
            parse(text)
        except ValueError:
            continue
        return True
    return False


def _yaml_needs_quotes(text: str) -> bool:
    """Return True if *text* cannot be written safely as a plain YAML scalar."""
    if not text or text != text.strip():
        return True
    if text[0] in _YAML_LEADING_INDICATORS or text.endswith(":"):
        return True
    if ": " in text or " #" in text:
        return True
    # Control characters (including newline and tab) need escape sequences.
    if any(ord(ch) < 0x20 for ch in text):
        return True
    if text.lower() in _YAML_RESERVED_WORDS:
        return True
    return _yaml_looks_numeric(text)


def _yaml_scalar(value: object) -> str:
    """Render a single non-container value as a YAML scalar."""
    if value is None:
        return "null"
    # bool must be tested before int: ``True`` is an ``int`` instance.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, float):
        if value != value:  # NaN is the only value unequal to itself
            return ".nan"
        if value in (float("inf"), float("-inf")):
            return ".inf" if value > 0 else "-.inf"
        return str(value)
    if isinstance(value, int):
        return str(value)
    text = str(value)
    if _yaml_needs_quotes(text):
        # A JSON string literal is also a valid YAML double-quoted scalar.
        return json.dumps(text, ensure_ascii=False)
    return text


def _dict_to_yaml(d: dict, indent: int = 0) -> str:
    """Simple YAML serializer for config dicts (no external dependency).

    Nested dicts become nested block mappings, lists become block sequences,
    and scalars are written plain where that is unambiguous and double-quoted
    otherwise (multi-line text, strings that look like numbers/booleans,
    strings containing ``": "`` or ``" #"``, ...). Empty dicts and lists are
    written in flow style (``{}`` / ``[]``) so they are not read back as null.
    Lists nested directly inside lists are written as JSON flow sequences.

    Args:
        d: Mapping to serialize.
        indent: Nesting depth of this mapping; each level adds one prefix
            unit to every emitted line.

    Returns:
        The YAML text without a trailing newline; an empty string for an
        empty top-level mapping.
    """
    lines: list[str] = []
    prefix = " " * indent
    for k, v in d.items():
        key = _yaml_scalar(k)
        if isinstance(v, dict):
            if v:
                lines.append(f"{prefix}{key}:")
                lines.append(_dict_to_yaml(v, indent + 1))
            else:
                lines.append(f"{prefix}{key}: {{}}")
        elif isinstance(v, list):
            if not v:
                lines.append(f"{prefix}{key}: []")
                continue
            lines.append(f"{prefix}{key}:")
            for item in v:
                if isinstance(item, dict) and item:
                    lines.append(f"{prefix} -")
                    lines.append(_dict_to_yaml(item, indent + 2))
                elif isinstance(item, (dict, list)):
                    # Empty dicts and nested lists: JSON flow syntax is valid
                    # YAML. ``default=str`` deliberately stringifies values
                    # JSON cannot represent rather than failing the export.
                    flow = json.dumps(item, ensure_ascii=False, default=str)
                    lines.append(f"{prefix} - {flow}")
                else:
                    lines.append(f"{prefix} - {_yaml_scalar(item)}")
        else:
            lines.append(f"{prefix}{key}: {_yaml_scalar(v)}")
    return "\n".join(lines)
# NOTE: the handler docstring is kept verbatim; FastAPI publishes it as the
# operation description in the generated OpenAPI schema.
@router.get("/experiments/{experiment_id}/yaml")
def export_yaml(
    experiment_id: uuid.UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Best config as YAML."""
    # Responds 404 if the experiment is unknown or has no completed run with
    # scores. Otherwise returns a text/yaml body: a four-line comment header,
    # a blank line, then the whole payload serialized by ``_dict_to_yaml``.
    experiment = _get_experiment_or_404(db, experiment_id)
    winner = _get_best_run(db, experiment)
    if winner is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No completed runs with scores found",
        )

    payload = _build_best_config_payload(
        experiment, winner, _get_scoring_weights(experiment)
    )
    header_lines = [
        f"# PromptLooper — Best config for: {experiment.name}",
        f"# Exported: {payload['exported_at']}",
        f"# Weighted score: {payload['weighted_score']:.4f}",
        f"# Run ID: {payload['run_id']}",
        "",  # makes the join end with a newline after the last comment
    ]
    # The extra "\n" leaves one empty line between header and document body.
    document = "\n".join(header_lines) + "\n" + _dict_to_yaml(payload) + "\n"
    return Response(content=document, media_type="text/yaml")
# ---------------------------------------------------------------------------
# Export Report — Markdown
# ---------------------------------------------------------------------------
# NOTE: the handler docstring is kept verbatim; FastAPI publishes it as the
# operation description in the generated OpenAPI schema.
@router.get("/experiments/{experiment_id}/report")
def export_report(
    experiment_id: uuid.UUID,
    top_n: int = Query(5, ge=1, le=50),
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Full experiment report (markdown)."""
    # Responds 404 if the experiment is unknown. Otherwise returns a
    # text/markdown body with these sections: header metadata, configuration
    # space, run summary, top ``top_n`` configurations (plus detail for the
    # best one), per-scorer score distributions, token usage, and timing.
    experiment = _get_experiment_or_404(db, experiment_id)
    # Only scores are eager-loaded: nothing in the report reads
    # ``Run.stage_results``, so joining that collection too would only
    # inflate the query.
    runs = (
        db.query(Run)
        .options(joinedload(Run.scores))
        .filter(Run.experiment_id == experiment_id)
        .all()
    )
    weights = _get_scoring_weights(experiment)
    completed = [r for r in runs if r.status == RunStatus.completed]
    failed = [r for r in runs if r.status == RunStatus.failed]

    # Completed runs that have scores, ranked best-first by weighted score.
    scored_entries: list[tuple[Run, float]] = [
        (run, _compute_weighted_score(run.scores, weights))
        for run in completed
        if run.scores
    ]
    scored_entries.sort(key=lambda e: e[1], reverse=True)

    # Raw score values grouped per scorer, across all completed runs.
    score_values: dict[str, list[float]] = {}
    for run in completed:
        for s in run.scores:
            score_values.setdefault(s.scorer_name, []).append(s.value)

    # Token totals cover every run; durations only completed runs that
    # recorded one.
    total_tokens_in = sum(r.tokens_in or 0 for r in runs)
    total_tokens_out = sum(r.tokens_out or 0 for r in runs)
    durations = [r.duration_ms for r in completed if r.duration_ms is not None]

    now = datetime.now(timezone.utc).isoformat()
    # ``status`` may be an enum member or a plain value.
    status_text = getattr(experiment.status, "value", experiment.status)

    # Header. Metadata lines end with TWO spaces: that is what Markdown needs
    # for a hard line break; with fewer, the lines merge into one paragraph.
    lines: list[str] = [
        f"# Experiment Report: {experiment.name}",
        "",
        f"**Generated:** {now}  ",
        f"**Experiment ID:** `{experiment.id}`  ",
    ]
    if experiment.description:
        lines.append(f"**Description:** {experiment.description}  ")
    lines.append(f"**Status:** {status_text}  ")
    lines.append("")

    # Config space
    lines.append("## Configuration Space")
    lines.append("")
    if experiment.parameter_space:
        lines.append("```json")
        lines.append(json.dumps(experiment.parameter_space, indent=2))
        lines.append("```")
    else:
        lines.append("_No parameter space defined._")
    lines.append("")

    # Run summary
    lines.extend(
        [
            "## Run Summary",
            "",
            "| Metric | Value |",
            "|--------|-------|",
            f"| Total runs | {len(runs)} |",
            f"| Completed | {len(completed)} |",
            f"| Failed | {len(failed)} |",
            f"| Scored | {len(scored_entries)} |",
            "",
        ]
    )

    # Top N configs
    lines.append(f"## Top {min(top_n, len(scored_entries))} Configurations")
    lines.append("")
    if scored_entries:
        lines.append("| Rank | Run ID | Weighted Score | Config Hash |")
        lines.append("|------|--------|---------------|-------------|")
        for rank, (run, ws) in enumerate(scored_entries[:top_n], 1):
            # Tolerate a missing hash instead of failing the whole report.
            short_hash = (run.config_hash or "")[:12]
            lines.append(
                f"| {rank} | `{str(run.id)[:8]}...` | {ws:.4f} | `{short_hash}...` |"
            )
        lines.append("")

        # Detail for the top-ranked entry
        best_run = scored_entries[0][0]
        lines.extend(
            [
                "### Best Configuration Detail",
                "",
                "```json",
                json.dumps(best_run.config, indent=2),
                "```",
                "",
                "**Scores:**",
                "",
            ]
        )
        lines.extend(f"- **{s.scorer_name}:** {s.value:.4f}" for s in best_run.scores)
        lines.append("")
    else:
        lines.append("_No scored runs available._")
        lines.append("")

    # Score distributions (section omitted entirely when nothing was scored)
    if score_values:
        lines.append("## Score Distributions")
        lines.append("")
        lines.append("| Scorer | Min | Max | Mean | Count |")
        lines.append("|--------|-----|-----|------|-------|")
        for name in sorted(score_values):
            vals = score_values[name]
            lines.append(
                f"| {name} | {min(vals):.4f} | {max(vals):.4f} | "
                f"{sum(vals)/len(vals):.4f} | {len(vals)} |"
            )
        lines.append("")

    # Token usage
    lines.extend(
        [
            "## Token Usage",
            "",
            "| Metric | Value |",
            "|--------|-------|",
            f"| Total tokens in | {total_tokens_in:,} |",
            f"| Total tokens out | {total_tokens_out:,} |",
            f"| Total tokens | {total_tokens_in + total_tokens_out:,} |",
            "",
        ]
    )

    # Timing stats
    lines.append("## Timing")
    lines.append("")
    if durations:
        avg_ms = sum(durations) / len(durations)
        lines.extend(
            [
                "| Metric | Value |",
                "|--------|-------|",
                f"| Fastest run | {min(durations):,} ms |",
                f"| Slowest run | {max(durations):,} ms |",
                f"| Average | {avg_ms:,.0f} ms |",
                f"| Total time | {sum(durations):,} ms |",
            ]
        )
    else:
        lines.append("_No timing data available._")
    lines.append("")

    content = "\n".join(lines)
    return Response(content=content, media_type="text/markdown")