feat: Wired stage_highlight_detection Celery task with bulk upsert, 4 a…

- "backend/pipeline/stages.py"
- "backend/routers/highlights.py"
- "backend/main.py"

GSD-Task: S04/T03
This commit is contained in:
jlightner 2026-04-04 05:36:10 +00:00
parent 91cdc5e0b1
commit a2372788d5
3 changed files with 240 additions and 1 deletions

View file

@ -12,7 +12,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import get_settings
from routers import admin, auth, chat, consent, creator_dashboard, creators, health, ingest, pipeline, reports, search, stats, techniques, topics, videos
from routers import admin, auth, chat, consent, creator_dashboard, creators, health, highlights, ingest, pipeline, reports, search, stats, techniques, topics, videos
def _setup_logging() -> None:
@ -84,6 +84,7 @@ app.include_router(chat.router, prefix="/api/v1")
app.include_router(consent.router, prefix="/api/v1")
app.include_router(creator_dashboard.router, prefix="/api/v1")
app.include_router(creators.router, prefix="/api/v1")
app.include_router(highlights.router, prefix="/api/v1")
app.include_router(ingest.router, prefix="/api/v1")
app.include_router(pipeline.router, prefix="/api/v1")
app.include_router(reports.router, prefix="/api/v1")

View file

@ -29,6 +29,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert
from models import (
Creator,
HighlightCandidate,
KeyMoment,
KeyMomentContentType,
PipelineEvent,
@ -2435,3 +2436,109 @@ def fetch_creator_avatar(creator_id: str) -> dict:
return {"status": "error", "detail": str(exc)}
finally:
session.close()
# ── Highlight Detection ──────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str:
"""Score all KeyMoments for a video and upsert HighlightCandidates.
For each KeyMoment belonging to the video, runs the heuristic scorer and
bulk-upserts results into highlight_candidates (INSERT ON CONFLICT UPDATE).
Returns the video_id for chain compatibility.
"""
from pipeline.highlight_scorer import score_moment
start = time.monotonic()
logger.info("Highlight detection starting for video_id=%s", video_id)
_emit_event(video_id, "highlight_detection", "start", run_id=run_id)
session = _get_sync_session()
try:
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
if not moments:
logger.info(
"Highlight detection: No key moments for video_id=%s, skipping.", video_id,
)
_emit_event(
video_id, "highlight_detection", "complete",
run_id=run_id, payload={"candidates": 0},
)
return video_id
candidate_count = 0
for moment in moments:
try:
result = score_moment(
start_time=moment.start_time,
end_time=moment.end_time,
content_type=moment.content_type.value if moment.content_type else None,
summary=moment.summary,
plugins=moment.plugins,
raw_transcript=moment.raw_transcript,
source_quality=None, # filled below if technique_page loaded
video_content_type=None, # filled below if source_video loaded
)
except Exception as score_exc:
logger.warning(
"Highlight detection: score_moment failed for moment %s: %s",
moment.id, score_exc,
)
result = {
"score": 0.0,
"score_breakdown": {},
"duration_secs": max(0.0, moment.end_time - moment.start_time),
}
stmt = pg_insert(HighlightCandidate).values(
key_moment_id=moment.id,
source_video_id=moment.source_video_id,
score=result["score"],
score_breakdown=result["score_breakdown"],
duration_secs=result["duration_secs"],
)
stmt = stmt.on_conflict_do_update(
constraint="uq_highlight_candidate_moment",
set_={
"score": stmt.excluded.score,
"score_breakdown": stmt.excluded.score_breakdown,
"duration_secs": stmt.excluded.duration_secs,
"updated_at": func.now(),
},
)
session.execute(stmt)
candidate_count += 1
session.commit()
elapsed = time.monotonic() - start
_emit_event(
video_id, "highlight_detection", "complete",
run_id=run_id, payload={"candidates": candidate_count},
)
logger.info(
"Highlight detection completed for video_id=%s in %.1fs — %d candidates upserted",
video_id, elapsed, candidate_count,
)
return video_id
except Exception as exc:
session.rollback()
_emit_event(
video_id, "highlight_detection", "error",
run_id=run_id, payload={"error": str(exc)},
)
logger.error("Highlight detection failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
session.close()

View file

@ -0,0 +1,131 @@
"""Highlight detection admin endpoints.
Trigger scoring, list candidates, and view score breakdowns.
"""
from __future__ import annotations
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from database import get_session
from models import HighlightCandidate, KeyMoment, ProcessingStatus, SourceVideo
from pipeline.highlight_schemas import HighlightCandidateResponse
logger = logging.getLogger("chrysopedia.highlights")
router = APIRouter(prefix="/admin/highlights", tags=["highlights"])
# ── Trigger endpoints ────────────────────────────────────────────────────────
@router.post("/detect/{video_id}")
async def detect_highlights(
video_id: str,
db: AsyncSession = Depends(get_session),
):
"""Dispatch highlight detection for a single video."""
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
from pipeline.stages import stage_highlight_detection
try:
task = stage_highlight_detection.delay(str(video.id))
logger.info("Highlight detection dispatched for video_id=%s task_id=%s", video_id, task.id)
except Exception as exc:
logger.warning("Failed to dispatch highlight detection for video_id=%s: %s", video_id, exc)
raise HTTPException(
status_code=503,
detail="Highlight detection dispatch failed — Celery/Redis may be unavailable",
) from exc
return {
"status": "dispatched",
"video_id": str(video.id),
"task_id": task.id,
}
@router.post("/detect-all")
async def detect_all_highlights(
db: AsyncSession = Depends(get_session),
):
"""Dispatch highlight detection for all completed videos."""
stmt = (
select(SourceVideo)
.where(SourceVideo.processing_status == ProcessingStatus.complete)
)
result = await db.execute(stmt)
videos = result.scalars().all()
from pipeline.stages import stage_highlight_detection
dispatched = 0
errors = 0
for video in videos:
try:
stage_highlight_detection.delay(str(video.id))
dispatched += 1
except Exception as exc:
logger.warning(
"Failed to dispatch highlight detection for video_id=%s: %s",
video.id, exc,
)
errors += 1
return {
"status": "dispatched",
"videos_dispatched": dispatched,
"errors": errors,
}
# ── Query endpoints ──────────────────────────────────────────────────────────
@router.get("/candidates", response_model=list[HighlightCandidateResponse])
async def list_candidates(
db: AsyncSession = Depends(get_session),
skip: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
):
"""List highlight candidates sorted by score descending, with pagination."""
stmt = (
select(HighlightCandidate)
.options(joinedload(HighlightCandidate.key_moment))
.order_by(HighlightCandidate.score.desc())
.offset(skip)
.limit(limit)
)
result = await db.execute(stmt)
candidates = result.scalars().unique().all()
return candidates
@router.get("/candidates/{candidate_id}", response_model=HighlightCandidateResponse)
async def get_candidate(
candidate_id: str,
db: AsyncSession = Depends(get_session),
):
"""Get a single highlight candidate by ID with full score breakdown."""
stmt = (
select(HighlightCandidate)
.options(joinedload(HighlightCandidate.key_moment))
.where(HighlightCandidate.id == candidate_id)
)
result = await db.execute(stmt)
candidate = result.scalar_one_or_none()
if candidate is None:
raise HTTPException(status_code=404, detail=f"Candidate not found: {candidate_id}")
return candidate