"""Pipeline management endpoints — public trigger + admin dashboard. Public: POST /pipeline/trigger/{video_id} Trigger pipeline for a video Admin: GET /admin/pipeline/videos Video list with status + event counts POST /admin/pipeline/trigger/{video_id} Retrigger (same as public but under admin prefix) POST /admin/pipeline/revoke/{video_id} Revoke/cancel active tasks for a video GET /admin/pipeline/events/{video_id} Event log for a video (paginated) GET /admin/pipeline/worker-status Active/reserved tasks from Celery inspect """ import logging import uuid from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import func, select, case from sqlalchemy.ext.asyncio import AsyncSession from database import get_session from models import PipelineEvent, SourceVideo, Creator logger = logging.getLogger("chrysopedia.pipeline") router = APIRouter(tags=["pipeline"]) # ── Public trigger ─────────────────────────────────────────────────────────── @router.post("/pipeline/trigger/{video_id}") async def trigger_pipeline( video_id: str, db: AsyncSession = Depends(get_session), ): """Manually trigger (or re-trigger) the LLM extraction pipeline for a video.""" stmt = select(SourceVideo).where(SourceVideo.id == video_id) result = await db.execute(stmt) video = result.scalar_one_or_none() if video is None: raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") from pipeline.stages import run_pipeline try: run_pipeline.delay(str(video.id)) logger.info("Pipeline manually triggered for video_id=%s", video_id) except Exception as exc: logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc) raise HTTPException( status_code=503, detail="Pipeline dispatch failed — Celery/Redis may be unavailable", ) from exc return { "status": "triggered", "video_id": str(video.id), "current_processing_status": video.processing_status.value, } # ── Admin: Video list 
@router.get("/admin/pipeline/videos")
async def list_pipeline_videos(
    db: AsyncSession = Depends(get_session),
):
    """List all videos with processing status and pipeline event counts."""
    # Per-video aggregates: event count, LLM token usage, latest event time.
    # Only "llm_call" events contribute tokens to the sum.
    per_video = (
        select(
            PipelineEvent.video_id,
            func.count().label("event_count"),
            func.sum(
                case(
                    (PipelineEvent.event_type == "llm_call", PipelineEvent.total_tokens),
                    else_=0,
                )
            ).label("total_tokens_used"),
            func.max(PipelineEvent.created_at).label("last_event_at"),
        )
        .group_by(PipelineEvent.video_id)
        .subquery()
    )

    # Outer join so videos with no pipeline events still appear (NULL counts).
    query = (
        select(
            SourceVideo.id,
            SourceVideo.filename,
            SourceVideo.processing_status,
            SourceVideo.content_hash,
            SourceVideo.created_at,
            SourceVideo.updated_at,
            Creator.name.label("creator_name"),
            per_video.c.event_count,
            per_video.c.total_tokens_used,
            per_video.c.last_event_at,
        )
        .join(Creator, SourceVideo.creator_id == Creator.id)
        .outerjoin(per_video, SourceVideo.id == per_video.c.video_id)
        .order_by(SourceVideo.updated_at.desc())
    )

    rows = (await db.execute(query)).all()

    def serialize(row):
        # Enum statuses expose .value; tolerate plain strings as well.
        status = row.processing_status
        return {
            "id": str(row.id),
            "filename": row.filename,
            "processing_status": status.value if hasattr(status, 'value') else str(status),
            "content_hash": row.content_hash,
            "creator_name": row.creator_name,
            "created_at": row.created_at.isoformat() if row.created_at else None,
            "updated_at": row.updated_at.isoformat() if row.updated_at else None,
            "event_count": row.event_count or 0,
            "total_tokens_used": row.total_tokens_used or 0,
            "last_event_at": row.last_event_at.isoformat() if row.last_event_at else None,
        }

    return {
        "items": [serialize(r) for r in rows],
        "total": len(rows),
    }


# ── Admin: Retrigger ─────────────────────────────────────────────────────────

