MAESTRO: Create backend/models.py with all 8 SQLAlchemy ORM models from spec

Define User, Project, Experiment, Run, StageResult, Score, ResponseCache,
and WebhookConfig with UUID primary keys, JSON columns, enum types
(ExperimentStatus, RunStatus), full relationship cascades, and indexes.
Uses sqlalchemy.JSON (not JSONB) for SQLite compatibility in single-container
mode. 16 tests added covering table creation, CRUD, uniqueness constraints,
default values, and cascade deletes — all passing.
This commit is contained in:
John Lightner 2026-04-07 01:49:10 -05:00
parent 309bbacb5d
commit 7ef116e2f9
3 changed files with 637 additions and 1 deletions

View file

@ -20,7 +20,8 @@ Set up the PromptLooper repository, Docker infrastructure, and basic project ske
- [x] Create backend/config.py using Pydantic Settings. Define all configuration from the Environment Variables table. Implement the SQLite fallback logic: when DATABASE_URL is not set, construct a SQLite URL pointing to DATA_DIR/promptlooper.db. When REDIS_URL is not set, set a flag for in-process mode. - [x] Create backend/config.py using Pydantic Settings. Define all configuration from the Environment Variables table. Implement the SQLite fallback logic: when DATABASE_URL is not set, construct a SQLite URL pointing to DATA_DIR/promptlooper.db. When REDIS_URL is not set, set a flag for in-process mode.
> Created backend/config.py with Pydantic Settings class defining all 13 env vars. SQLite fallback via `effective_database_url` property constructs sqlite:///DATA_DIR/promptlooper.db when DATABASE_URL is unset. `use_in_process_queue` property flags in-process mode when REDIS_URL is absent. JWT_SECRET auto-generates via `secrets.token_urlsafe(32)` when not provided. Empty API_KEY strings normalize to None. 13 tests in tests/test_config.py all passing. > Created backend/config.py with Pydantic Settings class defining all 13 env vars. SQLite fallback via `effective_database_url` property constructs sqlite:///DATA_DIR/promptlooper.db when DATABASE_URL is unset. `use_in_process_queue` property flags in-process mode when REDIS_URL is absent. JWT_SECRET auto-generates via `secrets.token_urlsafe(32)` when not provided. Empty API_KEY strings normalize to None. 13 tests in tests/test_config.py all passing.
- [ ] Create backend/models.py with all SQLAlchemy ORM models from the spec's Data Model section: User, Project, Experiment, Run, StageResult, Score, ResponseCache, and WebhookConfig. Include all fields, types, relationships, and indexes. Use UUID primary keys and JSONB for flexible fields. - [x] Create backend/models.py with all SQLAlchemy ORM models from the spec's Data Model section: User, Project, Experiment, Run, StageResult, Score, ResponseCache, and WebhookConfig. Include all fields, types, relationships, and indexes. Use UUID primary keys and JSONB for flexible fields.
> Created all 8 ORM models with UUID PKs, JSON columns (using sqlalchemy.JSON for SQLite compatibility — maps to JSONB on PostgreSQL), enum types (ExperimentStatus, RunStatus), full relationship definitions with cascade deletes, and indexes on foreign keys and commonly filtered columns. Score.metadata mapped as `scorer_metadata` Python attribute (column name stays "metadata") to avoid SQLAlchemy reserved name conflict. 16 tests in tests/test_models.py all passing.
- [ ] Set up Alembic: create alembic.ini and alembic/env.py configured to read DATABASE_URL from the config. Generate and apply the initial migration from the models. - [ ] Set up Alembic: create alembic.ini and alembic/env.py configured to read DATABASE_URL from the config. Generate and apply the initial migration from the models.

276
backend/models.py Normal file
View file

