Four fully authenticated endpoints at /api/export/experiments/{id}/:
- /best: Returns best config as JSON with weighted score and metadata
- /env: Flattened KEY=VALUE format with metadata comments
- /yaml: Simple YAML serialization (no external dependency)
- /report: Full markdown report with config space, top N configs,
score distributions, token usage, and timing stats
34 tests in test_export.py covering all endpoints, auth, 404s, and helpers.
Updated test_routers.py to expect 401 (auth required) instead of 501 (stub).
387 lines
13 KiB
Python
387 lines
13 KiB
Python
"""Export router — export experiment results in various formats."""
|
|
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from auth import get_current_user
|
|
from main import get_db
|
|
from models import Experiment, Run, RunStatus, Score, StageResult, User
|
|
|
|
# Router collecting the export endpoints registered below via @router.get.
# The URL prefix it is served under is chosen where the router is included
# (not visible in this module).
router = APIRouter()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _get_experiment_or_404(db: Session, experiment_id: uuid.UUID) -> Experiment:
    """Look up an experiment by primary key, or abort the request.

    Args:
        db: Session used to run the lookup query.
        experiment_id: Identifier matched against ``Experiment.id``.

    Returns:
        The first matching ``Experiment`` row.

    Raises:
        HTTPException: 404 with detail "Experiment not found" when no row matches.
    """
    lookup = db.query(Experiment).filter(Experiment.id == experiment_id)
    found = lookup.first()
    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Experiment not found",
    )
|
|
|
|
|
|
def _get_scoring_weights(experiment: Experiment) -> dict[str, float]:
    """Return the per-scorer weights stored on an experiment.

    Weights are read from ``experiment.scoring_config["weights"]``.

    Args:
        experiment: Object exposing a ``scoring_config`` attribute.

    Returns:
        The stored weights mapping (the same object, not a copy), or an empty
        dict when ``scoring_config`` is missing, empty, not a dict, or when
        its ``"weights"`` entry is absent, null, or not a dict.
    """
    scoring_config = experiment.scoring_config
    if not isinstance(scoring_config, dict):
        return {}

    weights = scoring_config.get("weights")
    # An explicit null (or any non-mapping) stored under "weights" must not
    # leak out: callers treat the result as a dict and call .get() on it.
    return weights if isinstance(weights, dict) else {}
|
|
|
|
|
|
def _compute_weighted_score(scores: list[Score], weights: dict[str, float]) -> float:
    """Collapse one run's scores into a single number.

    Scores are first keyed by ``scorer_name`` (a later entry with the same
    name replaces an earlier one). If ``weights`` assigns a positive total
    weight to the scorers present, the result is the weighted average over
    the scorers that appear in ``weights``. Otherwise it is the plain mean
    of all scores.

    Args:
        scores: Objects exposing ``scorer_name`` and ``value``.
        weights: Mapping of scorer name to weight; may be empty.

    Returns:
        ``0.0`` when ``scores`` is empty, else the weighted or plain average.
    """
    if not scores:
        return 0.0

    by_scorer: dict[str, float] = {entry.scorer_name: entry.value for entry in scores}

    if weights:
        weight_total = sum(weights.get(scorer, 0.0) for scorer in by_scorer)
        if weight_total > 0:
            weighted_sum = sum(
                by_scorer[scorer] * weights.get(scorer, 0.0)
                for scorer in by_scorer
                if scorer in weights
            )
            return weighted_sum / weight_total

    # No weights given, or none of them apply to these scorers: plain mean.
    return sum(by_scorer.values()) / len(by_scorer)
|
|
|
|
|
|
def _get_best_run(db: Session, experiment: Experiment) -> Run | None:
    """Return the completed run with the highest weighted score.

    Only runs of ``experiment`` whose status is ``RunStatus.completed`` and
    that have at least one score are considered. Scores are combined with
    ``_compute_weighted_score`` using the experiment's scoring weights.

    Args:
        db: Session used to load the runs (scores are eager-loaded).
        experiment: Experiment whose runs are ranked.

    Returns:
        The best-scoring run, or ``None`` when no completed run has scores.
        On ties the run returned first by the query wins.
    """
    weights = _get_scoring_weights(experiment)
    runs = (
        db.query(Run)
        .options(joinedload(Run.scores))
        .filter(Run.experiment_id == experiment.id, Run.status == RunStatus.completed)
        .all()
    )

    scored_runs = [run for run in runs if run.scores]
    if not scored_runs:
        return None

    # No numeric sentinel here: a fixed starting "best" such as -1.0 would hide
    # every run whose weighted score is <= -1.0. max() returns the first
    # maximal element, so ties still resolve to the earliest run.
    return max(scored_runs, key=lambda run: _compute_weighted_score(run.scores, weights))
