feat: Pipeline runs — per-execution tracking with run-scoped events

Data model:
- New pipeline_runs table (id, video_id, run_number, trigger, status,
  started_at, finished_at, error_stage, total_tokens)
- pipeline_events gains run_id FK (nullable for backward compat)
- Alembic migration 010_add_pipeline_runs

Backend:
- run_pipeline() creates a PipelineRun, threads run_id through all stages
- _emit_event() and _make_llm_callback() accept and store run_id
- Stage 6 (final) calls _finish_run() to mark complete with token totals
- mark_pipeline_error marks run as error
- Revoke marks running runs as cancelled
- Trigger endpoints pass trigger type (manual, clean_reprocess)
- New GET /admin/pipeline/runs/{video_id} — lists runs with event counts
- GET /admin/pipeline/events supports ?run_id= filter

Frontend:
- Expanded video detail now shows RunList instead of flat EventLog
- Each run is a collapsible card showing: run number, trigger type,
  status badge, timestamps, token count, event count
- Latest run auto-expands, older runs collapsed
- Legacy events (pre-run-tracking) shown as separate collapsible section
- Run cards color-coded: cyan border for running, red for error,
  gray for cancelled
- EventLog accepts optional runId prop to scope events to a single run
This commit is contained in:
jlightner 2026-03-31 17:13:41 +00:00
parent cd3b57a156
commit c2db9aa011
7 changed files with 605 additions and 50 deletions

View file

@ -0,0 +1,54 @@
"""Add pipeline_runs table and run_id FK on pipeline_events.
Each pipeline trigger creates a run. Events are scoped to runs
for clean per-execution audit trails.
Revision ID: 010_add_pipeline_runs
Revises: 009_add_creator_hidden_flag
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = "010_add_pipeline_runs"
down_revision = "009_add_creator_hidden_flag"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the pipeline_runs table and link pipeline_events to it.

    On PostgreSQL, ``op.create_table`` implicitly emits CREATE TYPE for the
    two Enum columns and CREATE INDEX for columns flagged ``index=True``.
    """
    # Create enums (names must match the model-side Enum(name=...) values).
    pipeline_run_trigger = sa.Enum(
        "manual", "clean_reprocess", "auto_ingest", "bulk",
        name="pipeline_run_trigger",
    )
    pipeline_run_status = sa.Enum(
        "running", "complete", "error", "cancelled",
        name="pipeline_run_status",
    )
    op.create_table(
        "pipeline_runs",
        sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
        sa.Column("video_id", UUID(as_uuid=True), sa.ForeignKey("source_videos.id", ondelete="CASCADE"), nullable=False, index=True),
        sa.Column("run_number", sa.Integer, nullable=False),
        sa.Column("trigger", pipeline_run_trigger, nullable=False),
        sa.Column("status", pipeline_run_status, nullable=False, server_default="running"),
        sa.Column("started_at", sa.DateTime, nullable=False, server_default=sa.text("now()")),
        sa.Column("finished_at", sa.DateTime, nullable=True),
        sa.Column("error_stage", sa.String(50), nullable=True),
        sa.Column("total_tokens", sa.Integer, nullable=False, server_default="0"),
    )
    # Add run_id to pipeline_events (nullable for backward compat)
    op.add_column(
        "pipeline_events",
        sa.Column("run_id", UUID(as_uuid=True), sa.ForeignKey("pipeline_runs.id", ondelete="SET NULL"), nullable=True),
    )
    # FIX: unlike create_table, op.add_column() silently ignores
    # Column(index=True) — it emits only ALTER TABLE ... ADD COLUMN.
    # Create the index explicitly so the ?run_id= event filter and the
    # per-run aggregation queries don't seq-scan pipeline_events.
    op.create_index(op.f("ix_pipeline_events_run_id"), "pipeline_events", ["run_id"])
def downgrade() -> None:
    """Reverse the migration.

    Order matters: drop the FK column first, then the table it references,
    and the enum types last (a PostgreSQL type can only be dropped once no
    column uses it).
    """
    op.drop_column("pipeline_events", "run_id")
    op.drop_table("pipeline_runs")
    op.execute("DROP TYPE IF EXISTS pipeline_run_trigger")
    op.execute("DROP TYPE IF EXISTS pipeline_run_status")

View file

@ -361,6 +361,62 @@ class ContentReport(Base):
resolved_at: Mapped[datetime | None] = mapped_column(nullable=True)
# ── Pipeline Event ───────────────────────────────────────────────────────────
class PipelineRunStatus(str, enum.Enum):
    """Status of a pipeline run.

    Values mirror the PostgreSQL ``pipeline_run_status`` enum created by
    migration 010_add_pipeline_runs — keep the two in sync.
    """
    running = "running"      # in progress (DB server default for new rows)
    complete = "complete"    # finished normally
    error = "error"          # a stage failed; see PipelineRun.error_stage
    cancelled = "cancelled"  # revoked while running
class PipelineRunTrigger(str, enum.Enum):
    """What initiated a pipeline run.

    Values mirror the PostgreSQL ``pipeline_run_trigger`` enum created by
    migration 010_add_pipeline_runs.
    """
    manual = "manual"                    # admin trigger endpoint
    clean_reprocess = "clean_reprocess"  # clean-retrigger endpoint
    auto_ingest = "auto_ingest"
    bulk = "bulk"
