feat: meaningful pipeline status lifecycle — Not Started → Queued → In Progress → Complete/Errored

Replace stage-level statuses (pending/transcribed/extracted/published) with
user-meaningful lifecycle states (not_started/queued/processing/error/complete).

Backend:
- ProcessingStatus enum: not_started, queued, processing, error, complete
- run_pipeline sets 'processing' before dispatching Celery chain
- stage5 sets 'complete' (was 'published')
- stage3 no longer sets intermediate status (stays 'processing')
- New mark_pipeline_error task wired as link_error on chain
- _set_error_status helper marks video on permanent failure
- Ingest sets 'queued' (was 'transcribed')
- Migration 008 renames all existing values

Frontend:
- StatusFilter shows fixed-order lifecycle tabs: Not Started | Queued | In Progress | Errored | Complete
- Per-video badges show friendly labels instead of raw enum values
- Badge colors mapped to new statuses
This commit is contained in:
jlightner 2026-03-31 02:43:49 +00:00
parent 52e7e3bbc2
commit 720c2f501f
7 changed files with 154 additions and 68 deletions

View file

@ -0,0 +1,35 @@
"""Rename processing_status values to user-meaningful lifecycle states.
Old: pending, transcribed, extracted, published
New: not_started, queued, processing, error, complete
Revision ID: 008_rename_processing_status
Revises: 007_drop_review_columns
"""
from alembic import op
revision = "008_rename_processing_status"
down_revision = "007_drop_review_columns"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add new enum values first
op.execute("ALTER TYPE processingstatus ADD VALUE IF NOT EXISTS 'not_started'")
op.execute("ALTER TYPE processingstatus ADD VALUE IF NOT EXISTS 'queued'")
op.execute("ALTER TYPE processingstatus ADD VALUE IF NOT EXISTS 'processing'")
op.execute("ALTER TYPE processingstatus ADD VALUE IF NOT EXISTS 'error'")
op.execute("ALTER TYPE processingstatus ADD VALUE IF NOT EXISTS 'complete'")
# Migrate existing rows
op.execute("UPDATE source_videos SET processing_status = 'not_started' WHERE processing_status = 'pending'")
op.execute("UPDATE source_videos SET processing_status = 'queued' WHERE processing_status = 'transcribed'")
op.execute("UPDATE source_videos SET processing_status = 'processing' WHERE processing_status = 'extracted'")
op.execute("UPDATE source_videos SET processing_status = 'complete' WHERE processing_status = 'published'")
def downgrade() -> None:
op.execute("UPDATE source_videos SET processing_status = 'pending' WHERE processing_status = 'not_started'")
op.execute("UPDATE source_videos SET processing_status = 'transcribed' WHERE processing_status = 'queued'")
op.execute("UPDATE source_videos SET processing_status = 'extracted' WHERE processing_status = 'processing'")
op.execute("UPDATE source_videos SET processing_status = 'published' WHERE processing_status = 'complete'")

View file

@ -39,11 +39,16 @@ class ContentType(str, enum.Enum):
class ProcessingStatus(str, enum.Enum):
    """Pipeline processing status for a source video.

    User-facing lifecycle: not_started -> queued -> processing -> complete
    Error branch: processing -> error (retrigger resets to queued)
    """

    not_started = "not_started"
    queued = "queued"
    processing = "processing"
    error = "error"
    complete = "complete"