|
|
|
|
|
|
def _build_best_config_payload(experiment: Experiment, run: Run, weights: dict[str, float]) -> dict:
    """Assemble the export payload describing a run and its configuration.

    Args:
        experiment: Supplies ``name`` and ``id`` for the metadata fields.
        run: Supplies ``id``, ``config_hash``, ``config`` and ``scores``.
        weights: Scorer weights forwarded to ``_compute_weighted_score``.

    Returns:
        A dict with, in this order, the keys ``experiment_name``,
        ``experiment_id``, ``exported_at`` (current UTC time, ISO 8601),
        ``weighted_score``, ``scores`` (scorer name -> value), ``run_id``,
        ``config_hash`` and ``config``.
    """
    per_scorer: dict = {}
    for entry in run.scores:
        per_scorer[entry.scorer_name] = entry.value

    exported_at = datetime.now(timezone.utc).isoformat()
    combined = _compute_weighted_score(run.scores, weights)

    # Key order is part of the output: serializers iterate this dict as-is.
    payload: dict = {}
    payload["experiment_name"] = experiment.name
    payload["experiment_id"] = str(experiment.id)
    payload["exported_at"] = exported_at
    payload["weighted_score"] = combined
    payload["scores"] = per_scorer
    payload["run_id"] = str(run.id)
    payload["config_hash"] = run.config_hash
    payload["config"] = run.config
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Export Best Config — JSON
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/experiments/{experiment_id}/best")
def export_best(
    experiment_id: uuid.UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Return the best run's configuration and metadata as JSON.

    The ``get_current_user`` dependency must resolve for the handler to run.

    Raises:
        HTTPException: 404 when the experiment does not exist, or when it has
            no completed run with scores.
    """
    experiment = _get_experiment_or_404(db, experiment_id)
    winner = _get_best_run(db, experiment)

    if winner is not None:
        return _build_best_config_payload(
            experiment,
            winner,
            _get_scoring_weights(experiment),
        )

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="No completed runs with scores found",
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Export Best Config — .env
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _flatten_dict(d: dict, prefix: str = "") -> dict[str, str]:
    """Flatten a nested dict into upper-cased ``KEY`` -> string-value pairs.

    Nested dict keys are joined with ``_`` (``{"a": {"b": 1}}`` becomes
    ``{"A_B": "1"}``). Lists are rendered as JSON text; every other value is
    rendered with ``str()``. An empty nested dict contributes no entries.

    Args:
        d: Mapping to flatten.
        prefix: Text prepended to every key at this level; it is upper-cased
            together with the key.

    Returns:
        A flat dict of upper-cased keys to string values.
    """
    flat: dict[str, str] = {}
    for raw_key, value in d.items():
        # Formatting through the f-string (rather than calling .upper() on the
        # raw key) lets non-string keys work at the top level as well as when
        # nested.
        key = f"{prefix}{raw_key}".upper()
        if isinstance(value, dict):
            flat.update(_flatten_dict(value, f"{key}_"))
        elif isinstance(value, list):
            flat[key] = json.dumps(value)
        else:
            flat[key] = str(value)
    return flat
|
|
|
|
|
|
def _format_env_value(value: str) -> str:
    """Return ``value`` in a form that fits on one ``KEY=value`` line.

    Values are emitted bare unless that would corrupt them: a line break would
    split the entry across lines, and dotenv-style parsers commonly treat
    ``#`` as a comment start, strip surrounding whitespace, and interpret a
    leading quote. Such values are wrapped in double quotes with backslash,
    double quote, newline and carriage return escaped.
    """
    must_quote = (
        value != value.strip()
        or value[:1] in ("'", '"')
        or any(ch in value for ch in "\n\r#")
    )
    if not must_quote:
        return value

    escaped = (
        value.replace("\\", "\\\\")  # escape backslashes first so later escapes survive
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


@router.get("/experiments/{experiment_id}/env")
def export_env(
    experiment_id: uuid.UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Return the best run's configuration as a ``.env`` snippet.

    The body starts with ``#`` comment lines (experiment name, export time,
    weighted score, run id) followed by the flattened config as sorted
    ``KEY=value`` lines. Values that cannot be written bare on a single line
    are double-quoted. The ``get_current_user`` dependency must resolve for
    the handler to run.

    Returns:
        A ``text/plain`` response.

    Raises:
        HTTPException: 404 when the experiment does not exist, or when it has
            no completed run with scores.
    """
    experiment = _get_experiment_or_404(db, experiment_id)
    best_run = _get_best_run(db, experiment)
    if best_run is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No completed runs with scores found",
        )

    weights = _get_scoring_weights(experiment)
    payload = _build_best_config_payload(experiment, best_run, weights)

    # Fold line breaks in the name so it cannot escape the comment line and
    # inject arbitrary KEY=value entries into the file.
    comment_safe_name = " ".join(str(experiment.name).splitlines())

    lines = [
        f"# PromptLooper — Best config for: {comment_safe_name}",
        f"# Exported: {payload['exported_at']}",
        f"# Weighted score: {payload['weighted_score']:.4f}",
        f"# Run ID: {payload['run_id']}",
        "",
    ]

    flat = _flatten_dict(payload["config"])
    lines.extend(f"{key}={_format_env_value(value)}" for key, value in sorted(flat.items()))

    content = "\n".join(lines) + "\n"
    return Response(content=content, media_type="text/plain")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Export Best Config — YAML
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Plain (unquoted) words that YAML loaders may resolve to null/bool/float
# instead of a string; the empty string is included because it reads as null.
_YAML_RESERVED_WORDS = frozenset(
    {"", "~", "null", "true", "false", "yes", "no", "on", "off", ".nan", ".inf", "-.inf", "+.inf"}
)

