- "backend/routers/pipeline.py" - "backend/routers/ingest.py" - "backend/main.py" GSD-Task: S03/T04
54 lines
1.7 KiB
Python
54 lines
1.7 KiB
Python
"""Pipeline management endpoints for manual re-trigger and status inspection."""
|
|
|
|
import logging
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from database import get_session
|
|
from models import SourceVideo
|
|
|
|
# Module-level logger, registered under the name "chrysopedia.pipeline".
logger = logging.getLogger("chrysopedia.pipeline")
|
|
|
|
# Router for the endpoints in this module; created with the "/pipeline"
# prefix and the "pipeline" tag.
router = APIRouter(prefix="/pipeline", tags=["pipeline"])
|
|
|
|
|
|
@router.post("/trigger/{video_id}")
async def trigger_pipeline(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Dispatch the extraction pipeline for one video and report its status.

    The ``SourceVideo`` row whose ``id`` equals ``video_id`` is fetched
    first. If no such row exists the request fails with HTTP 404. Otherwise
    ``run_pipeline.delay()`` is called with the video's ID as a string; any
    exception raised while dispatching is logged as a warning and re-raised
    as HTTP 503 (chained to the original exception).

    Returns:
        A dict with ``status`` (always ``"triggered"``), ``video_id`` and
        ``current_processing_status`` (the ``.value`` of the video's
        ``processing_status``, read after the dispatch call).

    Raises:
        HTTPException: 404 when the video is not found, 503 when the
            dispatch call raises.
    """
    lookup = await db.execute(
        select(SourceVideo).where(SourceVideo.id == video_id)
    )
    source_video = lookup.scalar_one_or_none()
    if source_video is None:
        not_found_detail = f"Video not found: {video_id}"
        raise HTTPException(status_code=404, detail=not_found_detail)

    # Deferred to call time: importing this at module level would create a
    # circular import.
    from pipeline.stages import run_pipeline

    try:
        run_pipeline.delay(str(source_video.id))
        logger.info("Pipeline manually triggered for video_id=%s", video_id)
    except Exception as dispatch_error:
        logger.warning(
            "Failed to dispatch pipeline for video_id=%s: %s",
            video_id,
            dispatch_error,
        )
        unavailable_detail = (
            "Pipeline dispatch failed — Celery/Redis may be unavailable"
        )
        raise HTTPException(
            status_code=503, detail=unavailable_detail
        ) from dispatch_error

    response = {
        "status": "triggered",
        "video_id": str(source_video.id),
        "current_processing_status": source_video.processing_status.value,
    }
    return response
|