diff --git a/backend/main.py b/backend/main.py index fe6d30a..6141af8 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,7 +12,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from config import get_settings -from routers import admin, auth, chat, consent, creator_dashboard, creators, health, ingest, pipeline, reports, search, stats, techniques, topics, videos +from routers import admin, auth, chat, consent, creator_dashboard, creators, health, highlights, ingest, pipeline, reports, search, stats, techniques, topics, videos def _setup_logging() -> None: @@ -84,6 +84,7 @@ app.include_router(chat.router, prefix="/api/v1") app.include_router(consent.router, prefix="/api/v1") app.include_router(creator_dashboard.router, prefix="/api/v1") app.include_router(creators.router, prefix="/api/v1") +app.include_router(highlights.router, prefix="/api/v1") app.include_router(ingest.router, prefix="/api/v1") app.include_router(pipeline.router, prefix="/api/v1") app.include_router(reports.router, prefix="/api/v1") diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index b026259..0add68a 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -29,6 +29,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert from models import ( Creator, + HighlightCandidate, KeyMoment, KeyMomentContentType, PipelineEvent, @@ -2435,3 +2436,109 @@ def fetch_creator_avatar(creator_id: str) -> dict: return {"status": "error", "detail": str(exc)} finally: session.close() + + +# ── Highlight Detection ────────────────────────────────────────────────────── + +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str: + """Score all KeyMoments for a video and upsert HighlightCandidates. + + For each KeyMoment belonging to the video, runs the heuristic scorer and + bulk-upserts results into highlight_candidates (INSERT ON CONFLICT UPDATE). + + Returns the video_id for chain compatibility. + """ + from pipeline.highlight_scorer import score_moment + + start = time.monotonic() + logger.info("Highlight detection starting for video_id=%s", video_id) + _emit_event(video_id, "highlight_detection", "start", run_id=run_id) + + session = _get_sync_session() + try: + moments = ( + session.execute( + select(KeyMoment) + .where(KeyMoment.source_video_id == video_id) + .order_by(KeyMoment.start_time) + ) + .scalars() + .all() + ) + + if not moments: + logger.info( + "Highlight detection: No key moments for video_id=%s, skipping.", video_id, + ) + _emit_event( + video_id, "highlight_detection", "complete", + run_id=run_id, payload={"candidates": 0}, + ) + return video_id + + candidate_count = 0 + for moment in moments: + try: + result = score_moment( + start_time=moment.start_time, + end_time=moment.end_time, + content_type=moment.content_type.value if moment.content_type else None, + summary=moment.summary, + plugins=moment.plugins, + raw_transcript=moment.raw_transcript, + source_quality=None, # filled below if technique_page loaded + video_content_type=None, # filled below if source_video loaded + ) + except Exception as score_exc: + logger.warning( + "Highlight detection: score_moment failed for moment %s: %s", + moment.id, score_exc, + ) + result = { + "score": 0.0, + "score_breakdown": {}, + "duration_secs": max(0.0, moment.end_time - moment.start_time), + } + + stmt = pg_insert(HighlightCandidate).values( + key_moment_id=moment.id, + source_video_id=moment.source_video_id, + score=result["score"], + score_breakdown=result["score_breakdown"], + duration_secs=result["duration_secs"], + ) + stmt = stmt.on_conflict_do_update( + constraint="uq_highlight_candidate_moment", + set_={ + "score": stmt.excluded.score, + "score_breakdown": stmt.excluded.score_breakdown, + "duration_secs": stmt.excluded.duration_secs, + "updated_at": func.now(), + }, + ) + session.execute(stmt) + candidate_count += 1 + + session.commit() + elapsed = time.monotonic() - start + _emit_event( + video_id, "highlight_detection", "complete", + run_id=run_id, payload={"candidates": candidate_count}, + ) + logger.info( + "Highlight detection completed for video_id=%s in %.1fs — %d candidates upserted", + video_id, elapsed, candidate_count, + ) + return video_id + + except Exception as exc: + session.rollback() + _emit_event( + video_id, "highlight_detection", "error", + run_id=run_id, payload={"error": str(exc)}, + ) + logger.error("Highlight detection failed for video_id=%s: %s", video_id, exc) + raise self.retry(exc=exc) + finally: + session.close() diff --git a/backend/routers/highlights.py b/backend/routers/highlights.py new file mode 100644 index 0000000..7d6e311 --- /dev/null +++ b/backend/routers/highlights.py @@ -0,0 +1,131 @@ +"""Highlight detection admin endpoints. + +Trigger scoring, list candidates, and view score breakdowns. +""" + +from __future__ import annotations + +import logging +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from database import get_session +from models import HighlightCandidate, KeyMoment, ProcessingStatus, SourceVideo +from pipeline.highlight_schemas import HighlightCandidateResponse + +logger = logging.getLogger("chrysopedia.highlights") + +router = APIRouter(prefix="/admin/highlights", tags=["highlights"]) + + +# ── Trigger endpoints ──────────────────────────────────────────────────────── + +@router.post("/detect/{video_id}") +async def detect_highlights( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """Dispatch highlight detection for a single video.""" + stmt = select(SourceVideo).where(SourceVideo.id == video_id) + result = await db.execute(stmt) + video = result.scalar_one_or_none() + + if video is None: + raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") + + from pipeline.stages import stage_highlight_detection + + try: + task = stage_highlight_detection.delay(str(video.id)) + logger.info("Highlight detection dispatched for video_id=%s task_id=%s", video_id, task.id) + except Exception as exc: + logger.warning("Failed to dispatch highlight detection for video_id=%s: %s", video_id, exc) + raise HTTPException( + status_code=503, + detail="Highlight detection dispatch failed — Celery/Redis may be unavailable", + ) from exc + + return { + "status": "dispatched", + "video_id": str(video.id), + "task_id": task.id, + } + + +@router.post("/detect-all") +async def detect_all_highlights( + db: AsyncSession = Depends(get_session), +): + """Dispatch highlight detection for all completed videos.""" + stmt = ( + select(SourceVideo) + .where(SourceVideo.processing_status == ProcessingStatus.complete) + ) + result = await db.execute(stmt) + videos = result.scalars().all() + + from pipeline.stages import stage_highlight_detection + + dispatched = 0 + errors = 0 + for video in videos: + try: + stage_highlight_detection.delay(str(video.id)) + dispatched += 1 + except Exception as exc: + logger.warning( + "Failed to dispatch highlight detection for video_id=%s: %s", + video.id, exc, + ) + errors += 1 + + return { + "status": "dispatched", + "videos_dispatched": dispatched, + "errors": errors, + } + + +# ── Query endpoints ────────────────────────────────────────────────────────── + +@router.get("/candidates", response_model=list[HighlightCandidateResponse]) +async def list_candidates( + db: AsyncSession = Depends(get_session), + skip: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(ge=1, le=200)] = 50, +): + """List highlight candidates sorted by score descending, with pagination.""" + stmt = ( + select(HighlightCandidate) + .options(joinedload(HighlightCandidate.key_moment)) + .order_by(HighlightCandidate.score.desc()) + .offset(skip) + .limit(limit) + ) + result = await db.execute(stmt) + candidates = result.scalars().unique().all() + return candidates + + +@router.get("/candidates/{candidate_id}", response_model=HighlightCandidateResponse) +async def get_candidate( + candidate_id: str, + db: AsyncSession = Depends(get_session), +): + """Get a single highlight candidate by ID with full score breakdown.""" + stmt = ( + select(HighlightCandidate) + .options(joinedload(HighlightCandidate.key_moment)) + .where(HighlightCandidate.id == candidate_id) + ) + result = await db.execute(stmt) + candidate = result.scalar_one_or_none() + + if candidate is None: + raise HTTPException(status_code=404, detail=f"Candidate not found: {candidate_id}") + + return candidate