# Characters with syntactic meaning when they start a YAML scalar.
_YAML_LEADING_INDICATORS = frozenset("-?:,[]{}#&*!|>'\"%@`")


def _yaml_needs_quotes(text: str) -> bool:
    """Tell whether ``text`` would be misread if written as a plain scalar."""
    if text.lower() in _YAML_RESERVED_WORDS:
        return True
    if text != text.strip() or text[0] in _YAML_LEADING_INDICATORS:
        return True
    if ": " in text or " #" in text or text.endswith(":"):
        return True
    # Control characters (newline and tab included) cannot live in a plain
    # single-line scalar.
    if any(ch < " " or ch == "\x7f" for ch in text):
        return True
    # A string that looks like a number must be quoted to stay a string.
    try:
        float(text)
    except ValueError:
        pass
    else:
        return True
    try:
        int(text, 0)
    except ValueError:
        return False
    return True


def _yaml_scalar(value: object) -> str:
    """Render a single non-container value as a YAML scalar."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # checked before int: bool is an int subclass
        return "true" if value else "false"
    if isinstance(value, float):
        if value != value:  # NaN is the only value unequal to itself
            return ".nan"
        if value in (float("inf"), float("-inf")):
            return ".inf" if value > 0 else "-.inf"
        return str(value)
    if isinstance(value, int):
        return str(value)
    text = value if isinstance(value, str) else str(value)
    # A JSON string literal is also a valid YAML double-quoted scalar.
    return json.dumps(text, ensure_ascii=False) if _yaml_needs_quotes(text) else text


def _dict_to_yaml(d: dict, indent: int = 0) -> str:
    """Serialize a dict to block-style YAML without an external dependency.

    Nested dicts become nested mappings, lists become block sequences, and
    ``None``/booleans are written as ``null``/``true``/``false`` both as
    mapping values and as list items. Strings are written bare when that is
    unambiguous and double-quoted (JSON escaping) otherwise, so multi-line
    text, ``: ``/`` #`` sequences and number- or keyword-like strings survive
    a round trip. Empty containers are written as ``{}`` / ``[]``; lists
    nested directly inside lists are written in JSON flow form.

    Args:
        d: Mapping to serialize.
        indent: Nesting depth; each level adds one space of indentation.

    Returns:
        The YAML text without a trailing newline (``""`` for an empty dict).
    """
    prefix = " " * indent
    lines: list[str] = []
    for raw_key, value in d.items():
        key = _yaml_scalar(raw_key)
        if isinstance(value, dict):
            if value:
                lines.append(f"{prefix}{key}:")
                lines.append(_dict_to_yaml(value, indent + 1))
            else:
                # A bare "key:" would load as null, not as an empty mapping.
                lines.append(f"{prefix}{key}: {{}}")
        elif isinstance(value, list):
            if not value:
                lines.append(f"{prefix}{key}: []")
                continue
            lines.append(f"{prefix}{key}:")
            for item in value:
                if isinstance(item, dict) and item:
                    lines.append(f"{prefix} -")
                    lines.append(_dict_to_yaml(item, indent + 2))
                elif isinstance(item, (dict, list)):
                    # Empty mapping or nested sequence: JSON flow syntax is
                    # valid YAML; default=str is a deliberate last resort for
                    # values JSON cannot encode.
                    lines.append(f"{prefix} - {json.dumps(item, default=str)}")
                else:
                    lines.append(f"{prefix} - {_yaml_scalar(item)}")
        else:
            lines.append(f"{prefix}{key}: {_yaml_scalar(value)}")
    return "\n".join(lines)
|
|
|
|
|
|
@router.get("/experiments/{experiment_id}/yaml")
def export_yaml(
    experiment_id: uuid.UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Return the best run's payload as a YAML document.

    The body starts with ``#`` comment lines (experiment name, export time,
    weighted score, run id), a blank line, then the full payload built by
    ``_build_best_config_payload`` serialized with ``_dict_to_yaml``. The
    ``get_current_user`` dependency must resolve for the handler to run.

    Returns:
        A ``text/yaml`` response.

    Raises:
        HTTPException: 404 when the experiment does not exist, or when it has
            no completed run with scores.
    """
    experiment = _get_experiment_or_404(db, experiment_id)
    best_run = _get_best_run(db, experiment)
    if best_run is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No completed runs with scores found",
        )

    weights = _get_scoring_weights(experiment)
    payload = _build_best_config_payload(experiment, best_run, weights)

    # Fold line breaks in the name so it cannot escape the comment line and
    # inject content into the YAML document that follows.
    comment_safe_name = " ".join(str(experiment.name).splitlines())

    header = (
        f"# PromptLooper — Best config for: {comment_safe_name}\n"
        f"# Exported: {payload['exported_at']}\n"
        f"# Weighted score: {payload['weighted_score']:.4f}\n"
        f"# Run ID: {payload['run_id']}\n\n"
    )
    content = header + _dict_to_yaml(payload) + "\n"
    return Response(content=content, media_type="text/yaml")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Export Report — Markdown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/experiments/{experiment_id}/report")