class KeyMomentContentType(str, enum.Enum):
@ -129,8 +134,8 @@ class SourceVideo(Base):
content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True)
processing_status: Mapped[ProcessingStatus] = mapped_column(
Enum(ProcessingStatus, name="processing_status", create_constraint=True),
default=ProcessingStatus.pending,
server_default="pending",
default=ProcessingStatus.not_started,
server_default="not_started",
)
created_at: Mapped[datetime] = mapped_column(
default=_now, server_default=func.now()

View file

@ -49,6 +49,26 @@ from worker import celery_app
logger = logging.getLogger(__name__)
# ── Error status helper ──────────────────────────────────────────────────────
def _set_error_status(video_id: str, stage_name: str, error: Exception) -> None:
    """Mark a video as errored when a pipeline stage fails permanently.

    Best-effort: any failure while flagging the video is logged and swallowed
    so it never masks the original pipeline exception.

    Args:
        video_id: Primary key of the SourceVideo to flag.
        stage_name: Human-readable stage name, used only in the log message.
        error: The exception that caused the failure (context for callers;
            not persisted here).
    """
    try:
        session = _get_sync_session()
        try:
            video = session.execute(
                select(SourceVideo).where(SourceVideo.id == video_id)
            ).scalar_one_or_none()
            if video:
                video.processing_status = ProcessingStatus.error
                session.commit()
        finally:
            # Close on every path — previously the session leaked whenever
            # the query or commit raised, since close() only ran on success.
            session.close()
    except Exception as mark_exc:
        logger.error(
            "Failed to mark video_id=%s as error after %s failure: %s",
            video_id, stage_name, mark_exc,
        )
# ── Pipeline event persistence ───────────────────────────────────────────────
def _emit_event(
@ -429,12 +449,6 @@ def stage3_extraction(self, video_id: str) -> str:
session.add(km)
total_moments += 1
# Update processing_status to extracted
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.extracted
session.commit()
elapsed = time.monotonic() - start
_emit_event(video_id, "stage3_extraction", "complete")
@ -867,7 +881,7 @@ def stage5_synthesis(self, video_id: str) -> str:
m.technique_page_id = page.id
# Update processing_status
video.processing_status = ProcessingStatus.published
video.processing_status = ProcessingStatus.complete
session.commit()
elapsed = time.monotonic() - start
@ -1074,15 +1088,22 @@ def _load_prior_pages(video_id: str) -> list[str]:
# ── Orchestrator ─────────────────────────────────────────────────────────────
@celery_app.task
def mark_pipeline_error(video_id: str, request=None, exc=None, traceback=None) -> None:
    """Error callback — marks video as errored when a pipeline stage fails.

    Wired as ``link_error=mark_pipeline_error.s(video_id)``. A Celery
    Signature prepends its partial args on invocation, so the error context
    Celery appends — ``(request, exc, traceback)``, or only the failing task
    id depending on arity — arrives AFTER ``video_id``. The previous parameter
    order ``(request, exc, traceback, video_id)`` therefore mis-bound every
    argument (``request`` received the video id). The appended parameters
    default to ``None`` so either calling convention works.
    NOTE(review): confirm the errback calling convention against the pinned
    Celery version.
    """
    logger.error("Pipeline failed for video_id=%s: %s", video_id, exc)
    _set_error_status(video_id, "pipeline", exc)
@celery_app.task
def run_pipeline(video_id: str) -> str:
"""Orchestrate the full pipeline (stages 2-5) with resumability.
Checks the current processing_status of the video and chains only the
stages that still need to run. For example:
- pending/transcribed stages 2, 3, 4, 5
- extracted stages 4, 5
- published no-op
- queued stages 2, 3, 4, 5
- processing/error re-run full pipeline
- complete no-op
Returns the video_id.
"""
@ -1110,7 +1131,7 @@ def run_pipeline(video_id: str) -> str:
# Build the chain based on current status
stages = []
if status in (ProcessingStatus.pending, ProcessingStatus.transcribed):
if status in (ProcessingStatus.not_started, ProcessingStatus.queued):
stages = [
stage2_segmentation.s(video_id),
stage3_extraction.s(), # receives video_id from previous
@ -1118,13 +1139,25 @@ def run_pipeline(video_id: str) -> str:
stage5_synthesis.s(),
stage6_embed_and_index.s(),
]
elif status == ProcessingStatus.extracted:
elif status == ProcessingStatus.processing:
# Resuming a previously-started pipeline — re-run from stage 2
stages = [
stage4_classification.s(video_id),
stage2_segmentation.s(video_id),
stage3_extraction.s(),
stage4_classification.s(),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
]
elif status == ProcessingStatus.published:
elif status == ProcessingStatus.error:
# Retrigger after error — re-run full pipeline
stages = [
stage2_segmentation.s(video_id),
stage3_extraction.s(),
stage4_classification.s(),
stage5_synthesis.s(),
stage6_embed_and_index.s(),
]
elif status == ProcessingStatus.complete:
logger.info(
"run_pipeline: video_id=%s already at status=%s, nothing to do.",
video_id, status.value,
@ -1132,8 +1165,20 @@ def run_pipeline(video_id: str) -> str:
return video_id
if stages:
# Mark as processing before dispatching
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.processing
session.commit()
finally:
session.close()
pipeline = celery_chain(*stages)
pipeline.apply_async()
error_cb = mark_pipeline_error.s(video_id)
pipeline.apply_async(link_error=error_cb)
logger.info(
"run_pipeline: dispatched %d stages for video_id=%s",
len(stages), video_id,

View file

@ -180,7 +180,7 @@ async def ingest_transcript(
)
video.duration_seconds = duration_seconds
video.content_hash = content_hash
video.processing_status = ProcessingStatus.transcribed
video.processing_status = ProcessingStatus.queued
elif is_duplicate_content:
# Same content, different filename — update the existing record
video = matched_video
@ -193,7 +193,7 @@ async def ingest_transcript(
video.file_path = f"{creator_folder}/{source_file}"
video.duration_seconds = duration_seconds
video.content_hash = content_hash
video.processing_status = ProcessingStatus.transcribed
video.processing_status = ProcessingStatus.queued
is_reupload = True # Treat as reupload for response
else:
video = SourceVideo(
@ -203,7 +203,7 @@ async def ingest_transcript(
duration_seconds=duration_seconds,
content_type=ContentType.tutorial,
content_hash=content_hash,
processing_status=ProcessingStatus.transcribed,
processing_status=ProcessingStatus.queued,
)
db.add(video)
await db.flush() # assign id

View file

@ -64,7 +64,7 @@ class SourceVideoRead(SourceVideoBase):
id: uuid.UUID
creator_id: uuid.UUID
content_hash: str | None = None
processing_status: str = "pending"
processing_status: str = "not_started"
created_at: datetime
updated_at: datetime

View file

@ -191,7 +191,7 @@ def test_stage2_segmentation_updates_topic_labels(
def test_stage3_extraction_creates_key_moments(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""Stages 2+3 should create KeyMoment rows and set processing_status=extracted."""
"""Stages 2+3 should create KeyMoment rows."""
video_id = pre_ingested_video["video_id"]
prompts_dir = tmp_path / "prompts"
@ -250,12 +250,6 @@ def test_stage3_extraction_creates_key_moments(
assert len(moments) >= 2
assert moments[0].title == "Setting Levels for Gain Staging"
assert moments[0].content_type == KeyMomentContentType.technique
# Verify processing_status
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
assert video.processing_status == ProcessingStatus.extracted
finally:
session.close()
@ -459,7 +453,7 @@ def test_stage5_synthesis_creates_technique_pages(
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
assert video.processing_status == ProcessingStatus.published
assert video.processing_status == ProcessingStatus.complete
finally:
session.close()
@ -580,19 +574,19 @@ def test_stage6_embeds_and_upserts_to_qdrant(
# ── (f) Resumability ────────────────────────────────────────────────────────
def test_run_pipeline_resumes_from_extracted(
def test_run_pipeline_resumes_from_processing(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""When status=extracted, run_pipeline should skip stages 2+3 and run 4+5+6."""
"""When status=processing, run_pipeline should re-run the full pipeline."""
video_id = pre_ingested_video["video_id"]
# Set video status to "extracted" directly
# Set video status to "processing" directly
factory = sessionmaker(bind=sync_engine)
session = factory()
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.extracted
video.processing_status = ProcessingStatus.processing
session.commit()
session.close()
@ -606,32 +600,30 @@ def test_run_pipeline_resumes_from_extracted(
patch("pipeline.stages.stage4_classification") as mock_s4, \
patch("pipeline.stages.stage5_synthesis") as mock_s5, \
patch("pipeline.stages.stage6_embed_and_index") as mock_s6, \
patch("pipeline.stages.celery_chain") as mock_chain:
patch("pipeline.stages.celery_chain") as mock_chain, \
patch("pipeline.stages.mark_pipeline_error") as mock_err:
s = MagicMock()
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
mock_settings.return_value = s
# Mock chain to inspect what stages it gets
mock_pipeline = MagicMock()
mock_chain.return_value = mock_pipeline
# Mock the .s() method on each task
mock_s2.s = MagicMock(return_value="s2_sig")
mock_s3.s = MagicMock(return_value="s3_sig")
mock_s4.s = MagicMock(return_value="s4_sig")
mock_s5.s = MagicMock(return_value="s5_sig")
mock_s6.s = MagicMock(return_value="s6_sig")
mock_err.s = MagicMock(return_value="err_sig")
from pipeline.stages import run_pipeline
run_pipeline(video_id)
# Verify: stages 2 and 3 should NOT have .s() called with video_id
mock_s2.s.assert_not_called()
mock_s3.s.assert_not_called()
# Stages 4, 5, 6 should have .s() called
mock_s4.s.assert_called_once_with(video_id)
# All stages should be called — full re-run
mock_s2.s.assert_called_once_with(video_id)
mock_s3.s.assert_called_once()
mock_s4.s.assert_called_once()
mock_s5.s.assert_called_once()
mock_s6.s.assert_called_once()

View file

@ -38,20 +38,23 @@ function formatTokens(n: number): string {
return String(n);
}
// Friendly display labels for the pipeline lifecycle statuses.
// Keys must match the raw processing_status values sent by the API;
// call sites fall back to the raw value for unknown statuses
// (`STATUS_LABELS[status] ?? status`).
const STATUS_LABELS: Record<string, string> = {
  not_started: "Not Started",
  queued: "Queued",
  processing: "In Progress",
  error: "Errored",
  complete: "Complete",
};
function statusBadgeClass(status: string): string {
switch (status) {
case "completed":
case "indexed":
case "complete":
return "pipeline-badge--success";
case "processing":
case "extracted":
case "classified":
case "synthesized":
return "pipeline-badge--active";
case "failed":
case "error":
return "pipeline-badge--error";
case "pending":
case "not_started":
case "queued":
return "pipeline-badge--pending";
default:
@ -419,9 +422,12 @@ function StatusFilter({
activeFilter: string | null;
onFilterChange: (filter: string | null) => void;
}) {
const statuses = Array.from(new Set(videos.map((v) => v.processing_status))).sort();
// Fixed display order for pipeline lifecycle
const STATUS_ORDER = ["not_started", "queued", "processing", "error", "complete"];
const present = new Set(videos.map((v) => v.processing_status));
const ordered = STATUS_ORDER.filter((s) => present.has(s));
if (statuses.length <= 1) return null;
if (ordered.length <= 1) return null;
return (
<div className="filter-tabs">
@ -430,18 +436,21 @@ function StatusFilter({
className={`filter-tab ${activeFilter === null ? "filter-tab--active" : ""}`}
onClick={() => onFilterChange(null)}
>
All
All ({videos.length})
</button>
{statuses.map((status) => (
<button
key={status}
type="button"
className={`filter-tab ${activeFilter === status ? "filter-tab--active" : ""}`}
onClick={() => onFilterChange(status)}
>
{status} ({videos.filter((v) => v.processing_status === status).length})
</button>
))}
{ordered.map((status) => {
const count = videos.filter((v) => v.processing_status === status).length;
return (
<button
key={status}
type="button"
className={`filter-tab ${activeFilter === status ? "filter-tab--active" : ""}`}
onClick={() => onFilterChange(status)}
>
{STATUS_LABELS[status] ?? status} ({count})
</button>
);
})}
</div>
);
}
@ -602,7 +611,7 @@ export default function AdminPipeline() {
<div className="pipeline-video__meta">
<span className={`pipeline-badge ${statusBadgeClass(video.processing_status)}`}>
{video.processing_status}
{STATUS_LABELS[video.processing_status] ?? video.processing_status}
</span>
<span className="pipeline-video__stat" title="Events">
{video.event_count} events