@router.post("/admin/pipeline/trigger/{video_id}")
async def admin_trigger_pipeline(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Admin retrigger — delegates to the public trigger handler."""
    return await trigger_pipeline(video_id, db)
# ── Admin: Revoke ────────────────────────────────────────────────────────────

@router.post("/admin/pipeline/revoke/{video_id}")
async def revoke_pipeline(video_id: str):
    """Revoke/cancel Celery tasks for a video.

    Uses Celery's revoke with terminate=True to kill running tasks. Also
    revokes matching *reserved* tasks (prefetched by a worker but not yet
    started) — previously only active tasks were scanned, so a queued run
    would still start after a "cancel". This is best-effort — the task may
    have already completed.

    Raises:
        HTTPException 503: the Celery worker could not be reached.
    """
    from worker import celery_app

    try:
        inspector = celery_app.control.inspect()
        # Scan both running and queued-on-worker tasks for this video.
        active = inspector.active() or {}
        reserved = inspector.reserved() or {}

        revoked_count = 0
        for pool in (active, reserved):
            for _worker, tasks in pool.items():
                for task in tasks:
                    # Pipeline tasks are dispatched with the video id as the
                    # first positional argument (see trigger_pipeline).
                    task_args = task.get("args", [])
                    if task_args and str(task_args[0]) == video_id:
                        celery_app.control.revoke(task["id"], terminate=True)
                        revoked_count += 1
                        logger.info("Revoked task %s for video_id=%s", task["id"], video_id)

        return {
            "status": "revoked" if revoked_count > 0 else "no_active_tasks",
            "video_id": video_id,
            "tasks_revoked": revoked_count,
        }
    except Exception as exc:
        logger.warning("Failed to revoke tasks for video_id=%s: %s", video_id, exc)
        raise HTTPException(
            status_code=503,
            detail="Failed to communicate with Celery worker",
        ) from exc


# ── Admin: Event log ─────────────────────────────────────────────────────────

@router.get("/admin/pipeline/events/{video_id}")
async def list_pipeline_events(
    video_id: str,
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=200)] = 100,
    stage: Annotated[str | None, Query(description="Filter by stage name")] = None,
    event_type: Annotated[str | None, Query(description="Filter by event type")] = None,
    db: AsyncSession = Depends(get_session),
):
    """Get pipeline events for a video, newest first.

    ``total`` reflects the filtered count before pagination, so clients can
    page through results with ``offset``/``limit``.
    """
    stmt = select(PipelineEvent).where(PipelineEvent.video_id == video_id)
    if stage:
        stmt = stmt.where(PipelineEvent.stage == stage)
    if event_type:
        stmt = stmt.where(PipelineEvent.event_type == event_type)

    # Count the filtered set (without ordering/pagination).
    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await db.execute(count_stmt)).scalar() or 0

    # Fetch one page, newest first.
    stmt = stmt.order_by(PipelineEvent.created_at.desc())
    stmt = stmt.offset(offset).limit(limit)
    result = await db.execute(stmt)
    events = result.scalars().all()

    return {
        "items": [
            {
                "id": str(e.id),
                "video_id": str(e.video_id),
                "stage": e.stage,
                "event_type": e.event_type,
                "prompt_tokens": e.prompt_tokens,
                "completion_tokens": e.completion_tokens,
                "total_tokens": e.total_tokens,
                "model": e.model,
                "duration_ms": e.duration_ms,
                "payload": e.payload,
                "created_at": e.created_at.isoformat() if e.created_at else None,
            }
            for e in events
        ],
        "total": total,
        "offset": offset,
        "limit": limit,
    }


# ── Admin: Worker status ─────────────────────────────────────────────────────

# Task names whose per-worker completion totals are summed into
# "total_completed" below.
_PIPELINE_TASK_NAMES = (
    "tasks.pipeline.stages.stage2_segmentation",
    "tasks.pipeline.stages.stage3_extraction",
    "tasks.pipeline.stages.stage4_classification",
    "tasks.pipeline.stages.stage5_synthesis",
)


@router.get("/admin/pipeline/worker-status")
async def worker_status():
    """Get current Celery worker status — active, reserved, and stats.

    Returns ``online: False`` (with an ``error`` field) instead of raising
    when the broker/workers cannot be reached, so the dashboard can render
    a degraded state.
    """
    from worker import celery_app

    try:
        inspector = celery_app.control.inspect()
        active = inspector.active() or {}
        reserved = inspector.reserved() or {}
        stats = inspector.stats() or {}

        workers = []
        # Union of worker names across all three inspect responses — a worker
        # may appear in one response but not another.
        for worker_name in set(list(active.keys()) + list(reserved.keys()) + list(stats.keys())):
            worker_active = active.get(worker_name, [])
            worker_reserved = reserved.get(worker_name, [])
            worker_stats = stats.get(worker_name, {})

            completed_totals = worker_stats.get("total", {})
            pool_info = worker_stats.get("pool")

            workers.append({
                "name": worker_name,
                "active_tasks": [
                    {
                        "id": t.get("id"),
                        "name": t.get("name"),
                        "args": t.get("args", []),
                        "time_start": t.get("time_start"),
                    }
                    for t in worker_active
                ],
                "reserved_tasks": len(worker_reserved),
                "total_completed": sum(
                    completed_totals.get(name, 0) for name in _PIPELINE_TASK_NAMES
                ),
                # "clock" is the worker's logical clock, used here as an
                # uptime proxy — TODO confirm this is the intended metric.
                "uptime": worker_stats.get("clock", None),
                "pool_size": pool_info.get("max-concurrency") if isinstance(pool_info, dict) else None,
            })

        return {
            "online": len(workers) > 0,
            "workers": workers,
        }
    except Exception as exc:
        # Best-effort endpoint: report the failure in-band rather than 5xx.
        logger.warning("Failed to inspect Celery workers: %s", exc)
        return {
            "online": False,
            "workers": [],
            "error": str(exc),
        }