class PipelineRun(Base):
    """A single execution of the pipeline for a video.

    Each trigger/retrigger creates a new run. Events are scoped to a run
    via run_id, giving a clean audit trail per execution.
    """
    __tablename__ = "pipeline_runs"

    id: Mapped[uuid.UUID] = _uuid_pk()
    # Owning video; runs cascade away when the video row is deleted.
    video_id: Mapped[uuid.UUID] = mapped_column(
        ForeignKey("source_videos.id", ondelete="CASCADE"), nullable=False, index=True,
    )
    run_number: Mapped[int] = mapped_column(
        Integer, nullable=False, doc="Auto-increment per video, 1-indexed"
    )
    # What initiated this run (manual, clean_reprocess, ...).
    trigger: Mapped[PipelineRunTrigger] = mapped_column(
        Enum(PipelineRunTrigger, name="pipeline_run_trigger", create_constraint=True),
        nullable=False,
    )
    # Lifecycle state; defaults to "running" both ORM-side and in the DB.
    status: Mapped[PipelineRunStatus] = mapped_column(
        Enum(PipelineRunStatus, name="pipeline_run_status", create_constraint=True),
        default=PipelineRunStatus.running,
        server_default="running",
    )
    started_at: Mapped[datetime] = mapped_column(
        default=_now, server_default=func.now()
    )
    # Set when the run reaches a terminal status; None while running.
    finished_at: Mapped[datetime | None] = mapped_column(nullable=True)
    # Stage name recorded when status == error.
    error_stage: Mapped[str | None] = mapped_column(String(50), nullable=True)
    # Sum of this run's event token counts, written when the run finishes.
    total_tokens: Mapped[int] = mapped_column(Integer, default=0, server_default="0")

    # relationships
    video: Mapped[SourceVideo] = sa_relationship()
    events: Mapped[list[PipelineEvent]] = sa_relationship(
        back_populates="run", foreign_keys="PipelineEvent.run_id"
    )
# ── Pipeline Event ───────────────────────────────────────────────────────────
class PipelineEvent(Base):
@ -375,6 +431,9 @@ class PipelineEvent(Base):
video_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), nullable=False, index=True,
)
run_id: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("pipeline_runs.id", ondelete="SET NULL"), nullable=True, index=True,
)
stage: Mapped[str] = mapped_column(
String(50), nullable=False, doc="stage2_segmentation, stage3_extraction, etc."
)
@ -397,3 +456,8 @@ class PipelineEvent(Base):
system_prompt_text: Mapped[str | None] = mapped_column(Text, nullable=True)
user_prompt_text: Mapped[str | None] = mapped_column(Text, nullable=True)
response_text: Mapped[str | None] = mapped_column(Text, nullable=True)
# relationships
run: Mapped[PipelineRun | None] = sa_relationship(
back_populates="events", foreign_keys=[run_id]
)

View file

