chrysopedia/alembic/versions/010_add_pipeline_runs.py
jlightner c2db9aa011 feat: Pipeline runs — per-execution tracking with run-scoped events
Data model:
- New pipeline_runs table (id, video_id, run_number, trigger, status,
  started_at, finished_at, error_stage, total_tokens)
- pipeline_events gains run_id FK (nullable for backward compat)
- Alembic migration 010_add_pipeline_runs

Backend:
- run_pipeline() creates a PipelineRun, threads run_id through all stages
- _emit_event() and _make_llm_callback() accept and store run_id
- Stage 6 (final) calls _finish_run() to mark complete with token totals
- mark_pipeline_error marks run as error
- Revoke marks running runs as cancelled
- Trigger endpoints pass trigger type (manual, clean_reprocess)
- New GET /admin/pipeline/runs/{video_id} — lists runs with event counts
- GET /admin/pipeline/events supports ?run_id= filter

Frontend:
- Expanded video detail now shows RunList instead of flat EventLog
- Each run is a collapsible card showing: run number, trigger type,
  status badge, timestamps, token count, event count
- Latest run auto-expands, older runs collapsed
- Legacy events (pre-run-tracking) shown as separate collapsible section
- Run cards color-coded: cyan border for running, red for error,
  gray for cancelled
- EventLog accepts optional runId prop to scope events to a single run
2026-03-31 17:13:41 +00:00

54 lines
2 KiB
Python

"""Add pipeline_runs table and run_id FK on pipeline_events.
Each pipeline trigger creates a run. Events are scoped to runs
for clean per-execution audit trails.
Revision ID: 010_add_pipeline_runs
Revises: 009_add_creator_hidden_flag
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = "010_add_pipeline_runs"
down_revision = "009_add_creator_hidden_flag"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the pipeline_runs table and attach run_id to pipeline_events.

    On PostgreSQL, the two native ENUM types are created automatically by
    ``op.create_table`` when the columns referencing them are emitted.
    """
    pipeline_run_trigger = sa.Enum(
        "manual", "clean_reprocess", "auto_ingest", "bulk",
        name="pipeline_run_trigger",
    )
    pipeline_run_status = sa.Enum(
        "running", "complete", "error", "cancelled",
        name="pipeline_run_status",
    )

    # One row per pipeline execution; gen_random_uuid() assumes PostgreSQL 13+
    # (or the pgcrypto extension) — TODO confirm against deployment target.
    op.create_table(
        "pipeline_runs",
        sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
        sa.Column("video_id", UUID(as_uuid=True), sa.ForeignKey("source_videos.id", ondelete="CASCADE"), nullable=False, index=True),
        sa.Column("run_number", sa.Integer, nullable=False),
        sa.Column("trigger", pipeline_run_trigger, nullable=False),
        sa.Column("status", pipeline_run_status, nullable=False, server_default="running"),
        sa.Column("started_at", sa.DateTime, nullable=False, server_default=sa.text("now()")),
        sa.Column("finished_at", sa.DateTime, nullable=True),
        sa.Column("error_stage", sa.String(50), nullable=True),
        sa.Column("total_tokens", sa.Integer, nullable=False, server_default="0"),
    )

    # run_id is nullable so events created before run tracking existed stay
    # valid; ON DELETE SET NULL keeps events around if a run row is removed.
    op.add_column(
        "pipeline_events",
        sa.Column("run_id", UUID(as_uuid=True), sa.ForeignKey("pipeline_runs.id", ondelete="SET NULL"), nullable=True),
    )
    # FIX: index=True on a Column passed to op.add_column() is silently
    # ignored by Alembic (it only takes effect inside op.create_table), so
    # the index backing the ?run_id= event filter must be created explicitly.
    op.create_index("ix_pipeline_events_run_id", "pipeline_events", ["run_id"])
def downgrade() -> None:
    """Reverse the upgrade: detach events from runs, then drop table and types.

    Order matters: the run_id column (and its FK) must go before the
    pipeline_runs table it references; on PostgreSQL, dropping the column
    also drops any index built on it.
    """
    op.drop_column("pipeline_events", "run_id")
    op.drop_table("pipeline_runs")
    # drop_table does not remove the native ENUM types; drop them explicitly.
    # IF EXISTS keeps the downgrade idempotent if it is re-run after a
    # partial failure.
    op.execute("DROP TYPE IF EXISTS pipeline_run_trigger")
    op.execute("DROP TYPE IF EXISTS pipeline_run_status")