chrysopedia/backend/routers/highlights.py
jlightner 6f12d5a240 feat: Wired stage_highlight_detection Celery task with bulk upsert, 4 a…
- "backend/pipeline/stages.py"
- "backend/routers/highlights.py"
- "backend/main.py"

GSD-Task: S04/T03
2026-04-04 05:36:10 +00:00

131 lines
4.3 KiB
Python

"""Highlight detection admin endpoints.
Trigger scoring, list candidates, and view score breakdowns.
"""
from __future__ import annotations
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from database import get_session
from models import HighlightCandidate, KeyMoment, ProcessingStatus, SourceVideo
from pipeline.highlight_schemas import HighlightCandidateResponse
# Module logger; every message from these endpoints is emitted under the
# "chrysopedia.highlights" logger name.
logger = logging.getLogger("chrysopedia.highlights")
# Router for the highlight admin API: every route registered on it is served
# under the /admin/highlights prefix and carries the "highlights" tag.
router = APIRouter(prefix="/admin/highlights", tags=["highlights"])
# ── Trigger endpoints ────────────────────────────────────────────────────────
@router.post("/detect/{video_id}")
async def detect_highlights(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Queue a highlight-detection run for one source video.

    The video is looked up by ID first; its ID is then handed to the
    ``stage_highlight_detection`` task via ``.delay()``.

    Args:
        video_id: ID of the source video, taken from the URL path.
        db: Async database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``status`` (always ``"dispatched"``), the ``video_id``
        and the ``task_id`` of the queued task.

    Raises:
        HTTPException: 404 when no video matches ``video_id``; 503 when
            queuing the task raises for any reason.
    """
    lookup = await db.execute(
        select(SourceVideo).where(SourceVideo.id == video_id)
    )
    source_video = lookup.scalar_one_or_none()
    if source_video is None:
        raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")

    # Imported at call time rather than at module load.
    # NOTE(review): presumably avoids an import cycle with the pipeline — confirm.
    from pipeline.stages import stage_highlight_detection

    try:
        queued = stage_highlight_detection.delay(str(source_video.id))
        logger.info("Highlight detection dispatched for video_id=%s task_id=%s", video_id, queued.id)
    except Exception as exc:
        # Any failure while queuing is reported to the client as a 503.
        logger.warning("Failed to dispatch highlight detection for video_id=%s: %s", video_id, exc)
        raise HTTPException(
            status_code=503,
            detail="Highlight detection dispatch failed — Celery/Redis may be unavailable",
        ) from exc

    payload = {
        "status": "dispatched",
        "video_id": str(source_video.id),
        "task_id": queued.id,
    }
    return payload
@router.post("/detect-all")
async def detect_all_highlights(
    db: AsyncSession = Depends(get_session),
):
    """Dispatch highlight detection for all completed videos.

    One ``stage_highlight_detection`` task is queued per video whose
    ``processing_status`` is ``ProcessingStatus.complete``. A failure to
    queue one video is logged and counted; it does not abort the loop.

    Args:
        db: Async database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``status`` (always ``"dispatched"``),
        ``videos_dispatched`` (tasks queued successfully) and ``errors``
        (videos whose dispatch raised).
    """
    # Only the ID is needed to queue a task, so fetch just that column
    # instead of loading a full SourceVideo entity per row.
    stmt = select(SourceVideo.id).where(
        SourceVideo.processing_status == ProcessingStatus.complete
    )
    result = await db.execute(stmt)
    video_ids = result.scalars().all()

    # Imported at call time rather than at module load.
    # NOTE(review): presumably avoids an import cycle with the pipeline — confirm.
    from pipeline.stages import stage_highlight_detection

    dispatched = 0
    errors = 0
    for video_id in video_ids:
        try:
            stage_highlight_detection.delay(str(video_id))
        except Exception as exc:
            # Best-effort: record the failure and continue with the rest.
            logger.warning(
                "Failed to dispatch highlight detection for video_id=%s: %s",
                video_id, exc,
            )
            errors += 1
        else:
            dispatched += 1

    logger.info(
        "Highlight detection bulk dispatch finished: dispatched=%s errors=%s",
        dispatched, errors,
    )
    return {
        "status": "dispatched",
        "videos_dispatched": dispatched,
        "errors": errors,
    }
# ── Query endpoints ──────────────────────────────────────────────────────────
@router.get("/candidates", response_model=list[HighlightCandidateResponse])
async def list_candidates(
    db: AsyncSession = Depends(get_session),
    skip: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=200)] = 50,
):
    """List highlight candidates sorted by score descending, with pagination.

    Args:
        db: Async database session supplied by the ``get_session`` dependency.
        skip: Number of rows to skip (>= 0).
        limit: Maximum number of rows to return (1-200, default 50).

    Returns:
        The requested page of ``HighlightCandidate`` rows, each with its
        ``key_moment`` relationship eagerly loaded.
    """
    stmt = (
        select(HighlightCandidate)
        .options(joinedload(HighlightCandidate.key_moment))
        # Score alone is not unique, and OFFSET/LIMIT over a non-unique
        # ordering may repeat or skip tied rows between pages. The ID
        # tiebreaker makes the ordering total, so pages are stable.
        .order_by(HighlightCandidate.score.desc(), HighlightCandidate.id)
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    candidates = result.scalars().unique().all()
    return candidates
@router.get("/candidates/{candidate_id}", response_model=HighlightCandidateResponse)
async def get_candidate(
    candidate_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Fetch one highlight candidate by its ID.

    Args:
        candidate_id: ID of the candidate, taken from the URL path.
        db: Async database session supplied by the ``get_session`` dependency.

    Returns:
        The matching ``HighlightCandidate`` with its ``key_moment``
        relationship eagerly loaded, serialized through
        ``HighlightCandidateResponse``.

    Raises:
        HTTPException: 404 when no candidate has that ID.
    """
    query = (
        select(HighlightCandidate)
        .where(HighlightCandidate.id == candidate_id)
        .options(joinedload(HighlightCandidate.key_moment))
    )
    found = (await db.execute(query)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Candidate not found: {candidate_id}")