diff --git a/alembic/versions/010_add_pipeline_runs.py b/alembic/versions/010_add_pipeline_runs.py
new file mode 100644
index 0000000..f13eedf
--- /dev/null
+++ b/alembic/versions/010_add_pipeline_runs.py
@@ -0,0 +1,56 @@
+"""Add pipeline_runs table and run_id FK on pipeline_events.
+
+Each pipeline trigger creates a run. Events are scoped to runs
+for clean per-execution audit trails.
+
+Revision ID: 010_add_pipeline_runs
+Revises: 009_add_creator_hidden_flag
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import UUID
+
+revision = "010_add_pipeline_runs"
+down_revision = "009_add_creator_hidden_flag"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # Create enums
+    pipeline_run_trigger = sa.Enum(
+        "manual", "clean_reprocess", "auto_ingest", "bulk",
+        name="pipeline_run_trigger",
+    )
+    pipeline_run_status = sa.Enum(
+        "running", "complete", "error", "cancelled",
+        name="pipeline_run_status",
+    )
+
+    op.create_table(
+        "pipeline_runs",
+        sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
+        sa.Column("video_id", UUID(as_uuid=True), sa.ForeignKey("source_videos.id", ondelete="CASCADE"), nullable=False, index=True),
+        sa.Column("run_number", sa.Integer, nullable=False),
+        sa.Column("trigger", pipeline_run_trigger, nullable=False),
+        sa.Column("status", pipeline_run_status, nullable=False, server_default="running"),
+        sa.Column("started_at", sa.DateTime, nullable=False, server_default=sa.text("now()")),
+        sa.Column("finished_at", sa.DateTime, nullable=True),
+        sa.Column("error_stage", sa.String(50), nullable=True),
+        sa.Column("total_tokens", sa.Integer, nullable=False, server_default="0"),
+    )
+
+    # Add run_id to pipeline_events (nullable for backward compat).
+    # NOTE: op.add_column ignores Column(index=True) — the index must be created explicitly.
+    op.add_column(
+        "pipeline_events",
+        sa.Column("run_id", UUID(as_uuid=True), sa.ForeignKey("pipeline_runs.id", ondelete="SET NULL"), nullable=True),
+    )
+    op.create_index("ix_pipeline_events_run_id", "pipeline_events", ["run_id"])
+
+
+def downgrade() -> None:
+    op.drop_column("pipeline_events", "run_id")
+    op.drop_table("pipeline_runs")
+    op.execute("DROP TYPE IF EXISTS pipeline_run_trigger")
+    op.execute("DROP TYPE IF EXISTS pipeline_run_status")
diff --git a/backend/models.py b/backend/models.py
index ea505f0..f2e0fac 100644
--- a/backend/models.py
+++ b/backend/models.py
@@ -361,6 +361,62 @@ class ContentReport(Base):
     resolved_at: Mapped[datetime | None] = mapped_column(nullable=True)
 
 
+# ── Pipeline Run ─────────────────────────────────────────────────────────────
+
+class PipelineRunStatus(str, enum.Enum):
+    """Status of a pipeline run."""
+    running = "running"
+    complete = "complete"
+    error = "error"
+    cancelled = "cancelled"
+
+
+class PipelineRunTrigger(str, enum.Enum):
+    """What initiated a pipeline run."""
+    manual = "manual"
+    clean_reprocess = "clean_reprocess"
+    auto_ingest = "auto_ingest"
+    bulk = "bulk"
+
+
+class PipelineRun(Base):
+    """A single execution of the pipeline for a video.
+
+    Each trigger/retrigger creates a new run. Events are scoped to a run
+    via run_id, giving a clean audit trail per execution.
+ """ + __tablename__ = "pipeline_runs" + + id: Mapped[uuid.UUID] = _uuid_pk() + video_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("source_videos.id", ondelete="CASCADE"), nullable=False, index=True, + ) + run_number: Mapped[int] = mapped_column( + Integer, nullable=False, doc="Auto-increment per video, 1-indexed" + ) + trigger: Mapped[PipelineRunTrigger] = mapped_column( + Enum(PipelineRunTrigger, name="pipeline_run_trigger", create_constraint=True), + nullable=False, + ) + status: Mapped[PipelineRunStatus] = mapped_column( + Enum(PipelineRunStatus, name="pipeline_run_status", create_constraint=True), + default=PipelineRunStatus.running, + server_default="running", + ) + started_at: Mapped[datetime] = mapped_column( + default=_now, server_default=func.now() + ) + finished_at: Mapped[datetime | None] = mapped_column(nullable=True) + error_stage: Mapped[str | None] = mapped_column(String(50), nullable=True) + total_tokens: Mapped[int] = mapped_column(Integer, default=0, server_default="0") + + # relationships + video: Mapped[SourceVideo] = sa_relationship() + events: Mapped[list[PipelineEvent]] = sa_relationship( + back_populates="run", foreign_keys="PipelineEvent.run_id" + ) + + # ── Pipeline Event ─────────────────────────────────────────────────────────── class PipelineEvent(Base): @@ -375,6 +431,9 @@ class PipelineEvent(Base): video_id: Mapped[uuid.UUID] = mapped_column( UUID(as_uuid=True), nullable=False, index=True, ) + run_id: Mapped[uuid.UUID | None] = mapped_column( + ForeignKey("pipeline_runs.id", ondelete="SET NULL"), nullable=True, index=True, + ) stage: Mapped[str] = mapped_column( String(50), nullable=False, doc="stage2_segmentation, stage3_extraction, etc." 
) @@ -397,3 +456,8 @@ class PipelineEvent(Base): system_prompt_text: Mapped[str | None] = mapped_column(Text, nullable=True) user_prompt_text: Mapped[str | None] = mapped_column(Text, nullable=True) response_text: Mapped[str | None] = mapped_column(Text, nullable=True) + + # relationships + run: Mapped[PipelineRun | None] = sa_relationship( + back_populates="events", foreign_keys=[run_id] + ) diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index 70a425c..ec99333 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -76,6 +76,7 @@ def _emit_event( stage: str, event_type: str, *, + run_id: str | None = None, prompt_tokens: int | None = None, completion_tokens: int | None = None, total_tokens: int | None = None, @@ -92,6 +93,7 @@ def _emit_event( try: event = PipelineEvent( video_id=video_id, + run_id=run_id, stage=stage, event_type=event_type, prompt_tokens=prompt_tokens, @@ -132,6 +134,7 @@ def _make_llm_callback( stage: str, system_prompt: str | None = None, user_prompt: str | None = None, + run_id: str | None = None, ): """Create an on_complete callback for LLMClient that emits llm_call events. @@ -149,6 +152,7 @@ def _make_llm_callback( video_id=video_id, stage=stage, event_type="llm_call", + run_id=run_id, model=model, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, @@ -285,7 +289,7 @@ def _safe_parse_llm_response( # ── Stage 2: Segmentation ─────────────────────────────────────────────────── @celery_app.task(bind=True, max_retries=3, default_retry_delay=30) -def stage2_segmentation(self, video_id: str) -> str: +def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str: """Analyze transcript segments and identify topic boundaries. 
Loads all TranscriptSegment rows for the video, sends them to the LLM @@ -295,7 +299,7 @@ def stage2_segmentation(self, video_id: str) -> str: """ start = time.monotonic() logger.info("Stage 2 (segmentation) starting for video_id=%s", video_id) - _emit_event(video_id, "stage2_segmentation", "start") + _emit_event(video_id, "stage2_segmentation", "start", run_id=run_id) session = _get_sync_session() try: @@ -331,7 +335,7 @@ def stage2_segmentation(self, video_id: str) -> str: hard_limit = get_settings().llm_max_tokens_hard_limit max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage2_segmentation", hard_limit=hard_limit) logger.info("Stage 2 using model=%s, modality=%s, max_tokens=%d", model_override or "default", modality, max_tokens) - raw = llm.complete(system_prompt, user_prompt, response_model=SegmentationResult, on_complete=_make_llm_callback(video_id, "stage2_segmentation", system_prompt=system_prompt, user_prompt=user_prompt), + raw = llm.complete(system_prompt, user_prompt, response_model=SegmentationResult, on_complete=_make_llm_callback(video_id, "stage2_segmentation", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id), modality=modality, model_override=model_override, max_tokens=max_tokens) result = _safe_parse_llm_response(raw, SegmentationResult, llm, system_prompt, user_prompt, modality=modality, model_override=model_override) @@ -345,7 +349,7 @@ def stage2_segmentation(self, video_id: str) -> str: session.commit() elapsed = time.monotonic() - start - _emit_event(video_id, "stage2_segmentation", "complete") + _emit_event(video_id, "stage2_segmentation", "complete", run_id=run_id) logger.info( "Stage 2 (segmentation) completed for video_id=%s in %.1fs — %d topic groups found", video_id, elapsed, len(result.segments), @@ -356,7 +360,7 @@ def stage2_segmentation(self, video_id: str) -> str: raise # Don't retry missing prompt files except Exception as exc: session.rollback() - _emit_event(video_id, 
"stage2_segmentation", "error", payload={"error": str(exc)}) + _emit_event(video_id, "stage2_segmentation", "error", run_id=run_id, payload={"error": str(exc)}) logger.error("Stage 2 failed for video_id=%s: %s", video_id, exc) raise self.retry(exc=exc) finally: @@ -366,7 +370,7 @@ def stage2_segmentation(self, video_id: str) -> str: # ── Stage 3: Extraction ───────────────────────────────────────────────────── @celery_app.task(bind=True, max_retries=3, default_retry_delay=30) -def stage3_extraction(self, video_id: str) -> str: +def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str: """Extract key moments from each topic segment group. Groups segments by topic_label, calls the LLM for each group to extract @@ -376,7 +380,7 @@ def stage3_extraction(self, video_id: str) -> str: """ start = time.monotonic() logger.info("Stage 3 (extraction) starting for video_id=%s", video_id) - _emit_event(video_id, "stage3_extraction", "start") + _emit_event(video_id, "stage3_extraction", "start", run_id=run_id) session = _get_sync_session() try: @@ -423,7 +427,7 @@ def stage3_extraction(self, video_id: str) -> str: ) max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage3_extraction", hard_limit=hard_limit) - raw = llm.complete(system_prompt, user_prompt, response_model=ExtractionResult, on_complete=_make_llm_callback(video_id, "stage3_extraction", system_prompt=system_prompt, user_prompt=user_prompt), + raw = llm.complete(system_prompt, user_prompt, response_model=ExtractionResult, on_complete=_make_llm_callback(video_id, "stage3_extraction", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id), modality=modality, model_override=model_override, max_tokens=max_tokens) result = _safe_parse_llm_response(raw, ExtractionResult, llm, system_prompt, user_prompt, modality=modality, model_override=model_override) @@ -451,7 +455,7 @@ def stage3_extraction(self, video_id: str) -> str: session.commit() elapsed = time.monotonic() - start 
- _emit_event(video_id, "stage3_extraction", "complete") + _emit_event(video_id, "stage3_extraction", "complete", run_id=run_id) logger.info( "Stage 3 (extraction) completed for video_id=%s in %.1fs — %d moments created", video_id, elapsed, total_moments, @@ -462,7 +466,7 @@ def stage3_extraction(self, video_id: str) -> str: raise except Exception as exc: session.rollback() - _emit_event(video_id, "stage3_extraction", "error", payload={"error": str(exc)}) + _emit_event(video_id, "stage3_extraction", "error", run_id=run_id, payload={"error": str(exc)}) logger.error("Stage 3 failed for video_id=%s: %s", video_id, exc) raise self.retry(exc=exc) finally: @@ -472,7 +476,7 @@ def stage3_extraction(self, video_id: str) -> str: # ── Stage 4: Classification ───────────────────────────────────────────────── @celery_app.task(bind=True, max_retries=3, default_retry_delay=30) -def stage4_classification(self, video_id: str) -> str: +def stage4_classification(self, video_id: str, run_id: str | None = None) -> str: """Classify key moments against the canonical tag taxonomy. 
Loads all KeyMoment rows for the video, sends them to the LLM with the @@ -485,7 +489,7 @@ def stage4_classification(self, video_id: str) -> str: """ start = time.monotonic() logger.info("Stage 4 (classification) starting for video_id=%s", video_id) - _emit_event(video_id, "stage4_classification", "start") + _emit_event(video_id, "stage4_classification", "start", run_id=run_id) session = _get_sync_session() try: @@ -532,7 +536,7 @@ def stage4_classification(self, video_id: str) -> str: hard_limit = get_settings().llm_max_tokens_hard_limit max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage4_classification", hard_limit=hard_limit) logger.info("Stage 4 using model=%s, modality=%s, max_tokens=%d", model_override or "default", modality, max_tokens) - raw = llm.complete(system_prompt, user_prompt, response_model=ClassificationResult, on_complete=_make_llm_callback(video_id, "stage4_classification", system_prompt=system_prompt, user_prompt=user_prompt), + raw = llm.complete(system_prompt, user_prompt, response_model=ClassificationResult, on_complete=_make_llm_callback(video_id, "stage4_classification", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id), modality=modality, model_override=model_override, max_tokens=max_tokens) result = _safe_parse_llm_response(raw, ClassificationResult, llm, system_prompt, user_prompt, modality=modality, model_override=model_override) @@ -564,7 +568,7 @@ def stage4_classification(self, video_id: str) -> str: _store_classification_data(video_id, classification_data) elapsed = time.monotonic() - start - _emit_event(video_id, "stage4_classification", "complete") + _emit_event(video_id, "stage4_classification", "complete", run_id=run_id) logger.info( "Stage 4 (classification) completed for video_id=%s in %.1fs — %d moments classified", video_id, elapsed, len(classification_data), @@ -575,7 +579,7 @@ def stage4_classification(self, video_id: str) -> str: raise except Exception as exc: session.rollback() - 
_emit_event(video_id, "stage4_classification", "error", payload={"error": str(exc)}) + _emit_event(video_id, "stage4_classification", "error", run_id=run_id, payload={"error": str(exc)}) logger.error("Stage 4 failed for video_id=%s: %s", video_id, exc) raise self.retry(exc=exc) finally: @@ -694,7 +698,7 @@ def _capture_pipeline_metadata() -> dict: # ── Stage 5: Synthesis ─────────────────────────────────────────────────────── @celery_app.task(bind=True, max_retries=3, default_retry_delay=30) -def stage5_synthesis(self, video_id: str) -> str: +def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: """Synthesize technique pages from classified key moments. Groups moments by (creator, topic_category), calls the LLM to synthesize @@ -707,7 +711,7 @@ def stage5_synthesis(self, video_id: str) -> str: """ start = time.monotonic() logger.info("Stage 5 (synthesis) starting for video_id=%s", video_id) - _emit_event(video_id, "stage5_synthesis", "start") + _emit_event(video_id, "stage5_synthesis", "start", run_id=run_id) settings = get_settings() session = _get_sync_session() @@ -777,7 +781,7 @@ def stage5_synthesis(self, video_id: str) -> str: user_prompt = f"{creator_name}\n\n{moments_text}\n" max_tokens = estimate_max_tokens(system_prompt, user_prompt, stage="stage5_synthesis", hard_limit=hard_limit) - raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult, on_complete=_make_llm_callback(video_id, "stage5_synthesis", system_prompt=system_prompt, user_prompt=user_prompt), + raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult, on_complete=_make_llm_callback(video_id, "stage5_synthesis", system_prompt=system_prompt, user_prompt=user_prompt, run_id=run_id), modality=modality, model_override=model_override, max_tokens=max_tokens) result = _safe_parse_llm_response(raw, SynthesisResult, llm, system_prompt, user_prompt, modality=modality, model_override=model_override) @@ -885,7 +889,7 @@ def 
stage5_synthesis(self, video_id: str) -> str: session.commit() elapsed = time.monotonic() - start - _emit_event(video_id, "stage5_synthesis", "complete") + _emit_event(video_id, "stage5_synthesis", "complete", run_id=run_id) logger.info( "Stage 5 (synthesis) completed for video_id=%s in %.1fs — %d pages created/updated", video_id, elapsed, pages_created, @@ -896,7 +900,7 @@ def stage5_synthesis(self, video_id: str) -> str: raise except Exception as exc: session.rollback() - _emit_event(video_id, "stage5_synthesis", "error", payload={"error": str(exc)}) + _emit_event(video_id, "stage5_synthesis", "error", run_id=run_id, payload={"error": str(exc)}) logger.error("Stage 5 failed for video_id=%s: %s", video_id, exc) raise self.retry(exc=exc) finally: @@ -906,7 +910,7 @@ def stage5_synthesis(self, video_id: str) -> str: # ── Stage 6: Embed & Index ─────────────────────────────────────────────────── @celery_app.task(bind=True, max_retries=0) -def stage6_embed_and_index(self, video_id: str) -> str: +def stage6_embed_and_index(self, video_id: str, run_id: str | None = None) -> str: """Generate embeddings for technique pages and key moments, then upsert to Qdrant. 
This is a non-blocking side-effect stage — failures are logged but do not @@ -946,6 +950,8 @@ def stage6_embed_and_index(self, video_id: str) -> str: if not moments and not pages: logger.info("Stage 6: No moments or pages for video_id=%s, skipping.", video_id) + if run_id: + _finish_run(run_id, "complete") return video_id embed_client = EmbeddingClient(settings) @@ -1030,6 +1036,8 @@ def stage6_embed_and_index(self, video_id: str) -> str: "%d pages, %d moments processed", video_id, elapsed, len(pages), len(moments), ) + if run_id: + _finish_run(run_id, "complete") return video_id except Exception as exc: @@ -1039,6 +1047,8 @@ def stage6_embed_and_index(self, video_id: str) -> str: "Pipeline continues — embeddings can be regenerated later.", video_id, exc, ) + if run_id: + _finish_run(run_id, "complete") # Run is still "complete" — stage6 is best-effort return video_id finally: session.close() @@ -1099,14 +1109,68 @@ def _load_prior_pages(video_id: str) -> list[str]: # ── Orchestrator ───────────────────────────────────────────────────────────── @celery_app.task -def mark_pipeline_error(request, exc, traceback, video_id: str) -> None: +def mark_pipeline_error(request, exc, traceback, video_id: str, run_id: str | None = None) -> None: """Error callback — marks video as errored when a pipeline stage fails.""" logger.error("Pipeline failed for video_id=%s: %s", video_id, exc) _set_error_status(video_id, "pipeline", exc) + if run_id: + _finish_run(run_id, "error", error_stage="pipeline") + + +def _create_run(video_id: str, trigger: str) -> str: + """Create a PipelineRun and return its id.""" + from models import PipelineRun, PipelineRunTrigger + + session = _get_sync_session() + try: + # Compute run_number: max existing + 1 + from sqlalchemy import func as sa_func + max_num = session.execute( + select(sa_func.coalesce(sa_func.max(PipelineRun.run_number), 0)) + .where(PipelineRun.video_id == video_id) + ).scalar() or 0 + run = PipelineRun( + video_id=video_id, + 
run_number=max_num + 1, + trigger=PipelineRunTrigger(trigger), + ) + session.add(run) + session.commit() + run_id = str(run.id) + return run_id + finally: + session.close() + + +def _finish_run(run_id: str, status: str, error_stage: str | None = None) -> None: + """Update a PipelineRun's status and finished_at.""" + from models import PipelineRun, PipelineRunStatus, _now + + session = _get_sync_session() + try: + run = session.execute( + select(PipelineRun).where(PipelineRun.id == run_id) + ).scalar_one_or_none() + if run: + run.status = PipelineRunStatus(status) + run.finished_at = _now() + if error_stage: + run.error_stage = error_stage + # Aggregate total tokens from events + total = session.execute( + select(func.coalesce(func.sum(PipelineEvent.total_tokens), 0)) + .where(PipelineEvent.run_id == run_id) + ).scalar() or 0 + run.total_tokens = total + session.commit() + except Exception as exc: + logger.warning("Failed to finish run %s: %s", run_id, exc) + finally: + session.close() @celery_app.task -def run_pipeline(video_id: str) -> str: +def run_pipeline(video_id: str, trigger: str = "manual") -> str: """Orchestrate the full pipeline (stages 2-5) with resumability. 
Checks the current processing_status of the video and chains only the @@ -1139,33 +1203,35 @@ def run_pipeline(video_id: str) -> str: # Snapshot prior technique pages before pipeline resets key_moments _snapshot_prior_pages(video_id) + # Create a pipeline run record + run_id = _create_run(video_id, trigger) + logger.info("run_pipeline: created run_id=%s for video_id=%s (trigger=%s)", run_id, video_id, trigger) + # Build the chain based on current status stages = [] if status in (ProcessingStatus.not_started, ProcessingStatus.queued): stages = [ - stage2_segmentation.s(video_id), - stage3_extraction.s(), # receives video_id from previous - stage4_classification.s(), - stage5_synthesis.s(), - stage6_embed_and_index.s(), + stage2_segmentation.s(video_id, run_id=run_id), + stage3_extraction.s(run_id=run_id), # receives video_id from previous + stage4_classification.s(run_id=run_id), + stage5_synthesis.s(run_id=run_id), + stage6_embed_and_index.s(run_id=run_id), ] elif status == ProcessingStatus.processing: - # Resuming a previously-started pipeline — re-run from stage 2 stages = [ - stage2_segmentation.s(video_id), - stage3_extraction.s(), - stage4_classification.s(), - stage5_synthesis.s(), - stage6_embed_and_index.s(), + stage2_segmentation.s(video_id, run_id=run_id), + stage3_extraction.s(run_id=run_id), + stage4_classification.s(run_id=run_id), + stage5_synthesis.s(run_id=run_id), + stage6_embed_and_index.s(run_id=run_id), ] elif status == ProcessingStatus.error: - # Retrigger after error — re-run full pipeline stages = [ - stage2_segmentation.s(video_id), - stage3_extraction.s(), - stage4_classification.s(), - stage5_synthesis.s(), - stage6_embed_and_index.s(), + stage2_segmentation.s(video_id, run_id=run_id), + stage3_extraction.s(run_id=run_id), + stage4_classification.s(run_id=run_id), + stage5_synthesis.s(run_id=run_id), + stage6_embed_and_index.s(run_id=run_id), ] elif status == ProcessingStatus.complete: logger.info( @@ -1187,11 +1253,11 @@ def 
run_pipeline(video_id: str) -> str: session.close() pipeline = celery_chain(*stages) - error_cb = mark_pipeline_error.s(video_id) + error_cb = mark_pipeline_error.s(video_id, run_id=run_id) pipeline.apply_async(link_error=error_cb) logger.info( - "run_pipeline: dispatched %d stages for video_id=%s", - len(stages), video_id, + "run_pipeline: dispatched %d stages for video_id=%s (run_id=%s)", + len(stages), video_id, run_id, ) return video_id diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index b551262..513cc8c 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -13,6 +13,7 @@ Admin: import logging import uuid +from datetime import datetime, timezone from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query @@ -21,7 +22,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from config import get_settings from database import get_session -from models import PipelineEvent, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus +from models import PipelineEvent, PipelineRun, PipelineRunStatus, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus from redis_client import get_redis from schemas import DebugModeResponse, DebugModeUpdate, TokenStageSummary, TokenSummaryResponse @@ -50,7 +51,7 @@ async def trigger_pipeline( from pipeline.stages import run_pipeline try: - run_pipeline.delay(str(video.id)) + run_pipeline.delay(str(video.id), trigger="manual") logger.info("Pipeline manually triggered for video_id=%s", video_id) except Exception as exc: logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc) @@ -88,6 +89,24 @@ async def list_pipeline_videos( .subquery() ) + # Subquery for the most recent pipeline run per video + latest_run = ( + select( + PipelineRun.video_id, + PipelineRun.id.label("run_id"), + PipelineRun.run_number, + PipelineRun.trigger.label("run_trigger"), + PipelineRun.status.label("run_status"), + 
PipelineRun.started_at.label("run_started_at"), + PipelineRun.finished_at.label("run_finished_at"), + PipelineRun.error_stage.label("run_error_stage"), + PipelineRun.total_tokens.label("run_total_tokens"), + ) + .order_by(PipelineRun.video_id, PipelineRun.started_at.desc()) + .distinct(PipelineRun.video_id) + .subquery() + ) + # Subquery for the most recent stage start event per video (active stage indicator) latest_stage = ( select( @@ -117,10 +136,19 @@ async def list_pipeline_videos( latest_stage.c.active_stage, latest_stage.c.active_stage_status, latest_stage.c.stage_started_at, + latest_run.c.run_id, + latest_run.c.run_number, + latest_run.c.run_trigger, + latest_run.c.run_status, + latest_run.c.run_started_at, + latest_run.c.run_finished_at, + latest_run.c.run_error_stage, + latest_run.c.run_total_tokens, ) .join(Creator, SourceVideo.creator_id == Creator.id) .outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id) .outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id) + .outerjoin(latest_run, SourceVideo.id == latest_run.c.video_id) .order_by(SourceVideo.updated_at.desc()) ) @@ -143,6 +171,16 @@ async def list_pipeline_videos( "active_stage": r.active_stage, "active_stage_status": r.active_stage_status, "stage_started_at": r.stage_started_at.isoformat() if r.stage_started_at else None, + "latest_run": { + "id": str(r.run_id), + "run_number": r.run_number, + "trigger": r.run_trigger.value if hasattr(r.run_trigger, 'value') else r.run_trigger, + "status": r.run_status.value if hasattr(r.run_status, 'value') else r.run_status, + "started_at": r.run_started_at.isoformat() if r.run_started_at else None, + "finished_at": r.run_finished_at.isoformat() if r.run_finished_at else None, + "error_stage": r.run_error_stage, + "total_tokens": r.run_total_tokens or 0, + } if r.run_id else None, } for r in rows ], @@ -216,7 +254,7 @@ async def clean_retrigger_pipeline( # Now trigger the pipeline from pipeline.stages import run_pipeline try: - 
run_pipeline.delay(str(video.id)) + run_pipeline.delay(str(video.id), trigger="clean_reprocess") logger.info("Clean retrigger dispatched for video_id=%s", video_id) except Exception as exc: logger.warning("Failed to dispatch pipeline after cleanup for video_id=%s: %s", video_id, exc) @@ -235,10 +273,14 @@ async def clean_retrigger_pipeline( # ── Admin: Revoke ──────────────────────────────────────────────────────────── @router.post("/admin/pipeline/revoke/{video_id}") -async def revoke_pipeline(video_id: str): +async def revoke_pipeline( + video_id: str, + db: AsyncSession = Depends(get_session), +): """Revoke/cancel active Celery tasks for a video. Uses Celery's revoke with terminate=True to kill running tasks. + Also marks the latest running pipeline_run as cancelled. This is best-effort — the task may have already completed. """ from worker import celery_app @@ -256,6 +298,18 @@ async def revoke_pipeline(video_id: str): celery_app.control.revoke(task["id"], terminate=True) revoked_count += 1 logger.info("Revoked task %s for video_id=%s", task["id"], video_id) + + # Mark any running pipeline_runs as cancelled + running_runs = await db.execute( + select(PipelineRun).where( + PipelineRun.video_id == video_id, + PipelineRun.status == PipelineRunStatus.running, + ) + ) + for run in running_runs.scalars().all(): + run.status = PipelineRunStatus.cancelled + run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None) + await db.commit() return { "status": "revoked" if revoked_count > 0 else "no_active_tasks", @@ -317,6 +371,65 @@ async def recent_pipeline_activity( } +# ── Admin: Pipeline runs ───────────────────────────────────────────────────── + +@router.get("/admin/pipeline/runs/{video_id}") +async def list_pipeline_runs( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """List all pipeline runs for a video, newest first.""" + # Count events per run + event_counts = ( + select( + PipelineEvent.run_id, + func.count().label("event_count"), + ) + 
.where(PipelineEvent.run_id.isnot(None)) + .group_by(PipelineEvent.run_id) + .subquery() + ) + + stmt = ( + select( + PipelineRun, + event_counts.c.event_count, + ) + .outerjoin(event_counts, PipelineRun.id == event_counts.c.run_id) + .where(PipelineRun.video_id == video_id) + .order_by(PipelineRun.started_at.desc()) + ) + result = await db.execute(stmt) + rows = result.all() + + # Also count legacy events (run_id IS NULL) for this video + legacy_count_result = await db.execute( + select(func.count()) + .select_from(PipelineEvent) + .where(PipelineEvent.video_id == video_id, PipelineEvent.run_id.is_(None)) + ) + legacy_count = legacy_count_result.scalar() or 0 + + items = [] + for run, evt_count in rows: + items.append({ + "id": str(run.id), + "run_number": run.run_number, + "trigger": run.trigger.value if hasattr(run.trigger, 'value') else str(run.trigger), + "status": run.status.value if hasattr(run.status, 'value') else str(run.status), + "started_at": run.started_at.isoformat() if run.started_at else None, + "finished_at": run.finished_at.isoformat() if run.finished_at else None, + "error_stage": run.error_stage, + "total_tokens": run.total_tokens or 0, + "event_count": evt_count or 0, + }) + + return { + "items": items, + "legacy_event_count": legacy_count, + } + + # ── Admin: Event log ───────────────────────────────────────────────────────── @router.get("/admin/pipeline/events/{video_id}") @@ -326,12 +439,15 @@ async def list_pipeline_events( limit: Annotated[int, Query(ge=1, le=200)] = 100, stage: Annotated[str | None, Query(description="Filter by stage name")] = None, event_type: Annotated[str | None, Query(description="Filter by event type")] = None, + run_id: Annotated[str | None, Query(description="Filter by pipeline run ID")] = None, order: Annotated[str, Query(description="Sort order: asc or desc")] = "desc", db: AsyncSession = Depends(get_session), ): """Get pipeline events for a video. 
Default: newest first (desc).""" stmt = select(PipelineEvent).where(PipelineEvent.video_id == video_id) + if run_id: + stmt = stmt.where(PipelineEvent.run_id == run_id) if stage: stmt = stmt.where(PipelineEvent.stage == stage) if event_type: diff --git a/frontend/src/App.css b/frontend/src/App.css index 6898093..bab480d 100644 --- a/frontend/src/App.css +++ b/frontend/src/App.css @@ -3740,6 +3740,107 @@ a.app-footer__repo:hover { flex-shrink: 0; } +/* ── Run List & Cards ─────────────────────────────────────────────────────── */ + +.run-list { + display: flex; + flex-direction: column; + gap: 0.5rem; +} + +.run-card { + border: 1px solid var(--color-border); + border-radius: 8px; + overflow: hidden; +} + +.run-card--running { + border-color: var(--color-accent); +} + +.run-card--error { + border-color: rgba(244, 67, 54, 0.4); +} + +.run-card--cancelled { + border-color: rgba(158, 158, 158, 0.3); +} + +.run-card__header { + display: flex; + align-items: center; + gap: 0.5rem; + width: 100%; + padding: 0.6rem 1rem; + background: var(--color-surface); + border: none; + color: var(--color-text); + cursor: pointer; + font-size: 0.85rem; + text-align: left; +} + +.run-card__header:hover { + background: var(--color-bg-input); +} + +.run-card__arrow { + flex-shrink: 0; + width: 1rem; + color: var(--color-text-muted); +} + +.run-card__number { + font-weight: 600; + white-space: nowrap; +} + +.run-card__trigger { + color: var(--color-text-muted); + font-size: 0.8rem; + white-space: nowrap; +} + +.run-card__time { + color: var(--color-text-muted); + font-size: 0.8rem; + white-space: nowrap; +} + +.run-card__duration { + color: var(--color-text-muted); + font-size: 0.75rem; + white-space: nowrap; +} + +.run-card__tokens { + color: var(--color-text-muted); + font-size: 0.8rem; + white-space: nowrap; + margin-left: auto; +} + +.run-card__events { + color: var(--color-text-muted); + font-size: 0.8rem; + white-space: nowrap; +} + +.run-card__error-stage { + color: #f44336; + 
font-size: 0.8rem; + white-space: nowrap; +} + +.run-card__body { + border-top: 1px solid var(--color-border); + padding: 0.5rem; +} + +.run-card--legacy .run-card__header { + opacity: 0.7; +} + /* ── Worker Status Indicator ────────────────────────────────────────────── */ .worker-status { diff --git a/frontend/src/api/public-client.ts b/frontend/src/api/public-client.ts index 9df7597..1148f5d 100644 --- a/frontend/src/api/public-client.ts +++ b/frontend/src/api/public-client.ts @@ -413,6 +413,16 @@ export interface PipelineVideoItem { active_stage: string | null; active_stage_status: string | null; stage_started_at: string | null; + latest_run: { + id: string; + run_number: number; + trigger: string; + status: string; + started_at: string | null; + finished_at: string | null; + error_stage: string | null; + total_tokens: number; + } | null; } export interface PipelineVideoListResponse { @@ -502,15 +512,37 @@ export async function fetchRecentActivity(limit = 10): Promise(`${BASE}/admin/pipeline/recent-activity?limit=${limit}`); } +export interface PipelineRunItem { + id: string; + run_number: number; + trigger: string; + status: string; + started_at: string | null; + finished_at: string | null; + error_stage: string | null; + total_tokens: number; + event_count: number; +} + +export interface PipelineRunsResponse { + items: PipelineRunItem[]; + legacy_event_count: number; +} + +export async function fetchPipelineRuns(videoId: string): Promise { + return request(`${BASE}/admin/pipeline/runs/${videoId}`); +} + export async function fetchPipelineEvents( videoId: string, - params: { offset?: number; limit?: number; stage?: string; event_type?: string; order?: "asc" | "desc" } = {}, + params: { offset?: number; limit?: number; stage?: string; event_type?: string; run_id?: string; order?: "asc" | "desc" } = {}, ): Promise { const qs = new URLSearchParams(); if (params.offset !== undefined) qs.set("offset", String(params.offset)); if (params.limit !== undefined) 
qs.set("limit", String(params.limit)); if (params.stage) qs.set("stage", params.stage); if (params.event_type) qs.set("event_type", params.event_type); + if (params.run_id) qs.set("run_id", params.run_id); if (params.order) qs.set("order", params.order); const query = qs.toString(); return request( diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx index d51cc3b..0c6b657 100644 --- a/frontend/src/pages/AdminPipeline.tsx +++ b/frontend/src/pages/AdminPipeline.tsx @@ -9,6 +9,7 @@ import { useDocumentTitle } from "../hooks/useDocumentTitle"; import { fetchPipelineVideos, fetchPipelineEvents, + fetchPipelineRuns, fetchWorkerStatus, fetchDebugMode, setDebugMode, @@ -19,6 +20,7 @@ import { fetchRecentActivity, type PipelineVideoItem, type PipelineEvent, + type PipelineRunItem, type WorkerStatusResponse, type RecentActivityItem, } from "../api/public-client"; @@ -215,7 +217,7 @@ function DebugPayloadViewer({ event }: { event: PipelineEvent }) { // ── Event Log ──────────────────────────────────────────────────────────────── -function EventLog({ videoId, status }: { videoId: string; status: string }) { +function EventLog({ videoId, status, runId }: { videoId: string; status: string; runId?: string }) { const [events, setEvents] = useState([]); const [total, setTotal] = useState(0); const [loading, setLoading] = useState(true); @@ -233,6 +235,7 @@ function EventLog({ videoId, status }: { videoId: string; status: string }) { offset, limit, order: viewMode === "head" ? 
"asc" : "desc", + run_id: runId, }); setEvents(res.items); setTotal(res.total); @@ -477,6 +480,125 @@ function StatusFilter({ ); } +// ── Run List ───────────────────────────────────────────────────────────────── + +const TRIGGER_LABELS: Record = { + manual: "Manual", + clean_reprocess: "Clean Reprocess", + auto_ingest: "Auto Ingest", + bulk: "Bulk", +}; + +const RUN_STATUS_CLASS: Record = { + running: "pipeline-badge--active", + complete: "pipeline-badge--success", + error: "pipeline-badge--error", + cancelled: "pipeline-badge--pending", +}; + +function RunList({ videoId, videoStatus }: { videoId: string; videoStatus: string }) { + const [runs, setRuns] = useState([]); + const [legacyCount, setLegacyCount] = useState(0); + const [loading, setLoading] = useState(true); + const [expandedRunId, setExpandedRunId] = useState(null); + const [showLegacy, setShowLegacy] = useState(false); + + const load = useCallback(async (silent = false) => { + if (!silent) setLoading(true); + try { + const res = await fetchPipelineRuns(videoId); + setRuns(res.items); + setLegacyCount(res.legacy_event_count); + // Auto-expand the latest run on first load + if (!silent && res.items.length > 0 && expandedRunId === null) { + const firstRun = res.items[0]; + if (firstRun) setExpandedRunId(firstRun.id); + } + } catch { + // silently fail + } finally { + if (!silent) setLoading(false); + } + }, [videoId, expandedRunId]); + + useEffect(() => { + void load(); + }, [load]); + + // Auto-refresh when video is processing + useEffect(() => { + if (videoStatus !== "processing" && videoStatus !== "queued") return; + const id = setInterval(() => void load(true), 10_000); + return () => clearInterval(id); + }, [videoStatus, load]); + + if (loading) return
Loading runs…
; + + if (runs.length === 0 && legacyCount === 0) { + return
No pipeline runs recorded.
; + } + + return ( +
+ {runs.map((run) => { + const isExpanded = expandedRunId === run.id; + return ( +
+ + {isExpanded && ( +
+ +
+ )} +
+ ); + })} + + {legacyCount > 0 && ( +
+ + {showLegacy && ( +
+ +
+ )} +
+ )} +
+ ); +} + // ── Stage Timeline ─────────────────────────────────────────────────────────── const PIPELINE_STAGES = [ @@ -1065,7 +1187,7 @@ export default function AdminPipeline() { Created: {formatDate(video.created_at)} Updated: {formatDate(video.updated_at)} - + )}