@ -0,0 +1,276 @@
"""PromptLooper SQLAlchemy ORM models."""
import enum
import uuid
from datetime import datetime, timezone
from sqlalchemy import (
JSON,
Boolean,
DateTime,
Enum,
Float,
ForeignKey,
Index,
Integer,
Numeric,
String,
Text,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
def _new_uuid() -> uuid.UUID:
return uuid.uuid4()
# ---------------------------------------------------------------------------
# Base
# ---------------------------------------------------------------------------
class Base(DeclarativeBase):
"""Shared declarative base for all models."""
type_annotation_map = {
dict: JSON,
}
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class ExperimentStatus(str, enum.Enum):
draft = "draft"
running = "running"
paused = "paused"
completed = "completed"
class RunStatus(str, enum.Enum):
pending = "pending"
running = "running"
completed = "completed"
failed = "failed"
cached = "cached"
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
username: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, nullable=False
)
# Relationships
projects: Mapped[list["Project"]] = relationship(
back_populates="owner", cascade="all, delete-orphan"
)
class Project(Base):
__tablename__ = "projects"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
owner_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, onupdate=_utcnow, nullable=False
)
# Relationships
owner: Mapped["User"] = relationship(back_populates="projects")
experiments: Mapped[list["Experiment"]] = relationship(
back_populates="project", cascade="all, delete-orphan"
)
class Experiment(Base):
__tablename__ = "experiments"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
project_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
sample_data: Mapped[dict | None] = mapped_column(JSON, nullable=True)
pipeline_stages: Mapped[dict | None] = mapped_column(JSON, nullable=True)
scoring_config: Mapped[dict | None] = mapped_column(JSON, nullable=True)
parameter_space: Mapped[dict | None] = mapped_column(JSON, nullable=True)
status: Mapped[ExperimentStatus] = mapped_column(
Enum(ExperimentStatus, name="experiment_status"),
default=ExperimentStatus.draft,
nullable=False,
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, onupdate=_utcnow, nullable=False
)
# Relationships
project: Mapped["Project"] = relationship(back_populates="experiments")
runs: Mapped[list["Run"]] = relationship(
back_populates="experiment", cascade="all, delete-orphan"
)
__table_args__ = (
Index("ix_experiments_project_id", "project_id"),
Index("ix_experiments_status", "status"),
)
class Run(Base):
__tablename__ = "runs"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
experiment_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("experiments.id", ondelete="CASCADE"), nullable=False
)
config_hash: Mapped[str] = mapped_column(String(64), nullable=False)
config: Mapped[dict] = mapped_column(JSON, nullable=False)
status: Mapped[RunStatus] = mapped_column(
Enum(RunStatus, name="run_status"),
default=RunStatus.pending,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
completed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
tokens_in: Mapped[int | None] = mapped_column(Integer, nullable=True)
tokens_out: Mapped[int | None] = mapped_column(Integer, nullable=True)
cost_estimate: Mapped[float | None] = mapped_column(
Numeric(precision=12, scale=6), nullable=True
)
# Relationships
experiment: Mapped["Experiment"] = relationship(back_populates="runs")
stage_results: Mapped[list["StageResult"]] = relationship(
back_populates="run", cascade="all, delete-orphan"
)
scores: Mapped[list["Score"]] = relationship(
back_populates="run", cascade="all, delete-orphan"
)
__table_args__ = (
Index("ix_runs_experiment_id", "experiment_id"),
Index("ix_runs_config_hash", "config_hash"),
Index("ix_runs_status", "status"),
)
class StageResult(Base):
__tablename__ = "stage_results"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
run_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("runs.id", ondelete="CASCADE"), nullable=False
)
stage_index: Mapped[int] = mapped_column(Integer, nullable=False)
prompt_sent: Mapped[str] = mapped_column(Text, nullable=False)
response_raw: Mapped[str] = mapped_column(Text, nullable=False)
model_used: Mapped[str] = mapped_column(String(255), nullable=False)
parameters: Mapped[dict | None] = mapped_column(JSON, nullable=True)
tokens_in: Mapped[int | None] = mapped_column(Integer, nullable=True)
tokens_out: Mapped[int | None] = mapped_column(Integer, nullable=True)
latency_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
# Relationships
run: Mapped["Run"] = relationship(back_populates="stage_results")
__table_args__ = (
Index("ix_stage_results_run_id", "run_id"),
)
class Score(Base):
__tablename__ = "scores"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
run_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("runs.id", ondelete="CASCADE"), nullable=False
)
scorer_name: Mapped[str] = mapped_column(String(255), nullable=False)
value: Mapped[float] = mapped_column(Float, nullable=False)
scorer_metadata: Mapped[dict | None] = mapped_column(
"metadata", JSON, nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, nullable=False
)
# Relationships
run: Mapped["Run"] = relationship(back_populates="scores")
__table_args__ = (
Index("ix_scores_run_id", "run_id"),
Index("ix_scores_scorer_name", "scorer_name"),
)
class ResponseCache(Base):
__tablename__ = "response_cache"
config_hash: Mapped[str] = mapped_column(
String(64), primary_key=True
)
response: Mapped[str] = mapped_column(Text, nullable=False)
model: Mapped[str] = mapped_column(String(255), nullable=False)
tokens_in: Mapped[int | None] = mapped_column(Integer, nullable=True)
tokens_out: Mapped[int | None] = mapped_column(Integer, nullable=True)
latency_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=_utcnow, nullable=False
)
class WebhookConfig(Base):
__tablename__ = "webhook_configs"
id: Mapped[uuid.UUID] = mapped_column(
primary_key=True, default=_new_uuid
)
event_type: Mapped[str] = mapped_column(String(255), nullable=False)
url: Mapped[str] = mapped_column(String(2048), nullable=False)
headers: Mapped[dict | None] = mapped_column(JSON, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
__table_args__ = (
Index("ix_webhook_configs_event_type", "event_type"),
)

View file

@ -0,0 +1,359 @@
"""Tests for SQLAlchemy ORM models."""
import uuid
from datetime import datetime, timezone
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import Session
from models import (
Base,
Experiment,
ExperimentStatus,
Project,
ResponseCache,
Run,
RunStatus,
Score,
StageResult,
User,
WebhookConfig,
)
def _engine():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
return engine
def _session(engine):
return Session(engine)
# ---------------------------------------------------------------------------
# Table existence
# ---------------------------------------------------------------------------
def test_all_tables_created():
engine = _engine()
table_names = inspect(engine).get_table_names()
expected = {
"users",
"projects",
"experiments",
"runs",
"stage_results",
"scores",
"response_cache",
"webhook_configs",
}
assert expected.issubset(set(table_names))
# ---------------------------------------------------------------------------
# User
# ---------------------------------------------------------------------------
def test_user_creation():
engine = _engine()
with _session(engine) as session:
user = User(username="admin", password_hash="hashed", is_admin=True)
session.add(user)
session.commit()
assert isinstance(user.id, uuid.UUID)
assert user.username == "admin"
assert user.is_admin is True
assert isinstance(user.created_at, datetime)
def test_user_username_unique():
engine = _engine()
with _session(engine) as session:
session.add(User(username="dup", password_hash="h1"))
session.commit()
session.add(User(username="dup", password_hash="h2"))
try:
session.commit()
assert False, "Should have raised IntegrityError"
except Exception:
session.rollback()
# ---------------------------------------------------------------------------
# Project
# ---------------------------------------------------------------------------
def test_project_with_owner():
engine = _engine()
with _session(engine) as session:
user = User(username="owner", password_hash="h")
project = Project(name="Test Project", description="A test", owner=user)
session.add(project)
session.commit()
assert project.owner_id == user.id
assert project.name == "Test Project"
assert isinstance(project.updated_at, datetime)
def test_project_cascade_delete_from_user():
engine = _engine()
with _session(engine) as session:
user = User(username="owner", password_hash="h")
project = Project(name="P1", owner=user)
session.add(project)
session.commit()
project_id = project.id
session.delete(user)
session.commit()
assert session.get(Project, project_id) is None
# ---------------------------------------------------------------------------
# Experiment
# ---------------------------------------------------------------------------
def test_experiment_defaults():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(
project=project,
name="Exp1",
sample_data={"inputs": ["hello"]},
pipeline_stages=[{"prompt": "test"}],
scoring_config={"scorers": ["keyword"]},
parameter_space={"temperature": [0.1, 0.5]},
)
session.add(exp)
session.commit()
assert exp.status == ExperimentStatus.draft
assert exp.sample_data == {"inputs": ["hello"]}
assert isinstance(exp.created_at, datetime)
def test_experiment_cascade_delete_from_project():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(project=project, name="E")
session.add(exp)
session.commit()
exp_id = exp.id
session.delete(project)
session.commit()
assert session.get(Experiment, exp_id) is None
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
def test_run_creation():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(project=project, name="E")
run = Run(
experiment=exp,
config_hash="a" * 64,
config={"model": "gpt-4", "temperature": 0.5},
status=RunStatus.completed,
duration_ms=1200,
tokens_in=100,
tokens_out=50,
)
session.add(run)
session.commit()
assert run.status == RunStatus.completed
assert run.config["model"] == "gpt-4"
def test_run_default_status():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(project=project, name="E")
run = Run(experiment=exp, config_hash="b" * 64, config={})
session.add(run)
session.commit()
assert run.status == RunStatus.pending
# ---------------------------------------------------------------------------
# StageResult
# ---------------------------------------------------------------------------
def test_stage_result():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(project=project, name="E")
run = Run(experiment=exp, config_hash="c" * 64, config={})
sr = StageResult(
run=run,
stage_index=0,
prompt_sent="Hello",
response_raw="World",
model_used="gpt-4",
parameters={"temperature": 0.5},
tokens_in=10,
tokens_out=5,
latency_ms=200,
)
session.add(sr)
session.commit()
assert sr.stage_index == 0
assert sr.model_used == "gpt-4"
assert len(run.stage_results) == 1
# ---------------------------------------------------------------------------
# Score
# ---------------------------------------------------------------------------
def test_score():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(project=project, name="E")
run = Run(experiment=exp, config_hash="d" * 64, config={})
score = Score(
run=run,
scorer_name="embedding_similarity",
value=0.87,
scorer_metadata={"reference_id": "ref1"},
)
session.add(score)
session.commit()
assert score.value == 0.87
assert score.scorer_name == "embedding_similarity"
assert len(run.scores) == 1
# ---------------------------------------------------------------------------
# ResponseCache
# ---------------------------------------------------------------------------
def test_response_cache():
engine = _engine()
with _session(engine) as session:
cache = ResponseCache(
config_hash="e" * 64,
response="cached response",
model="gpt-4",
tokens_in=50,
tokens_out=25,
latency_ms=300,
)
session.add(cache)
session.commit()
fetched = session.get(ResponseCache, "e" * 64)
assert fetched is not None
assert fetched.response == "cached response"
def test_response_cache_pk_is_config_hash():
engine = _engine()
with _session(engine) as session:
session.add(
ResponseCache(config_hash="f" * 64, response="r1", model="m1")
)
session.commit()
session.add(
ResponseCache(config_hash="f" * 64, response="r2", model="m2")
)
try:
session.commit()
assert False, "Should have raised IntegrityError"
except Exception:
session.rollback()
# ---------------------------------------------------------------------------
# WebhookConfig
# ---------------------------------------------------------------------------
def test_webhook_config():
engine = _engine()
with _session(engine) as session:
wh = WebhookConfig(
event_type="experiment.completed",
url="https://example.com/hook",
headers={"Authorization": "Bearer token"},
is_active=True,
)
session.add(wh)
session.commit()
assert isinstance(wh.id, uuid.UUID)
assert wh.event_type == "experiment.completed"
assert wh.is_active is True
def test_webhook_config_default_active():
engine = _engine()
with _session(engine) as session:
wh = WebhookConfig(
event_type="run.failed",
url="https://example.com/hook",
)
session.add(wh)
session.commit()
assert wh.is_active is True
# ---------------------------------------------------------------------------
# Relationship cascades: Run → StageResult + Score
# ---------------------------------------------------------------------------
def test_run_cascade_deletes_children():
engine = _engine()
with _session(engine) as session:
user = User(username="u", password_hash="h")
project = Project(name="P", owner=user)
exp = Experiment(project=project, name="E")
run = Run(experiment=exp, config_hash="g" * 64, config={})
sr = StageResult(
run=run, stage_index=0, prompt_sent="p",
response_raw="r", model_used="m",
)
score = Score(run=run, scorer_name="test", value=0.5)
session.add_all([run, sr, score])
session.commit()
sr_id, score_id = sr.id, score.id
session.delete(run)
session.commit()
assert session.get(StageResult, sr_id) is None
assert session.get(Score, score_id) is None