@ -76,6 +76,7 @@ def _emit_event(
stage: str,
event_type: str,
*,
run_id: str | None = None,
prompt_tokens: int | None = None,
completion_tokens: int | None = None,
total_tokens: int | None = None,
@ -92,6 +93,7 @@ def _emit_event(
try:
event = PipelineEvent(
video_id=video_id,
run_id=run_id,
stage=stage,
event_type=event_type,
prompt_tokens=prompt_tokens,
@ -132,6 +134,7 @@ def _make_llm_callback(
stage: str,
system_prompt: str | None = None,
user_prompt: str | None = None,
run_id: str | None = None,
):
"""Create an on_complete callback for LLMClient that emits llm_call events.
@ -149,6 +152,7 @@ def _make_llm_callback(
video_id=video_id,
stage=stage,
event_type="llm_call",
run_id=run_id,
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
@ -285,7 +289,7 @@ def _safe_parse_llm_response(
# ── Stage 2: Segmentation ───────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage2_segmentation(self, video_id: str) -> str:
def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str:
"""Analyze transcript segments and identify topic boundaries.
Loads all TranscriptSegment rows for the video, sends them to the LLM
@ -295,7 +299,7 @@ def stage2_segmentation(self, video_id: str) -> str:
"""
start = time.monotonic()
logger.info("Stage 2 (segmentation) starting for video_id=%s", video_id)
_emit_event(video_id, "stage2_segmentation", "start")
_emit_event(video_id, "stage2_segmentation", "start", run_id=run_id)
session = _get_sync_session()
try:
@ -331,7 +335,7 @@ def stage2_segmentation(self, video_id: str) -> str:
hard_limit = get_settings().llm_max_tokens_hard_limit
max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage2_segmentation", hard_limit=hard_limit)
logger.info("Stage 2 using model=%s, modality=%s, max_tokens=%d", model_override or "default", modality, max_tokens)
raw = llm.complete(system_prompt, user_prompt, response_model=SegmentationResult, on_complete=_make_llm_callback(video_id, "stage2_segmentation", system_prompt=system_prompt, user_prompt=user_prompt),
raw = llm.complete(system_prompt, user_prompt, response_model=SegmentationResult, on_complete=_make_llm_callback(video_id, "stage2_segmentation", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id),
modality=modality, model_override=model_override, max_tokens=max_tokens)
result = _safe_parse_llm_response(raw, SegmentationResult, llm, system_prompt, user_prompt,
modality=modality, model_override=model_override)
@ -345,7 +349,7 @@ def stage2_segmentation(self, video_id: str) -> str:
session.commit()
elapsed = time.monotonic() - start
_emit_event(video_id, "stage2_segmentation", "complete")
_emit_event(video_id, "stage2_segmentation", "complete", run_id=run_id)
logger.info(
"Stage 2 (segmentation) completed for video_id=%s in %.1fs — %d topic groups found",
video_id, elapsed, len(result.segments),
@ -356,7 +360,7 @@ def stage2_segmentation(self, video_id: str) -> str:
raise # Don't retry missing prompt files
except Exception as exc:
session.rollback()
_emit_event(video_id, "stage2_segmentation", "error", payload={"error": str(exc)})
_emit_event(video_id, "stage2_segmentation", "error", run_id=run_id, payload={"error": str(exc)})
logger.error("Stage 2 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
@ -366,7 +370,7 @@ def stage2_segmentation(self, video_id: str) -> str:
# ── Stage 3: Extraction ─────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage3_extraction(self, video_id: str) -> str:
def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str:
"""Extract key moments from each topic segment group.
Groups segments by topic_label, calls the LLM for each group to extract
@ -376,7 +380,7 @@ def stage3_extraction(self, video_id: str) -> str:
"""
start = time.monotonic()
logger.info("Stage 3 (extraction) starting for video_id=%s", video_id)
_emit_event(video_id, "stage3_extraction", "start")
_emit_event(video_id, "stage3_extraction", "start", run_id=run_id)
session = _get_sync_session()
try:
@ -423,7 +427,7 @@ def stage3_extraction(self, video_id: str) -> str:
)
max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage3_extraction", hard_limit=hard_limit)
raw = llm.complete(system_prompt, user_prompt, response_model=ExtractionResult, on_complete=_make_llm_callback(video_id, "stage3_extraction", system_prompt=system_prompt, user_prompt=user_prompt),
raw = llm.complete(system_prompt, user_prompt, response_model=ExtractionResult, on_complete=_make_llm_callback(video_id, "stage3_extraction", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id),
modality=modality, model_override=model_override, max_tokens=max_tokens)
result = _safe_parse_llm_response(raw, ExtractionResult, llm, system_prompt, user_prompt,
modality=modality, model_override=model_override)
@ -451,7 +455,7 @@ def stage3_extraction(self, video_id: str) -> str:
session.commit()
elapsed = time.monotonic() - start
_emit_event(video_id, "stage3_extraction", "complete")
_emit_event(video_id, "stage3_extraction", "complete", run_id=run_id)
logger.info(
"Stage 3 (extraction) completed for video_id=%s in %.1fs — %d moments created",
video_id, elapsed, total_moments,
@ -462,7 +466,7 @@ def stage3_extraction(self, video_id: str) -> str:
raise
except Exception as exc:
session.rollback()
_emit_event(video_id, "stage3_extraction", "error", payload={"error": str(exc)})
_emit_event(video_id, "stage3_extraction", "error", run_id=run_id, payload={"error": str(exc)})
logger.error("Stage 3 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
@ -472,7 +476,7 @@ def stage3_extraction(self, video_id: str) -> str:
# ── Stage 4: Classification ─────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage4_classification(self, video_id: str) -> str:
def stage4_classification(self, video_id: str, run_id: str | None = None) -> str:
"""Classify key moments against the canonical tag taxonomy.
Loads all KeyMoment rows for the video, sends them to the LLM with the
@ -485,7 +489,7 @@ def stage4_classification(self, video_id: str) -> str:
"""
start = time.monotonic()
logger.info("Stage 4 (classification) starting for video_id=%s", video_id)
_emit_event(video_id, "stage4_classification", "start")
_emit_event(video_id, "stage4_classification", "start", run_id=run_id)
session = _get_sync_session()
try:
@ -532,7 +536,7 @@ def stage4_classification(self, video_id: str) -> str:
hard_limit = get_settings().llm_max_tokens_hard_limit
max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage4_classification", hard_limit=hard_limit)
logger.info("Stage 4 using model=%s, modality=%s, max_tokens=%d", model_override or "default", modality, max_tokens)
raw = llm.complete(system_prompt, user_prompt, response_model=ClassificationResult, on_complete=_make_llm_callback(video_id, "stage4_classification", system_prompt=system_prompt, user_prompt=user_prompt),
raw = llm.complete(system_prompt, user_prompt, response_model=ClassificationResult, on_complete=_make_llm_callback(video_id, "stage4_classification", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id),
modality=modality, model_override=model_override, max_tokens=max_tokens)
result = _safe_parse_llm_response(raw, ClassificationResult, llm, system_prompt, user_prompt,
modality=modality, model_override=model_override)
@ -564,7 +568,7 @@ def stage4_classification(self, video_id: str) -> str:
_store_classification_data(video_id, classification_data)
elapsed = time.monotonic() - start
_emit_event(video_id, "stage4_classification", "complete")
_emit_event(video_id, "stage4_classification", "complete", run_id=run_id)
logger.info(
"Stage 4 (classification) completed for video_id=%s in %.1fs — %d moments classified",
video_id, elapsed, len(classification_data),
@ -575,7 +579,7 @@ def stage4_classification(self, video_id: str) -> str:
raise
except Exception as exc:
session.rollback()
_emit_event(video_id, "stage4_classification", "error", payload={"error": str(exc)})
_emit_event(video_id, "stage4_classification", "error", run_id=run_id, payload={"error": str(exc)})
logger.error("Stage 4 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
@ -694,7 +698,7 @@ def _capture_pipeline_metadata() -> dict:
# ── Stage 5: Synthesis ───────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage5_synthesis(self, video_id: str) -> str:
def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
"""Synthesize technique pages from classified key moments.
Groups moments by (creator, topic_category), calls the LLM to synthesize
@ -707,7 +711,7 @@ def stage5_synthesis(self, video_id: str) -> str:
"""
start = time.monotonic()
logger.info("Stage 5 (synthesis) starting for video_id=%s", video_id)
_emit_event(video_id, "stage5_synthesis", "start")
_emit_event(video_id, "stage5_synthesis", "start", run_id=run_id)
settings = get_settings()
session = _get_sync_session()
@ -777,7 +781,7 @@ def stage5_synthesis(self, video_id: str) -> str:
user_prompt = f"<creator>{creator_name}</creator>\n<moments>\n{moments_text}\n</moments>"
max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage5_synthesis", hard_limit=hard_limit)
raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult, on_complete=_make_llm_callback(video_id, "stage5_synthesis", system_prompt=system_prompt, user_prompt=user_prompt),
raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult, on_complete=_make_llm_callback(video_id, "stage5_synthesis", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id),
modality=modality, model_override=model_override, max_tokens=max_tokens)
result = _safe_parse_llm_response(raw, SynthesisResult, llm, system_prompt, user_prompt,
modality=modality, model_override=model_override)
@ -885,7 +889,7 @@ def stage5_synthesis(self, video_id: str) -> str:
session.commit()
elapsed = time.monotonic() - start
_emit_event(video_id, "stage5_synthesis", "complete")
_emit_event(video_id, "stage5_synthesis", "complete", run_id=run_id)
logger.info(
"Stage 5 (synthesis) completed for video_id=%s in %.1fs — %d pages created/updated",
video_id, elapsed, pages_created,
@ -896,7 +900,7 @@ def stage5_synthesis(self, video_id: str) -> str:
raise
except Exception as exc:
session.rollback()
_emit_event(video_id, "stage5_synthesis", "error", payload={"error": str(exc)})
_emit_event(video_id, "stage5_synthesis", "error", run_id=run_id, payload={"error": str(exc)})
logger.error("Stage 5 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
@ -906,7 +910,7 @@ def stage5_synthesis(self, video_id: str) -> str:
# ── Stage 6: Embed & Index ───────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=0)
def stage6_embed_and_index(self, video_id: str) -> str:
def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> str:
"""Generate embeddings for technique pages and key moments, then upsert to Qdrant.
This is a non-blocking side-effect stage failures are logged but do not
@ -946,6 +950,8 @@ def stage6_embed_and_index(self, video_id: str) -> str:
if not moments and not pages:
logger.info("Stage 6: No moments or pages for video_id=%s, skipping.", video_id)
if run_id:
_finish_run(run_id, "complete")
return video_id
embed_client = EmbeddingClient(settings)
@ -1030,6 +1036,8 @@ def stage6_embed_and_index(self, video_id: str) -> str:
"%d pages, %d moments processed",
video_id, elapsed, len(pages), len(moments),
)
if run_id:
_finish_run(run_id, "complete")
return video_id
except Exception as exc:
@ -1039,6 +1047,8 @@ def stage6_embed_and_index(self, video_id: str) -> str:
"Pipeline continues — embeddings can be regenerated later.",
video_id, exc,
)
if run_id:
_finish_run(run_id, "complete") # Run is still "complete" — stage6 is best-effort
return video_id
finally:
session.close()
@ -1099,14 +1109,68 @@ def _load_prior_pages(video_id: str) -> list[str]:
# ── Orchestrator ─────────────────────────────────────────────────────────────
@celery_app.task
def mark_pipeline_error(request, exc, traceback, video_id: str) -> None:
def mark_pipeline_error(request, exc, traceback, video_id: str, run_id: str | None = None) -> None:
"""Error callback — marks video as errored when a pipeline stage fails."""
logger.error("Pipeline failed for video_id=%s: %s", video_id, exc)
_set_error_status(video_id, "pipeline", exc)
if run_id:
_finish_run(run_id, "error", error_stage="pipeline")
def _create_run(video_id: str, trigger: str) -> str:
    """Insert a PipelineRun row for *video_id* and return its id as a string.

    The run gets ``run_number`` one past the highest existing number for
    the video (so the first run is 1).
    """
    from models import PipelineRun, PipelineRunTrigger
    from sqlalchemy import func as sa_func

    session = _get_sync_session()
    try:
        # NOTE(review): max+1 is not atomic — two concurrent triggers for the
        # same video could race to the same run_number; confirm triggers are
        # serialized upstream.
        highest = session.execute(
            select(sa_func.coalesce(sa_func.max(PipelineRun.run_number), 0))
            .where(PipelineRun.video_id == video_id)
        ).scalar() or 0

        new_run = PipelineRun(
            video_id=video_id,
            run_number=highest + 1,
            trigger=PipelineRunTrigger(trigger),
        )
        session.add(new_run)
        session.commit()
        return str(new_run.id)
    finally:
        session.close()
def _finish_run(run_id: str, status: str, error_stage: str | None = None) -> None:
    """Mark run *run_id* finished: set status, finished_at and token totals.

    Best-effort — any failure is logged as a warning and swallowed so a
    bookkeeping problem never takes down a pipeline stage.
    """
    from models import PipelineRun, PipelineRunStatus, _now

    session = _get_sync_session()
    try:
        target = session.execute(
            select(PipelineRun).where(PipelineRun.id == run_id)
        ).scalar_one_or_none()
        if target is not None:
            target.status = PipelineRunStatus(status)
            target.finished_at = _now()
            if error_stage:
                target.error_stage = error_stage
            # Roll up token usage from this run's events.
            token_sum = session.execute(
                select(func.coalesce(func.sum(PipelineEvent.total_tokens), 0))
                .where(PipelineEvent.run_id == run_id)
            ).scalar() or 0
            target.total_tokens = token_sum
            session.commit()
    except Exception as exc:
        logger.warning("Failed to finish run %s: %s", run_id, exc)
    finally:
        session.close()
@celery_app.task
def run_pipeline(video_id: str) -> str:
def run_pipeline(video_id: str, trigger: str = "manual") -> str:
"""Orchestrate the full pipeline (stages 2-5) with resumability.
Checks the current processing_status of the video and chains only the
@ -1139,33 +1203,35 @@ def run_pipeline(video_id: str) -> str:
# Snapshot prior technique pages before pipeline resets key_moments
_snapshot_prior_pages(video_id)
# Create a pipeline run record
run_id = _create_run(video_id, trigger)
logger.info("run_pipeline: created run_id=%s for video_id=%s (trigger=%s)", run_id, video_id, trigger)
# Build the chain based on current status
stages = []
if status in (ProcessingStatus.not_started, ProcessingStatus.queued):
stages = [
stage2_segmentation.s(video_id),
stage3_extraction.s(), # receives video_id from previous
stage4_classification.s(),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
stage2_segmentation.s(video_id, run_id=run_id),
stage3_extraction.s(run_id=run_id), # receives video_id from previous
stage4_classification.s(run_id=run_id),
stage5_synthesis.s(run_id=run_id),
stage6_embed_and_index.s(run_id=run_id),
]
elif status == ProcessingStatus.processing:
# Resuming a previously-started pipeline — re-run from stage 2
stages = [
stage2_segmentation.s(video_id),
stage3_extraction.s(),
stage4_classification.s(),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
stage2_segmentation.s(video_id, run_id=run_id),
stage3_extraction.s(run_id=run_id),
stage4_classification.s(run_id=run_id),
stage5_synthesis.s(run_id=run_id),
stage6_embed_and_index.s(run_id=run_id),
]
elif status == ProcessingStatus.error:
# Retrigger after error — re-run full pipeline
stages = [
stage2_segmentation.s(video_id),
stage3_extraction.s(),
stage4_classification.s(),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
stage2_segmentation.s(video_id, run_id=run_id),
stage3_extraction.s(run_id=run_id),
stage4_classification.s(run_id=run_id),
stage5_synthesis.s(run_id=run_id),
stage6_embed_and_index.s(run_id=run_id),
]
elif status == ProcessingStatus.complete:
logger.info(
@ -1187,11 +1253,11 @@ def run_pipeline(video_id: str) -> str:
session.close()
pipeline = celery_chain(*stages)
error_cb = mark_pipeline_error.s(video_id)
error_cb = mark_pipeline_error.s(video_id, run_id=run_id)
pipeline.apply_async(link_error=error_cb)
logger.info(
"run_pipeline: dispatched %d stages for video_id=%s",
len(stages), video_id,
"run_pipeline: dispatched %d stages for video_id=%s (run_id=%s)",
len(stages), video_id, run_id,
)
return video_id

View file

@ -13,6 +13,7 @@ Admin:
import logging
import uuid
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
@ -21,7 +22,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings
from database import get_session
from models import PipelineEvent, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus
from models import PipelineEvent, PipelineRun, PipelineRunStatus, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus
from redis_client import get_redis
from schemas import DebugModeResponse, DebugModeUpdate, TokenStageSummary, TokenSummaryResponse
@ -50,7 +51,7 @@ async def trigger_pipeline(
from pipeline.stages import run_pipeline
try:
run_pipeline.delay(str(video.id))
run_pipeline.delay(str(video.id), trigger="manual")
logger.info("Pipeline manually triggered for video_id=%s", video_id)
except Exception as exc:
logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc)
@ -88,6 +89,24 @@ async def list_pipeline_videos(
.subquery()
)
# Subquery for the most recent pipeline run per video
latest_run = (
select(
PipelineRun.video_id,
PipelineRun.id.label("run_id"),
PipelineRun.run_number,
PipelineRun.trigger.label("run_trigger"),
PipelineRun.status.label("run_status"),
PipelineRun.started_at.label("run_started_at"),
PipelineRun.finished_at.label("run_finished_at"),
PipelineRun.error_stage.label("run_error_stage"),
PipelineRun.total_tokens.label("run_total_tokens"),
)
.order_by(PipelineRun.video_id, PipelineRun.started_at.desc())
.distinct(PipelineRun.video_id)
.subquery()
)
# Subquery for the most recent stage start event per video (active stage indicator)
latest_stage = (
select(
@ -117,10 +136,19 @@ async def list_pipeline_videos(
latest_stage.c.active_stage,
latest_stage.c.active_stage_status,
latest_stage.c.stage_started_at,
latest_run.c.run_id,
latest_run.c.run_number,
latest_run.c.run_trigger,
latest_run.c.run_status,
latest_run.c.run_started_at,
latest_run.c.run_finished_at,
latest_run.c.run_error_stage,
latest_run.c.run_total_tokens,
)
.join(Creator, SourceVideo.creator_id == Creator.id)
.outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id)
.outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id)
.outerjoin(latest_run, SourceVideo.id == latest_run.c.video_id)
.order_by(SourceVideo.updated_at.desc())
)
@ -143,6 +171,16 @@ async def list_pipeline_videos(
"active_stage": r.active_stage,
"active_stage_status": r.active_stage_status,
"stage_started_at": r.stage_started_at.isoformat() if r.stage_started_at else None,
"latest_run": {
"id": str(r.run_id),
"run_number": r.run_number,
"trigger": r.run_trigger.value if hasattr(r.run_trigger, 'value') else r.run_trigger,
"status": r.run_status.value if hasattr(r.run_status, 'value') else r.run_status,
"started_at": r.run_started_at.isoformat() if r.run_started_at else None,
"finished_at": r.run_finished_at.isoformat() if r.run_finished_at else None,
"error_stage": r.run_error_stage,
"total_tokens": r.run_total_tokens or 0,
} if r.run_id else None,
}
for r in rows
],
@ -216,7 +254,7 @@ async def clean_retrigger_pipeline(
# Now trigger the pipeline
from pipeline.stages import run_pipeline
try:
run_pipeline.delay(str(video.id))
run_pipeline.delay(str(video.id), trigger="clean_reprocess")
logger.info("Clean retrigger dispatched for video_id=%s", video_id)
except Exception as exc:
logger.warning("Failed to dispatch pipeline after cleanup for video_id=%s: %s", video_id, exc)
@ -235,10 +273,14 @@ async def clean_retrigger_pipeline(
# ── Admin: Revoke ────────────────────────────────────────────────────────────
@router.post("/admin/pipeline/revoke/{video_id}")
async def revoke_pipeline(video_id: str):
async def revoke_pipeline(
video_id: str,
db: AsyncSession = Depends(get_session),
):
"""Revoke/cancel active Celery tasks for a video.
Uses Celery's revoke with terminate=True to kill running tasks.
Also marks the latest running pipeline_run as cancelled.
This is best-effort the task may have already completed.
"""
from worker import celery_app
@ -256,6 +298,18 @@ async def revoke_pipeline(video_id: str):
celery_app.control.revoke(task["id"], terminate=True)
revoked_count += 1
logger.info("Revoked task %s for video_id=%s", task["id"], video_id)
# Mark any running pipeline_runs as cancelled
running_runs = await db.execute(
select(PipelineRun).where(
PipelineRun.video_id == video_id,
PipelineRun.status == PipelineRunStatus.running,
)
)
for run in running_runs.scalars().all():
run.status = PipelineRunStatus.cancelled
run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
await db.commit()
return {
"status": "revoked" if revoked_count > 0 else "no_active_tasks",
@ -317,6 +371,65 @@ async def recent_pipeline_activity(
}
# ── Admin: Pipeline runs ─────────────────────────────────────────────────────
@router.get("/admin/pipeline/runs/{video_id}")
async def list_pipeline_runs(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """List all pipeline runs for a video, newest first.

    Returns ``{"items": [...], "legacy_event_count": N}`` where
    legacy_event_count is the number of events recorded before run
    tracking existed (run_id IS NULL).
    """
    # Per-run event tallies, joined onto the runs below.
    per_run_events = (
        select(
            PipelineEvent.run_id,
            func.count().label("event_count"),
        )
        .where(PipelineEvent.run_id.isnot(None))
        .group_by(PipelineEvent.run_id)
        .subquery()
    )

    run_rows = (
        await db.execute(
            select(PipelineRun, per_run_events.c.event_count)
            .outerjoin(per_run_events, PipelineRun.id == per_run_events.c.run_id)
            .where(PipelineRun.video_id == video_id)
            .order_by(PipelineRun.started_at.desc())
        )
    ).all()

    # Events written before run tracking have run_id IS NULL.
    legacy_events = (
        await db.execute(
            select(func.count())
            .select_from(PipelineEvent)
            .where(PipelineEvent.video_id == video_id, PipelineEvent.run_id.is_(None))
        )
    ).scalar() or 0

    def _serialize(run, evt_count):
        # Enum columns may surface as Enum members or raw strings depending
        # on how the row was loaded; normalize to the plain string value.
        return {
            "id": str(run.id),
            "run_number": run.run_number,
            "trigger": run.trigger.value if hasattr(run.trigger, 'value') else str(run.trigger),
            "status": run.status.value if hasattr(run.status, 'value') else str(run.status),
            "started_at": run.started_at.isoformat() if run.started_at else None,
            "finished_at": run.finished_at.isoformat() if run.finished_at else None,
            "error_stage": run.error_stage,
            "total_tokens": run.total_tokens or 0,
            "event_count": evt_count or 0,
        }

    return {
        "items": [_serialize(run, count) for run, count in run_rows],
        "legacy_event_count": legacy_events,
    }
# ── Admin: Event log ─────────────────────────────────────────────────────────
@router.get("/admin/pipeline/events/{video_id}")
@ -326,12 +439,15 @@ async def list_pipeline_events(
limit: Annotated[int, Query(ge=1, le=200)] = 100,
stage: Annotated[str | None, Query(description="Filter by stage name")] = None,
event_type: Annotated[str | None, Query(description="Filter by event type")] = None,
run_id: Annotated[str | None, Query(description="Filter by pipeline run ID")] = None,
order: Annotated[str, Query(description="Sort order: asc or desc")] = "desc",
db: AsyncSession = Depends(get_session),
):
"""Get pipeline events for a video. Default: newest first (desc)."""
stmt = select(PipelineEvent).where(PipelineEvent.video_id == video_id)
if run_id:
stmt = stmt.where(PipelineEvent.run_id == run_id)
if stage:
stmt = stmt.where(PipelineEvent.stage == stage)
if event_type:

View file

@ -3740,6 +3740,107 @@ a.app-footer__repo:hover {
flex-shrink: 0;
}
/* ── Run List & Cards ─────────────────────────────────────────────────────── */

/* Vertical stack of per-run cards in the expanded video detail. */
.run-list {
  display: flex;
  flex-direction: column;
  gap: 0.5rem;
}

/* One collapsible card per pipeline run. */
.run-card {
  border: 1px solid var(--color-border);
  border-radius: 8px;
  overflow: hidden; /* clip the body so the rounded corners stay clean */
}

/* Status color coding via the border: accent (cyan) = running,
   red = error, gray = cancelled. */
.run-card--running {
  border-color: var(--color-accent);
}

.run-card--error {
  border-color: rgba(244, 67, 54, 0.4);
}

.run-card--cancelled {
  border-color: rgba(158, 158, 158, 0.3);
}

/* The header is a <button>-style clickable row that toggles the body. */
.run-card__header {
  display: flex;
  align-items: center;
  gap: 0.5rem;
  width: 100%;
  padding: 0.6rem 1rem;
  background: var(--color-surface);
  border: none;
  color: var(--color-text);
  cursor: pointer;
  font-size: 0.85rem;
  text-align: left;
}

.run-card__header:hover {
  background: var(--color-bg-input);
}

/* Expand/collapse chevron. */
.run-card__arrow {
  flex-shrink: 0;
  width: 1rem;
  color: var(--color-text-muted);
}

/* Header metadata cells — all nowrap so the row never wraps. */
.run-card__number {
  font-weight: 600;
  white-space: nowrap;
}

.run-card__trigger {
  color: var(--color-text-muted);
  font-size: 0.8rem;
  white-space: nowrap;
}

.run-card__time {
  color: var(--color-text-muted);
  font-size: 0.8rem;
  white-space: nowrap;
}

.run-card__duration {
  color: var(--color-text-muted);
  font-size: 0.75rem;
  white-space: nowrap;
}

/* margin-left: auto pushes tokens (and everything after) to the right edge. */
.run-card__tokens {
  color: var(--color-text-muted);
  font-size: 0.8rem;
  white-space: nowrap;
  margin-left: auto;
}

.run-card__events {
  color: var(--color-text-muted);
  font-size: 0.8rem;
  white-space: nowrap;
}

.run-card__error-stage {
  color: #f44336;
  font-size: 0.8rem;
  white-space: nowrap;
}

/* Expanded body holding the run-scoped event log. */
.run-card__body {
  border-top: 1px solid var(--color-border);
  padding: 0.5rem;
}

/* Pre-run-tracking ("legacy") events section is visually de-emphasized. */
.run-card--legacy .run-card__header {
  opacity: 0.7;
}
/* ── Worker Status Indicator ────────────────────────────────────────────── */
.worker-status {

View file

@ -413,6 +413,16 @@ export interface PipelineVideoItem {
active_stage: string | null;
active_stage_status: string | null;
stage_started_at: string | null;
latest_run: {
id: string;
run_number: number;
trigger: string;
status: string;
started_at: string | null;
finished_at: string | null;
error_stage: string | null;
total_tokens: number;
} | null;
}
export interface PipelineVideoListResponse {
@ -502,15 +512,37 @@ export async function fetchRecentActivity(limit = 10): Promise<RecentActivityRes
return request<RecentActivityResponse>(`${BASE}/admin/pipeline/recent-activity?limit=${limit}`);
}
/**
 * One pipeline execution for a video, as returned by
 * GET /admin/pipeline/runs/{video_id}.
 */
export interface PipelineRunItem {
  id: string;
  /** 1-based, per-video sequence number. */
  run_number: number;
  /** What started the run, e.g. "manual" or "clean_reprocess". */
  trigger: string;
  /** Run lifecycle state, e.g. "running" | "complete" | "error" | "cancelled". */
  status: string;
  started_at: string | null;
  finished_at: string | null;
  /** Stage name the run failed at; null unless status is "error". */
  error_stage: string | null;
  /** Sum of LLM tokens consumed across the run's stages. */
  total_tokens: number;
  /** Number of pipeline events recorded under this run. */
  event_count: number;
}
/** Response shape for GET /admin/pipeline/runs/{video_id}. */
export interface PipelineRunsResponse {
  /** Runs for the video, newest first. */
  items: PipelineRunItem[];
  /** Count of events recorded before run tracking existed (no run_id). */
  legacy_event_count: number;
}
/**
 * Fetch all pipeline runs (with event counts) for a single video.
 *
 * @param videoId - UUID of the video whose runs to list.
 * @returns The runs plus the count of pre-run-tracking legacy events.
 */
export async function fetchPipelineRuns(videoId: string): Promise<PipelineRunsResponse> {
  const url = `${BASE}/admin/pipeline/runs/${videoId}`;
  return request<PipelineRunsResponse>(url);
}
export async function fetchPipelineEvents(
videoId: string,
params: { offset?: number; limit?: number; stage?: string; event_type?: string; order?: "asc" | "desc" } = {},
params: { offset?: number; limit?: number; stage?: string; event_type?: string; run_id?: string; order?: "asc" | "desc" } = {},
): Promise<PipelineEventListResponse> {
const qs = new URLSearchParams();
if (params.offset !== undefined) qs.set("offset", String(params.offset));
if (params.limit !== undefined) qs.set("limit", String(params.limit));
if (params.stage) qs.set("stage", params.stage);
if (params.event_type) qs.set("event_type", params.event_type);
if (params.run_id) qs.set("run_id", params.run_id);
if (params.order) qs.set("order", params.order);
const query = qs.toString();
return request<PipelineEventListResponse>(

View file

@ -9,6 +9,7 @@ import { useDocumentTitle } from "../hooks/useDocumentTitle";
import {
fetchPipelineVideos,
fetchPipelineEvents,
fetchPipelineRuns,
fetchWorkerStatus,
fetchDebugMode,
setDebugMode,
@ -19,6 +20,7 @@ import {
fetchRecentActivity,
type PipelineVideoItem,
type PipelineEvent,
type PipelineRunItem,
type WorkerStatusResponse,
type RecentActivityItem,
} from "../api/public-client";
@ -215,7 +217,7 @@ function DebugPayloadViewer({ event }: { event: PipelineEvent }) {
// ── Event Log ────────────────────────────────────────────────────────────────
function EventLog({ videoId, status }: { videoId: string; status: string }) {
function EventLog({ videoId, status, runId }: { videoId: string; status: string; runId?: string }) {
const [events, setEvents] = useState<PipelineEvent[]>([]);
const [total, setTotal] = useState(0);
const [loading, setLoading] = useState(true);
@ -233,6 +235,7 @@ function EventLog({ videoId, status }: { videoId: string; status: string }) {
offset,
limit,
order: viewMode === "head" ? "asc" : "desc",
run_id: runId,
});
setEvents(res.items);
setTotal(res.total);
@ -477,6 +480,125 @@ function StatusFilter({
);
}
// ── Run List ─────────────────────────────────────────────────────────────────

// Human-readable labels for run trigger values; unknown triggers fall back to
// the raw string at the render site.
const TRIGGER_LABELS: Record<string, string> = {
  manual: "Manual",
  clean_reprocess: "Clean Reprocess",
  auto_ingest: "Auto Ingest",
  bulk: "Bulk",
};

// Maps a run's status to the shared pipeline-badge modifier class; unknown
// statuses render the base badge with no modifier.
const RUN_STATUS_CLASS: Record<string, string> = {
  running: "pipeline-badge--active",
  complete: "pipeline-badge--success",
  error: "pipeline-badge--error",
  cancelled: "pipeline-badge--pending",
};
/**
 * Collapsible list of pipeline runs for one video.
 *
 * Each run renders as a card (status-colored border, trigger/time/token
 * metadata) that expands into a run-scoped EventLog. Legacy events recorded
 * before run tracking existed appear as a separate de-emphasized card.
 * The list auto-refreshes every 10s while the video is queued/processing.
 */
function RunList({ videoId, videoStatus }: { videoId: string; videoStatus: string }) {
  const [runs, setRuns] = useState<PipelineRunItem[]>([]);
  const [legacyCount, setLegacyCount] = useState(0);
  const [loading, setLoading] = useState(true);
  const [expandedRunId, setExpandedRunId] = useState<string | null>(null);
  const [showLegacy, setShowLegacy] = useState(false);

  // Fetch the run list; `silent` skips the loading indicator so the
  // auto-refresh interval doesn't flicker the UI.
  const load = useCallback(async (silent = false) => {
    if (!silent) setLoading(true);
    try {
      const res = await fetchPipelineRuns(videoId);
      setRuns(res.items);
      setLegacyCount(res.legacy_event_count);
      // Auto-expand the latest run on the initial (non-silent) load.
      // Uses a functional update so this callback does NOT depend on
      // `expandedRunId`: having it in the deps recreated `load` right after
      // the auto-expand, which re-fired the mount effect and caused a
      // duplicate fetch plus a loading flash.
      if (!silent && res.items.length > 0) {
        const latest = res.items[0];
        if (latest) setExpandedRunId((prev) => prev ?? latest.id);
      }
    } catch {
      // Best-effort: keep showing the last successfully loaded data.
    } finally {
      if (!silent) setLoading(false);
    }
  }, [videoId]);

  // Initial load (and reload when the video changes).
  useEffect(() => {
    void load();
  }, [load]);

  // Poll while the pipeline is active so new runs/events appear live.
  useEffect(() => {
    if (videoStatus !== "processing" && videoStatus !== "queued") return;
    const id = setInterval(() => void load(true), 10_000);
    return () => clearInterval(id);
  }, [videoStatus, load]);

  if (loading) return <div className="loading">Loading runs</div>;
  if (runs.length === 0 && legacyCount === 0) {
    return <div className="pipeline-events__empty">No pipeline runs recorded.</div>;
  }

  return (
    <div className="run-list">
      {runs.map((run) => {
        const isExpanded = expandedRunId === run.id;
        return (
          <div key={run.id} className={`run-card run-card--${run.status}`}>
            <button
              className="run-card__header"
              onClick={() => setExpandedRunId(isExpanded ? null : run.id)}
              aria-expanded={isExpanded}
            >
              <span className="run-card__arrow">{isExpanded ? "▾" : "▸"}</span>
              <span className="run-card__number">Run #{run.run_number}</span>
              <span className="run-card__trigger">{TRIGGER_LABELS[run.trigger] ?? run.trigger}</span>
              <span className={`pipeline-badge ${RUN_STATUS_CLASS[run.status] ?? ""}`}>
                {run.status}
              </span>
              <span className="run-card__time">{formatDate(run.started_at)}</span>
              {run.finished_at && (
                <span className="run-card__duration" title={`Finished: ${formatDate(run.finished_at)}`}>
                  {formatDate(run.finished_at)}
                </span>
              )}
              <span className="run-card__tokens">{formatTokens(run.total_tokens)} tokens</span>
              <span className="run-card__events">{run.event_count} events</span>
              {run.error_stage && (
                <span className="run-card__error-stage">Failed at: {run.error_stage}</span>
              )}
            </button>
            {isExpanded && (
              <div className="run-card__body">
                <EventLog videoId={videoId} status={run.status} runId={run.id} />
              </div>
            )}
          </div>
        );
      })}
      {legacyCount > 0 && (
        <div className="run-card run-card--legacy">
          <button
            className="run-card__header"
            onClick={() => setShowLegacy((v) => !v)}
            aria-expanded={showLegacy}
          >
            <span className="run-card__arrow">{showLegacy ? "▾" : "▸"}</span>
            <span className="run-card__number">Legacy</span>
            <span className="run-card__trigger">Pre-run tracking</span>
            <span className="run-card__events">{legacyCount} events</span>
          </button>
          {showLegacy && (
            <div className="run-card__body">
              {/* No runId prop: the endpoint returns events without a run scope. */}
              <EventLog videoId={videoId} status="complete" />
            </div>
          )}
        </div>
      )}
    </div>
  );
}
// ── Stage Timeline ───────────────────────────────────────────────────────────
const PIPELINE_STAGES = [
@ -1065,7 +1187,7 @@ export default function AdminPipeline() {
<span>Created: {formatDate(video.created_at)}</span>
<span>Updated: {formatDate(video.updated_at)}</span>
</div>
<EventLog videoId={video.id} status={video.processing_status} />
<RunList videoId={video.id} videoStatus={video.processing_status} />
</div>
)}
</div>