From aa2ef4e1534d526b9fee2dffa2c6ec3ef85dc2ba Mon Sep 17 00:00:00 2001 From: jlightner Date: Sun, 29 Mar 2026 22:41:02 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Wired=20automatic=20run=5Fpipeline.dela?= =?UTF-8?q?y()=20dispatch=20after=20ingest=20commit=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/routers/pipeline.py" - "backend/routers/ingest.py" - "backend/main.py" GSD-Task: S03/T04 --- backend/main.py | 3 ++- backend/routers/ingest.py | 13 +++++++++ backend/routers/pipeline.py | 54 +++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 backend/routers/pipeline.py diff --git a/backend/main.py b/backend/main.py index 53abc33..bad1de2 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,7 +12,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from config import get_settings -from routers import creators, health, ingest, videos +from routers import creators, health, ingest, pipeline, videos def _setup_logging() -> None: @@ -80,6 +80,7 @@ app.include_router(health.router) # Versioned API app.include_router(creators.router, prefix="/api/v1") app.include_router(ingest.router, prefix="/api/v1") +app.include_router(pipeline.router, prefix="/api/v1") app.include_router(videos.router, prefix="/api/v1") diff --git a/backend/routers/ingest.py b/backend/routers/ingest.py index a08ce7a..77b1d1a 100644 --- a/backend/routers/ingest.py +++ b/backend/routers/ingest.py @@ -174,6 +174,19 @@ async def ingest_transcript( await db.refresh(video) await db.refresh(creator) + # ── 7. Dispatch LLM pipeline (best-effort) ────────────────────────── + try: + from pipeline.stages import run_pipeline + + run_pipeline.delay(str(video.id)) + logger.info("Pipeline dispatched for video_id=%s", video.id) + except Exception as exc: + logger.warning( + "Pipeline dispatch failed for video_id=%s (ingest still succeeds): %s", + video.id, + exc, + ) + logger.info( "Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s", creator.name, diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py new file mode 100644 index 0000000..a5b9367 --- /dev/null +++ b/backend/routers/pipeline.py @@ -0,0 +1,54 @@ +"""Pipeline management endpoints for manual re-trigger and status inspection.""" + +import logging + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from database import get_session +from models import SourceVideo + +logger = logging.getLogger("chrysopedia.pipeline") + +router = APIRouter(prefix="/pipeline", tags=["pipeline"]) + + +@router.post("/trigger/{video_id}") +async def trigger_pipeline( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """Manually trigger (or re-trigger) the LLM extraction pipeline for a video. + + Looks up the SourceVideo by ID, dispatches ``run_pipeline.delay()``, + and returns the current processing status. Returns 404 if the video + does not exist. + """ + stmt = select(SourceVideo).where(SourceVideo.id == video_id) + result = await db.execute(stmt) + video = result.scalar_one_or_none() + + if video is None: + raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") + + # Import inside handler to avoid circular import at module level + from pipeline.stages import run_pipeline + + try: + run_pipeline.delay(str(video.id)) + logger.info("Pipeline manually triggered for video_id=%s", video_id) + except Exception as exc: + logger.warning( + "Failed to dispatch pipeline for video_id=%s: %s", video_id, exc + ) + raise HTTPException( + status_code=503, + detail="Pipeline dispatch failed — Celery/Redis may be unavailable", + ) from exc + + return { + "status": "triggered", + "video_id": str(video.id), + "current_processing_status": video.processing_status.value, + }