Data model:
- New pipeline_runs table (id, video_id, run_number, trigger, status,
started_at, finished_at, error_stage, total_tokens)
- pipeline_events gains run_id FK (nullable for backward compat)
- Alembic migration 010_add_pipeline_runs
Backend:
- run_pipeline() creates a PipelineRun, threads run_id through all stages
- _emit_event() and _make_llm_callback() accept and store run_id
- Stage 6 (final) calls _finish_run() to mark complete with token totals
- mark_pipeline_error marks run as error
- Revoke marks running runs as cancelled
- Trigger endpoints pass trigger type (manual, clean_reprocess)
- New GET /admin/pipeline/runs/{video_id} — lists runs with event counts
- GET /admin/pipeline/events supports ?run_id= filter
Frontend:
- Expanded video detail now shows RunList instead of flat EventLog
- Each run is a collapsible card showing: run number, trigger type,
status badge, timestamps, token count, event count
- Latest run auto-expands, older runs collapsed
- Legacy events (pre-run-tracking) shown as separate collapsible section
- Run cards color-coded: cyan border for running, red for error,
gray for cancelled
- EventLog accepts optional runId prop to scope events to a single run
File: 010_add_pipeline_runs.py — Python, 54 lines, 2 KiB
"""Add pipeline_runs table and run_id FK on pipeline_events.

Each pipeline trigger creates a run. Events are scoped to runs
for clean per-execution audit trails.

Revision ID: 010_add_pipeline_runs
Revises: 009_add_creator_hidden_flag
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

# Alembic revision identifiers, consumed by the migration runner.
revision = "010_add_pipeline_runs"
down_revision = "009_add_creator_hidden_flag"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create pipeline_runs and link pipeline_events rows to runs.

    One pipeline_runs row is created per pipeline execution. Existing
    pipeline_events rows keep a NULL run_id ("legacy" events), so the
    FK column is added nullable for backward compatibility.
    """
    # Native Postgres enum types. Passing them as column types to
    # op.create_table below makes SQLAlchemy emit CREATE TYPE first.
    pipeline_run_trigger = sa.Enum(
        "manual", "clean_reprocess", "auto_ingest", "bulk",
        name="pipeline_run_trigger",
    )
    pipeline_run_status = sa.Enum(
        "running", "complete", "error", "cancelled",
        name="pipeline_run_status",
    )

    op.create_table(
        "pipeline_runs",
        sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
        sa.Column("video_id", UUID(as_uuid=True), sa.ForeignKey("source_videos.id", ondelete="CASCADE"), nullable=False, index=True),
        # Per-video counter (run #1, #2, ...) displayed in the run list UI.
        sa.Column("run_number", sa.Integer, nullable=False),
        sa.Column("trigger", pipeline_run_trigger, nullable=False),
        # New runs start in "running"; _finish_run / mark_pipeline_error /
        # revoke transition them to complete / error / cancelled.
        sa.Column("status", pipeline_run_status, nullable=False, server_default="running"),
        sa.Column("started_at", sa.DateTime, nullable=False, server_default=sa.text("now()")),
        sa.Column("finished_at", sa.DateTime, nullable=True),
        sa.Column("error_stage", sa.String(50), nullable=True),
        sa.Column("total_tokens", sa.Integer, nullable=False, server_default="0"),
    )

    # Add run_id to pipeline_events (nullable: pre-run-tracking events
    # remain valid and are surfaced as a separate "legacy" section).
    op.add_column(
        "pipeline_events",
        sa.Column("run_id", UUID(as_uuid=True), sa.ForeignKey("pipeline_runs.id", ondelete="SET NULL"), nullable=True),
    )
    # FIX: `index=True` on a Column passed to op.add_column is silently
    # ignored by Alembic (it only takes effect inside op.create_table),
    # so the index must be created explicitly. The ?run_id= filter on
    # GET /admin/pipeline/events depends on this index.
    op.create_index("ix_pipeline_events_run_id", "pipeline_events", ["run_id"])
def downgrade() -> None:
    """Revert run tracking: drop the FK column, the table, then the enums.

    Order matters: the pipeline_events.run_id column references
    pipeline_runs, and the enum types are referenced by pipeline_runs
    columns, so each drop must precede the objects it depends on.
    Dropping the column also removes any index built on it.
    """
    op.drop_column("pipeline_events", "run_id")
    op.drop_table("pipeline_runs")
    # Enum types are not removed by drop_table; drop them explicitly.
    # IF EXISTS keeps the downgrade idempotent on partially-applied state.
    op.execute("DROP TYPE IF EXISTS pipeline_run_trigger")
    op.execute("DROP TYPE IF EXISTS pipeline_run_status")