diff --git a/alembic/versions/008_rename_processing_status.py b/alembic/versions/008_rename_processing_status.py new file mode 100644 index 0000000..fe08d1f --- /dev/null +++ b/alembic/versions/008_rename_processing_status.py @@ -0,0 +1,41 @@ +"""Rename processing_status values to user-meaningful lifecycle states. + +Old: pending, transcribed, extracted, published +New: not_started, queued, processing, error, complete + +Revision ID: 008_rename_processing_status +Revises: 007_drop_review_columns +""" +from alembic import op + +revision = "008_rename_processing_status" +down_revision = "007_drop_review_columns" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ADD VALUE must be committed before the new values are usable, so run it outside the migration transaction + with op.get_context().autocommit_block(): + op.execute("ALTER TYPE processing_status ADD VALUE IF NOT EXISTS 'not_started'") + op.execute("ALTER TYPE processing_status ADD VALUE IF NOT EXISTS 'queued'") + op.execute("ALTER TYPE processing_status ADD VALUE IF NOT EXISTS 'processing'") + op.execute("ALTER TYPE processing_status ADD VALUE IF NOT EXISTS 'error'") + op.execute("ALTER TYPE processing_status ADD VALUE IF NOT EXISTS 'complete'") + # Keep the column default in sync with the model's new server_default + op.execute("ALTER TABLE source_videos ALTER COLUMN processing_status SET DEFAULT 'not_started'") + # Migrate existing rows + op.execute("UPDATE source_videos SET processing_status = 'not_started' WHERE processing_status = 'pending'") + op.execute("UPDATE source_videos SET processing_status = 'queued' WHERE processing_status = 'transcribed'") + op.execute("UPDATE source_videos SET processing_status = 'processing' WHERE processing_status = 'extracted'") + op.execute("UPDATE source_videos SET processing_status = 'complete' WHERE processing_status = 'published'") + + +def downgrade() -> None: + op.execute("ALTER TABLE source_videos ALTER COLUMN processing_status SET DEFAULT 'pending'") + op.execute("UPDATE source_videos SET processing_status = 'pending' WHERE processing_status = 'not_started'") + # 'error' has no pre-008 equivalent; fold it back into 'pending' + op.execute("UPDATE source_videos SET processing_status = 'pending' WHERE processing_status = 'error'") + op.execute("UPDATE source_videos SET processing_status = 'transcribed' WHERE processing_status = 'queued'") + op.execute("UPDATE source_videos SET processing_status = 'extracted' WHERE processing_status = 'processing'") + op.execute("UPDATE source_videos SET 
processing_status = 'published' WHERE processing_status = 'complete'") diff --git a/backend/models.py b/backend/models.py index ec71bb9..e7b21cc 100644 --- a/backend/models.py +++ b/backend/models.py @@ -39,11 +39,16 @@ class ContentType(str, enum.Enum): class ProcessingStatus(str, enum.Enum): - """Pipeline processing status for a source video.""" - pending = "pending" - transcribed = "transcribed" - extracted = "extracted" - published = "published" + """Pipeline processing status for a source video. + + User-facing lifecycle: not_started → queued → processing → complete + Error branch: processing → error (retrigger resets to queued) + """ + not_started = "not_started" + queued = "queued" + processing = "processing" + error = "error" + complete = "complete" class KeyMomentContentType(str, enum.Enum): @@ -129,8 +134,8 @@ class SourceVideo(Base): content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True) processing_status: Mapped[ProcessingStatus] = mapped_column( Enum(ProcessingStatus, name="processing_status", create_constraint=True), - default=ProcessingStatus.pending, - server_default="pending", + default=ProcessingStatus.not_started, + server_default="not_started", ) created_at: Mapped[datetime] = mapped_column( default=_now, server_default=func.now() diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index 1fef96e..f18e1fc 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -49,6 +49,26 @@ from worker import celery_app logger = logging.getLogger(__name__) +# ── Error status helper ────────────────────────────────────────────────────── + +def _set_error_status(video_id: str, stage_name: str, error: Exception) -> None: + """Mark a video as errored when a pipeline stage fails permanently.""" + try: + session = _get_sync_session() + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one_or_none() + if video: + video.processing_status = 
ProcessingStatus.error + session.commit() + session.close() + except Exception as mark_exc: + logger.error( + "Failed to mark video_id=%s as error after %s failure: %s", + video_id, stage_name, mark_exc, + ) + + # ── Pipeline event persistence ─────────────────────────────────────────────── def _emit_event( @@ -429,12 +449,6 @@ def stage3_extraction(self, video_id: str) -> str: session.add(km) total_moments += 1 - # Update processing_status to extracted - video = session.execute( - select(SourceVideo).where(SourceVideo.id == video_id) - ).scalar_one() - video.processing_status = ProcessingStatus.extracted - session.commit() elapsed = time.monotonic() - start _emit_event(video_id, "stage3_extraction", "complete") @@ -867,7 +881,7 @@ def stage5_synthesis(self, video_id: str) -> str: m.technique_page_id = page.id # Update processing_status - video.processing_status = ProcessingStatus.published + video.processing_status = ProcessingStatus.complete session.commit() elapsed = time.monotonic() - start @@ -1074,15 +1088,22 @@ def _load_prior_pages(video_id: str) -> list[str]: # ── Orchestrator ───────────────────────────────────────────────────────────── +@celery_app.task +def mark_pipeline_error(request, exc, traceback, video_id: str) -> None: + """Error callback — marks video as errored when a pipeline stage fails.""" + logger.error("Pipeline failed for video_id=%s: %s", video_id, exc) + _set_error_status(video_id, "pipeline", exc) + + @celery_app.task def run_pipeline(video_id: str) -> str: """Orchestrate the full pipeline (stages 2-5) with resumability. Checks the current processing_status of the video and chains only the stages that still need to run. For example: - - pending/transcribed → stages 2, 3, 4, 5 - - extracted → stages 4, 5 - - published → no-op + - queued → stages 2, 3, 4, 5 + - processing/error → re-run full pipeline + - complete → no-op Returns the video_id. 
""" @@ -1110,7 +1131,7 @@ def run_pipeline(video_id: str) -> str: # Build the chain based on current status stages = [] - if status in (ProcessingStatus.pending, ProcessingStatus.transcribed): + if status in (ProcessingStatus.not_started, ProcessingStatus.queued): stages = [ stage2_segmentation.s(video_id), stage3_extraction.s(), # receives video_id from previous @@ -1118,13 +1139,25 @@ def run_pipeline(video_id: str) -> str: stage5_synthesis.s(), stage6_embed_and_index.s(), ] - elif status == ProcessingStatus.extracted: + elif status == ProcessingStatus.processing: + # Resuming a previously-started pipeline — re-run from stage 2 stages = [ - stage4_classification.s(video_id), + stage2_segmentation.s(video_id), + stage3_extraction.s(), + stage4_classification.s(), stage5_synthesis.s(), stage6_embed_and_index.s(), ] - elif status == ProcessingStatus.published: + elif status == ProcessingStatus.error: + # Retrigger after error — re-run full pipeline + stages = [ + stage2_segmentation.s(video_id), + stage3_extraction.s(), + stage4_classification.s(), + stage5_synthesis.s(), + stage6_embed_and_index.s(), + ] + elif status == ProcessingStatus.complete: logger.info( "run_pipeline: video_id=%s already at status=%s, nothing to do.", video_id, status.value, @@ -1132,8 +1165,20 @@ def run_pipeline(video_id: str) -> str: return video_id if stages: + # Mark as processing before dispatching + session = _get_sync_session() + try: + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one() + video.processing_status = ProcessingStatus.processing + session.commit() + finally: + session.close() + pipeline = celery_chain(*stages) - pipeline.apply_async() + error_cb = mark_pipeline_error.s(video_id) + pipeline.apply_async(link_error=error_cb) logger.info( "run_pipeline: dispatched %d stages for video_id=%s", len(stages), video_id, diff --git a/backend/routers/ingest.py b/backend/routers/ingest.py index 61b3e30..192c8ae 100644 --- 
a/backend/routers/ingest.py +++ b/backend/routers/ingest.py @@ -180,7 +180,7 @@ async def ingest_transcript( ) video.duration_seconds = duration_seconds video.content_hash = content_hash - video.processing_status = ProcessingStatus.transcribed + video.processing_status = ProcessingStatus.queued elif is_duplicate_content: # Same content, different filename — update the existing record video = matched_video @@ -193,7 +193,7 @@ async def ingest_transcript( video.file_path = f"{creator_folder}/{source_file}" video.duration_seconds = duration_seconds video.content_hash = content_hash - video.processing_status = ProcessingStatus.transcribed + video.processing_status = ProcessingStatus.queued is_reupload = True # Treat as reupload for response else: video = SourceVideo( @@ -203,7 +203,7 @@ async def ingest_transcript( duration_seconds=duration_seconds, content_type=ContentType.tutorial, content_hash=content_hash, - processing_status=ProcessingStatus.transcribed, + processing_status=ProcessingStatus.queued, ) db.add(video) await db.flush() # assign id diff --git a/backend/schemas.py b/backend/schemas.py index de0d716..a977caf 100644 --- a/backend/schemas.py +++ b/backend/schemas.py @@ -64,7 +64,7 @@ class SourceVideoRead(SourceVideoBase): id: uuid.UUID creator_id: uuid.UUID content_hash: str | None = None - processing_status: str = "pending" + processing_status: str = "not_started" created_at: datetime updated_at: datetime diff --git a/backend/tests/test_pipeline.py b/backend/tests/test_pipeline.py index 4641a78..d5b7119 100644 --- a/backend/tests/test_pipeline.py +++ b/backend/tests/test_pipeline.py @@ -191,7 +191,7 @@ def test_stage2_segmentation_updates_topic_labels( def test_stage3_extraction_creates_key_moments( db_engine, sync_engine, pre_ingested_video, tmp_path ): - """Stages 2+3 should create KeyMoment rows and set processing_status=extracted.""" + """Stages 2+3 should create KeyMoment rows.""" video_id = pre_ingested_video["video_id"] prompts_dir = tmp_path / 
"prompts" @@ -250,12 +250,6 @@ def test_stage3_extraction_creates_key_moments( assert len(moments) >= 2 assert moments[0].title == "Setting Levels for Gain Staging" assert moments[0].content_type == KeyMomentContentType.technique - - # Verify processing_status - video = session.execute( - select(SourceVideo).where(SourceVideo.id == video_id) - ).scalar_one() - assert video.processing_status == ProcessingStatus.extracted finally: session.close() @@ -459,7 +453,7 @@ def test_stage5_synthesis_creates_technique_pages( video = session.execute( select(SourceVideo).where(SourceVideo.id == video_id) ).scalar_one() - assert video.processing_status == ProcessingStatus.published + assert video.processing_status == ProcessingStatus.complete finally: session.close() @@ -580,19 +574,19 @@ def test_stage6_embeds_and_upserts_to_qdrant( # ── (f) Resumability ──────────────────────────────────────────────────────── -def test_run_pipeline_resumes_from_extracted( +def test_run_pipeline_resumes_from_processing( db_engine, sync_engine, pre_ingested_video, tmp_path ): - """When status=extracted, run_pipeline should skip stages 2+3 and run 4+5+6.""" + """When status=processing, run_pipeline should re-run the full pipeline.""" video_id = pre_ingested_video["video_id"] - # Set video status to "extracted" directly + # Set video status to "processing" directly factory = sessionmaker(bind=sync_engine) session = factory() video = session.execute( select(SourceVideo).where(SourceVideo.id == video_id) ).scalar_one() - video.processing_status = ProcessingStatus.extracted + video.processing_status = ProcessingStatus.processing session.commit() session.close() @@ -606,32 +600,30 @@ def test_run_pipeline_resumes_from_extracted( patch("pipeline.stages.stage4_classification") as mock_s4, \ patch("pipeline.stages.stage5_synthesis") as mock_s5, \ patch("pipeline.stages.stage6_embed_and_index") as mock_s6, \ - patch("pipeline.stages.celery_chain") as mock_chain: + patch("pipeline.stages.celery_chain") as 
mock_chain, \ + patch("pipeline.stages.mark_pipeline_error") as mock_err: s = MagicMock() s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") mock_settings.return_value = s - # Mock chain to inspect what stages it gets mock_pipeline = MagicMock() mock_chain.return_value = mock_pipeline - # Mock the .s() method on each task mock_s2.s = MagicMock(return_value="s2_sig") mock_s3.s = MagicMock(return_value="s3_sig") mock_s4.s = MagicMock(return_value="s4_sig") mock_s5.s = MagicMock(return_value="s5_sig") mock_s6.s = MagicMock(return_value="s6_sig") + mock_err.s = MagicMock(return_value="err_sig") from pipeline.stages import run_pipeline run_pipeline(video_id) - # Verify: stages 2 and 3 should NOT have .s() called with video_id - mock_s2.s.assert_not_called() - mock_s3.s.assert_not_called() - - # Stages 4, 5, 6 should have .s() called - mock_s4.s.assert_called_once_with(video_id) + # All stages should be called — full re-run + mock_s2.s.assert_called_once_with(video_id) + mock_s3.s.assert_called_once() + mock_s4.s.assert_called_once() mock_s5.s.assert_called_once() mock_s6.s.assert_called_once() diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx index 8dfa7da..2d1cedc 100644 --- a/frontend/src/pages/AdminPipeline.tsx +++ b/frontend/src/pages/AdminPipeline.tsx @@ -38,20 +38,23 @@ function formatTokens(n: number): string { return String(n); } +const STATUS_LABELS: Record = { + not_started: "Not Started", + queued: "Queued", + processing: "In Progress", + error: "Errored", + complete: "Complete", +}; + function statusBadgeClass(status: string): string { switch (status) { - case "completed": - case "indexed": + case "complete": return "pipeline-badge--success"; case "processing": - case "extracted": - case "classified": - case "synthesized": return "pipeline-badge--active"; - case "failed": case "error": return "pipeline-badge--error"; - case "pending": + case "not_started": case "queued": return 
"pipeline-badge--pending"; default: @@ -419,9 +422,12 @@ function StatusFilter({ activeFilter: string | null; onFilterChange: (filter: string | null) => void; }) { - const statuses = Array.from(new Set(videos.map((v) => v.processing_status))).sort(); + // Fixed display order for pipeline lifecycle + const STATUS_ORDER = ["not_started", "queued", "processing", "error", "complete"]; + const present = new Set(videos.map((v) => v.processing_status)); + const ordered = STATUS_ORDER.filter((s) => present.has(s)); - if (statuses.length <= 1) return null; + if (ordered.length <= 1) return null; return (
@@ -430,18 +436,21 @@ function StatusFilter({ className={`filter-tab ${activeFilter === null ? "filter-tab--active" : ""}`} onClick={() => onFilterChange(null)} > - All + All ({videos.length}) - {statuses.map((status) => ( - - ))} + {ordered.map((status) => { + const count = videos.filter((v) => v.processing_status === status).length; + return ( + + ); + })}
); } @@ -602,7 +611,7 @@ export default function AdminPipeline() {
- {video.processing_status} + {STATUS_LABELS[video.processing_status] ?? video.processing_status} {video.event_count} events