- Add pipeline_events table (migration 004) for structured stage logging - Add PipelineEvent model with token usage tracking - Admin pipeline dashboard with video list, event log, worker status, trigger/revoke controls, and collapsible JSON payload viewer - Version switcher on technique pages — view historical snapshots with pipeline metadata (model names, prompt hashes) - Frontend types for pipeline admin and version APIs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
277 lines
11 KiB
Python
277 lines
11 KiB
Python
"""Pipeline management endpoints — public trigger + admin dashboard.

Public:
    POST /pipeline/trigger/{video_id}          Trigger pipeline for a video

Admin:
    GET  /admin/pipeline/videos                Video list with status + event counts
    POST /admin/pipeline/trigger/{video_id}    Retrigger (same as public but under admin prefix)
    POST /admin/pipeline/revoke/{video_id}     Revoke/cancel active tasks for a video
    GET  /admin/pipeline/events/{video_id}     Event log for a video (paginated)
    GET  /admin/pipeline/worker-status         Active/reserved tasks from Celery inspect
"""
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import func, select, case
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from database import get_session
|
|
from models import PipelineEvent, SourceVideo, Creator
|
|
|
|
logger = logging.getLogger("chrysopedia.pipeline")
|
|
|
|
router = APIRouter(tags=["pipeline"])
|
|
|
|
|
|
# ── Public trigger ───────────────────────────────────────────────────────────
|
|
|
|
@router.post("/pipeline/trigger/{video_id}")
async def trigger_pipeline(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Queue the extraction pipeline for a single video.

    Looks up the ``SourceVideo`` row whose id equals ``video_id`` and, when it
    exists, dispatches ``run_pipeline`` for it.

    Returns:
        A dict with ``status`` ("triggered"), the video id as a string, and
        the video's processing status value as read from the loaded row.

    Raises:
        HTTPException: 404 when no matching video exists; 503 when
            dispatching the task raises.
    """
    lookup = select(SourceVideo).where(SourceVideo.id == video_id)
    video = (await db.execute(lookup)).scalar_one_or_none()
    if video is None:
        raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")

    # Imported inside the handler rather than at module top.
    # NOTE(review): presumably to avoid an import cycle or loading task
    # modules at app import time — confirm before hoisting.
    from pipeline.stages import run_pipeline

    try:
        run_pipeline.delay(str(video.id))
        logger.info("Pipeline manually triggered for video_id=%s", video_id)
    except Exception as exc:
        # Any dispatch failure is reported to the client as "service unavailable".
        logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc)
        raise HTTPException(
            status_code=503,
            detail="Pipeline dispatch failed — Celery/Redis may be unavailable",
        ) from exc

    response = {
        "status": "triggered",
        "video_id": str(video.id),
        "current_processing_status": video.processing_status.value,
    }
    return response
|
|
|
|
|
|
# ── Admin: Video list ────────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/videos")
async def list_pipeline_videos(
    db: AsyncSession = Depends(get_session),
):
    """Return every video with its status and aggregated pipeline-event figures.

    Each item carries the video's basic columns, its creator's name, the
    number of pipeline events, the summed ``total_tokens`` of ``llm_call``
    events, and the timestamp of the most recent event. Videos without any
    events are still listed (counts fall back to 0, timestamp to ``None``).
    Results are ordered by ``updated_at``, newest first.
    """

    def iso_or_none(moment):
        # Timestamps are serialized as ISO-8601 strings; missing ones stay None.
        return moment.isoformat() if moment else None

    def status_text(status):
        # Enum-like values expose ``.value``; anything else is stringified.
        if hasattr(status, 'value'):
            return status.value
        return str(status)

    # Only LLM-call events contribute their token totals to the sum.
    llm_tokens = case(
        (PipelineEvent.event_type == "llm_call", PipelineEvent.total_tokens),
        else_=0,
    )
    per_video = (
        select(
            PipelineEvent.video_id,
            func.count().label("event_count"),
            func.sum(llm_tokens).label("total_tokens_used"),
            func.max(PipelineEvent.created_at).label("last_event_at"),
        )
        .group_by(PipelineEvent.video_id)
        .subquery()
    )

    listing = (
        select(
            SourceVideo.id,
            SourceVideo.filename,
            SourceVideo.processing_status,
            SourceVideo.content_hash,
            SourceVideo.created_at,
            SourceVideo.updated_at,
            Creator.name.label("creator_name"),
            per_video.c.event_count,
            per_video.c.total_tokens_used,
            per_video.c.last_event_at,
        )
        .join(Creator, SourceVideo.creator_id == Creator.id)
        # Outer join so videos with no events still appear.
        .outerjoin(per_video, SourceVideo.id == per_video.c.video_id)
        .order_by(SourceVideo.updated_at.desc())
    )

    rows = (await db.execute(listing)).all()

    items = []
    for row in rows:
        items.append(
            {
                "id": str(row.id),
                "filename": row.filename,
                "processing_status": status_text(row.processing_status),
                "content_hash": row.content_hash,
                "creator_name": row.creator_name,
                "created_at": iso_or_none(row.created_at),
                "updated_at": iso_or_none(row.updated_at),
                "event_count": row.event_count or 0,
                "total_tokens_used": row.total_tokens_used or 0,
                "last_event_at": iso_or_none(row.last_event_at),
            }
        )

    return {"items": items, "total": len(rows)}
|
|
|
|
|
|
# ── Admin: Retrigger ─────────────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/trigger/{video_id}")
async def admin_trigger_pipeline(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Retrigger the pipeline from the admin prefix.

    Delegates directly to ``trigger_pipeline`` and returns its response, so
    both routes share the same lookup, dispatch, and error behavior.
    """
    outcome = await trigger_pipeline(video_id=video_id, db=db)
    return outcome
|
|
|
|
|
|
# ── Admin: Revoke ────────────────────────────────────────────────────────────
|
|
|
|
def _revoke_active_tasks(celery_app, video_id: str) -> int:
    """Revoke active Celery tasks whose first positional argument is ``video_id``.

    This is synchronous: the inspect and revoke calls talk to the broker, so
    it is meant to be run off the event loop.

    Args:
        celery_app: The Celery application whose ``control`` API is used.
        video_id: Video id to match against each active task's first argument
            (compared as strings).

    Returns:
        The number of tasks a revoke was issued for.
    """
    # ``active()`` yields a falsy value when there is nothing to report.
    active = celery_app.control.inspect().active() or {}
    revoked_count = 0

    for tasks in active.values():
        for task in tasks:
            task_args = task.get("args", [])
            if task_args and str(task_args[0]) == video_id:
                celery_app.control.revoke(task["id"], terminate=True)
                revoked_count += 1
                logger.info("Revoked task %s for video_id=%s", task["id"], video_id)

    return revoked_count


@router.post("/admin/pipeline/revoke/{video_id}")
async def revoke_pipeline(video_id: str):
    """Revoke/cancel active Celery tasks for a video.

    Uses Celery's revoke with ``terminate=True`` to kill running tasks.
    This is best-effort — the task may have already completed. Only tasks
    reported as *active* are considered.

    Returns:
        A dict with ``status`` ("revoked" or "no_active_tasks"), the
        ``video_id``, and ``tasks_revoked``.

    Raises:
        HTTPException: 503 when inspecting or revoking raises.
    """
    import asyncio

    from worker import celery_app

    try:
        # The Celery control calls block while waiting on the broker; running
        # them in a worker thread keeps the event loop free for other requests.
        revoked_count = await asyncio.to_thread(
            _revoke_active_tasks, celery_app, video_id
        )
    except Exception as exc:
        logger.warning("Failed to revoke tasks for video_id=%s: %s", video_id, exc)
        raise HTTPException(
            status_code=503,
            detail="Failed to communicate with Celery worker",
        ) from exc

    return {
        "status": "revoked" if revoked_count > 0 else "no_active_tasks",
        "video_id": video_id,
        "tasks_revoked": revoked_count,
    }
|
|
|
|
|
|
# ── Admin: Event log ─────────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/events/{video_id}")
async def list_pipeline_events(
    video_id: str,
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=200)] = 100,
    stage: Annotated[str | None, Query(description="Filter by stage name")] = None,
    event_type: Annotated[str | None, Query(description="Filter by event type")] = None,
    db: AsyncSession = Depends(get_session),
):
    """Return one page of a video's pipeline events, most recent first.

    ``stage`` and ``event_type`` narrow the result when given (empty values
    are ignored). ``total`` counts every event matching the filters, not just
    the returned page; ``offset`` and ``limit`` are echoed back.
    """
    filtered = select(PipelineEvent).where(PipelineEvent.video_id == video_id)

    optional_filters = (
        (PipelineEvent.stage, stage),
        (PipelineEvent.event_type, event_type),
    )
    for column, wanted in optional_filters:
        if wanted:
            filtered = filtered.where(column == wanted)

    # Count against the filtered set before ordering/pagination is applied.
    count_query = select(func.count()).select_from(filtered.subquery())
    total = (await db.execute(count_query)).scalar() or 0

    page_query = (
        filtered.order_by(PipelineEvent.created_at.desc())
        .offset(offset)
        .limit(limit)
    )
    events = (await db.execute(page_query)).scalars().all()

    # Attributes copied into the response unchanged, in output order.
    passthrough = (
        "stage",
        "event_type",
        "prompt_tokens",
        "completion_tokens",
        "total_tokens",
        "model",
        "duration_ms",
        "payload",
    )

    items = []
    for event in events:
        entry = {"id": str(event.id), "video_id": str(event.video_id)}
        for field_name in passthrough:
            entry[field_name] = getattr(event, field_name)
        entry["created_at"] = event.created_at.isoformat() if event.created_at else None
        items.append(entry)

    return {
        "items": items,
        "total": total,
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
|
|
# ── Admin: Worker status ─────────────────────────────────────────────────────
|
|
|
|
def _inspect_workers(celery_app) -> list[dict]:
    """Collect a per-worker summary from Celery's inspect API.

    This is synchronous: each inspect call waits on the broker, so it is
    meant to be run off the event loop.

    Args:
        celery_app: The Celery application whose ``control.inspect()`` is used.

    Returns:
        One dict per worker seen in the active, reserved, or stats replies,
        sorted by worker name.
    """
    # Per-stage counters summed into ``total_completed``.
    # NOTE(review): these keys must match the registered task names exactly —
    # confirm the "tasks." prefix against the worker's task registry.
    stage_task_names = (
        "tasks.pipeline.stages.stage2_segmentation",
        "tasks.pipeline.stages.stage3_extraction",
        "tasks.pipeline.stages.stage4_classification",
        "tasks.pipeline.stages.stage5_synthesis",
    )

    inspector = celery_app.control.inspect()
    # Each call yields a falsy value when there is nothing to report.
    active = inspector.active() or {}
    reserved = inspector.reserved() or {}
    stats = inspector.stats() or {}

    workers = []
    # Union of names so a worker present in only one reply is still listed;
    # sorted for a stable response order.
    for worker_name in sorted(active.keys() | reserved.keys() | stats.keys()):
        worker_active = active.get(worker_name, [])
        worker_reserved = reserved.get(worker_name, [])
        worker_stats = stats.get(worker_name, {})

        totals = worker_stats.get("total", {})
        pool = worker_stats.get("pool")

        workers.append({
            "name": worker_name,
            "active_tasks": [
                {
                    "id": t.get("id"),
                    "name": t.get("name"),
                    "args": t.get("args", []),
                    "time_start": t.get("time_start"),
                }
                for t in worker_active
            ],
            "reserved_tasks": len(worker_reserved),
            "total_completed": sum(totals.get(name, 0) for name in stage_task_names),
            # NOTE(review): this reports the stats "clock" entry under the
            # "uptime" key — confirm that is the intended field.
            "uptime": worker_stats.get("clock", None),
            "pool_size": pool.get("max-concurrency") if isinstance(pool, dict) else None,
        })

    return workers


@router.get("/admin/pipeline/worker-status")
async def worker_status():
    """Get current Celery worker status — active, reserved, and stats.

    Returns:
        ``{"online": bool, "workers": [...]}``. When inspection raises, the
        failure is logged and ``{"online": False, "workers": [], "error": str}``
        is returned instead of an HTTP error.
    """
    import asyncio

    from worker import celery_app

    try:
        # The inspect calls block while waiting for worker replies; running
        # them in a worker thread keeps the event loop free for other requests.
        workers = await asyncio.to_thread(_inspect_workers, celery_app)
    except Exception as exc:
        logger.warning("Failed to inspect Celery workers: %s", exc)
        return {
            "online": False,
            "workers": [],
            "error": str(exc),
        }

    return {
        "online": bool(workers),
        "workers": workers,
    }
|