def export_report(
    experiment_id: uuid.UUID,
    top_n: int = Query(5, ge=1, le=50),
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Return a full markdown report for one experiment.

    Sections, in order: header metadata, configuration space, run summary,
    top-N configurations (with detail for the best one), per-scorer score
    distributions, token usage, and timing. The ``get_current_user``
    dependency must resolve for the handler to run.

    Args:
        experiment_id: Experiment to report on.
        top_n: Number of ranked configurations to list (1-50, default 5).
        db: Database session.

    Returns:
        A ``text/markdown`` response.

    Raises:
        HTTPException: 404 when the experiment does not exist.
    """
    experiment = _get_experiment_or_404(db, experiment_id)

    # Only scores are read below, so stage results are not eager-loaded:
    # joining a second collection multiplies the fetched rows for no benefit.
    runs = (
        db.query(Run)
        .options(joinedload(Run.scores))
        .filter(Run.experiment_id == experiment_id)
        .all()
    )

    weights = _get_scoring_weights(experiment)
    completed = [r for r in runs if r.status == RunStatus.completed]
    failed = [r for r in runs if r.status == RunStatus.failed]

    # Rank completed runs that have scores, best first (the sort is stable,
    # so ties keep query order).
    scored_entries = [
        (run, _compute_weighted_score(run.scores, weights))
        for run in completed
        if run.scores
    ]
    scored_entries.sort(key=lambda entry: entry[1], reverse=True)

    # Per-scorer value lists across all completed runs.
    score_values: dict[str, list[float]] = {}
    for run in completed:
        for s in run.scores:
            score_values.setdefault(s.scorer_name, []).append(s.value)

    # Tokens are totalled over every run; durations only over completed ones.
    total_tokens_in = sum(r.tokens_in or 0 for r in runs)
    total_tokens_out = sum(r.tokens_out or 0 for r in runs)
    durations = [r.duration_ms for r in completed if r.duration_ms is not None]

    now = datetime.now(timezone.utc).isoformat()
    status_label = experiment.status.value if hasattr(experiment.status, "value") else experiment.status

    # Header. Metadata lines end in two spaces: that is markdown's hard line
    # break; a single trailing space would merge them into one paragraph.
    lines: list[str] = [
        f"# Experiment Report: {experiment.name}",
        "",
        f"**Generated:** {now}  ",
        f"**Experiment ID:** `{experiment.id}`  ",
    ]
    if experiment.description:
        lines.append(f"**Description:** {experiment.description}  ")
    lines.append(f"**Status:** {status_label}  ")
    lines.append("")

    # Config space
    lines += ["## Configuration Space", ""]
    if experiment.parameter_space:
        lines += ["```json", json.dumps(experiment.parameter_space, indent=2), "```"]
    else:
        lines.append("_No parameter space defined._")
    lines.append("")

    # Run summary
    lines += [
        "## Run Summary",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Total runs | {len(runs)} |",
        f"| Completed | {len(completed)} |",
        f"| Failed | {len(failed)} |",
        f"| Scored | {len(scored_entries)} |",
        "",
    ]

    # Top N configs
    lines += [f"## Top {min(top_n, len(scored_entries))} Configurations", ""]
    if scored_entries:
        lines.append("| Rank | Run ID | Weighted Score | Config Hash |")
        lines.append("|------|--------|---------------|-------------|")
        for rank, (run, ws) in enumerate(scored_entries[:top_n], 1):
            # Tolerate a missing hash instead of failing the whole report.
            hash_prefix = (run.config_hash or "")[:12]
            lines.append(f"| {rank} | `{str(run.id)[:8]}...` | {ws:.4f} | `{hash_prefix}...` |")
        lines.append("")

        # Detail for top entry
        best_run = scored_entries[0][0]
        lines += [
            "### Best Configuration Detail",
            "",
            "```json",
            json.dumps(best_run.config, indent=2),
            "```",
            "",
            "**Scores:**",
            "",
        ]
        lines.extend(f"- **{s.scorer_name}:** {s.value:.4f}" for s in best_run.scores)
        lines.append("")
    else:
        lines += ["_No scored runs available._", ""]

    # Score distributions
    if score_values:
        lines += [
            "## Score Distributions",
            "",
            "| Scorer | Min | Max | Mean | Count |",
            "|--------|-----|-----|------|-------|",
        ]
        for name in sorted(score_values):
            vals = score_values[name]
            lines.append(
                f"| {name} | {min(vals):.4f} | {max(vals):.4f} | "
                f"{sum(vals)/len(vals):.4f} | {len(vals)} |"
            )
        lines.append("")

    # Token usage
    lines += [
        "## Token Usage",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Total tokens in | {total_tokens_in:,} |",
        f"| Total tokens out | {total_tokens_out:,} |",
        f"| Total tokens | {total_tokens_in + total_tokens_out:,} |",
        "",
    ]

    # Timing stats
    lines += ["## Timing", ""]
    if durations:
        avg_ms = sum(durations) / len(durations)
        lines += [
            "| Metric | Value |",
            "|--------|-------|",
            f"| Fastest run | {min(durations):,} ms |",
            f"| Slowest run | {max(durations):,} ms |",
            f"| Average | {avg_ms:,.0f} ms |",
            f"| Total time | {sum(durations):,} ms |",
        ]
    else:
        lines.append("_No timing data available._")
    lines.append("")

    content = "\n".join(lines)
    return Response(content=content, media_type="text/markdown")
|