feat: Wired automatic run_pipeline.delay() dispatch after ingest commit…
- "backend/routers/pipeline.py" - "backend/routers/ingest.py" - "backend/main.py" GSD-Task: S03/T04
This commit is contained in:
parent
f59718f8c7
commit
aa2ef4e153
3 changed files with 69 additions and 1 deletions
|
|
@ -12,7 +12,7 @@ from fastapi import FastAPI
|
|||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from config import get_settings
|
||||
from routers import creators, health, ingest, videos
|
||||
from routers import creators, health, ingest, pipeline, videos
|
||||
|
||||
|
||||
def _setup_logging() -> None:
|
||||
|
|
@ -80,6 +80,7 @@ app.include_router(health.router)
|
|||
# Versioned API
|
||||
app.include_router(creators.router, prefix="/api/v1")
|
||||
app.include_router(ingest.router, prefix="/api/v1")
|
||||
app.include_router(pipeline.router, prefix="/api/v1")
|
||||
app.include_router(videos.router, prefix="/api/v1")
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -174,6 +174,19 @@ async def ingest_transcript(
|
|||
await db.refresh(video)
|
||||
await db.refresh(creator)
|
||||
|
||||
# ── 7. Dispatch LLM pipeline (best-effort) ──────────────────────────
|
||||
try:
|
||||
from pipeline.stages import run_pipeline
|
||||
|
||||
run_pipeline.delay(str(video.id))
|
||||
logger.info("Pipeline dispatched for video_id=%s", video.id)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Pipeline dispatch failed for video_id=%s (ingest still succeeds): %s",
|
||||
video.id,
|
||||
exc,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s",
|
||||
creator.name,
|
||||
|
|
|
|||
54
backend/routers/pipeline.py
Normal file
54
backend/routers/pipeline.py
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
"""Pipeline management endpoints for manual re-trigger and status inspection."""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from database import get_session
|
||||
from models import SourceVideo
|
||||
|
||||
logger = logging.getLogger("chrysopedia.pipeline")
|
||||
|
||||
router = APIRouter(prefix="/pipeline", tags=["pipeline"])
|
||||
|
||||
|
||||
@router.post("/trigger/{video_id}")
|
||||
async def trigger_pipeline(
|
||||
video_id: str,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""Manually trigger (or re-trigger) the LLM extraction pipeline for a video.
|
||||
|
||||
Looks up the SourceVideo by ID, dispatches ``run_pipeline.delay()``,
|
||||
and returns the current processing status. Returns 404 if the video
|
||||
does not exist.
|
||||
"""
|
||||
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
|
||||
result = await db.execute(stmt)
|
||||
video = result.scalar_one_or_none()
|
||||
|
||||
if video is None:
|
||||
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
|
||||
|
||||
# Import inside handler to avoid circular import at module level
|
||||
from pipeline.stages import run_pipeline
|
||||
|
||||
try:
|
||||
run_pipeline.delay(str(video.id))
|
||||
logger.info("Pipeline manually triggered for video_id=%s", video_id)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Failed to dispatch pipeline for video_id=%s: %s", video_id, exc
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Pipeline dispatch failed — Celery/Redis may be unavailable",
|
||||
) from exc
|
||||
|
||||
return {
|
||||
"status": "triggered",
|
||||
"video_id": str(video.id),
|
||||
"current_processing_status": video.processing_status.value,
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue