From 6f12d5a2408ceddcb2c4dfe5e2a24af5e473840d Mon Sep 17 00:00:00 2001 From: jlightner Date: Sat, 4 Apr 2026 05:36:10 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Wired=20stage=5Fhighlight=5Fdetection?= =?UTF-8?q?=20Celery=20task=20with=20bulk=20upsert,=204=20a=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/pipeline/stages.py" - "backend/routers/highlights.py" - "backend/main.py" GSD-Task: S04/T03 --- .gsd/milestones/M021/slices/S04/S04-PLAN.md | 2 +- .../M021/slices/S04/tasks/T02-VERIFY.json | 16 +++ .../M021/slices/S04/tasks/T03-SUMMARY.md | 82 +++++++++++ backend/main.py | 3 +- backend/pipeline/stages.py | 107 ++++++++++++++ backend/routers/highlights.py | 131 ++++++++++++++++++ 6 files changed, 339 insertions(+), 2 deletions(-) create mode 100644 .gsd/milestones/M021/slices/S04/tasks/T02-VERIFY.json create mode 100644 .gsd/milestones/M021/slices/S04/tasks/T03-SUMMARY.md create mode 100644 backend/routers/highlights.py diff --git a/.gsd/milestones/M021/slices/S04/S04-PLAN.md b/.gsd/milestones/M021/slices/S04/S04-PLAN.md index c780cd6..c68c358 100644 --- a/.gsd/milestones/M021/slices/S04/S04-PLAN.md +++ b/.gsd/milestones/M021/slices/S04/S04-PLAN.md @@ -67,7 +67,7 @@ - Estimate: 45m - Files: backend/pipeline/highlight_scorer.py, backend/pipeline/test_highlight_scorer.py - Verify: python -m pytest backend/pipeline/test_highlight_scorer.py -v -- [ ] **T03: Wire Celery task, admin API endpoints, and router registration** — Connect the scoring engine and DB model into the runtime: a Celery task that processes all KeyMoments for a video and bulk-upserts candidates, admin API endpoints for triggering detection and listing results, and router registration in main.py. 
+- [x] **T03: Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py** — Connect the scoring engine and DB model into the runtime: a Celery task that processes all KeyMoments for a video and bulk-upserts candidates, admin API endpoints for triggering detection and listing results, and router registration in main.py. ## Steps diff --git a/.gsd/milestones/M021/slices/S04/tasks/T02-VERIFY.json b/.gsd/milestones/M021/slices/S04/tasks/T02-VERIFY.json new file mode 100644 index 0000000..86b027c --- /dev/null +++ b/.gsd/milestones/M021/slices/S04/tasks/T02-VERIFY.json @@ -0,0 +1,16 @@ +{ + "schemaVersion": 1, + "taskId": "T02", + "unitId": "M021/S04/T02", + "timestamp": 1775280784107, + "passed": true, + "discoverySource": "task-plan", + "checks": [ + { + "command": "python -m pytest backend/pipeline/test_highlight_scorer.py -v", + "exitCode": 0, + "durationMs": 265, + "verdict": "pass" + } + ] +} diff --git a/.gsd/milestones/M021/slices/S04/tasks/T03-SUMMARY.md b/.gsd/milestones/M021/slices/S04/tasks/T03-SUMMARY.md new file mode 100644 index 0000000..a058cf5 --- /dev/null +++ b/.gsd/milestones/M021/slices/S04/tasks/T03-SUMMARY.md @@ -0,0 +1,82 @@ +--- +id: T03 +parent: S04 +milestone: M021 +provides: [] +requires: [] +affects: [] +key_files: ["backend/pipeline/stages.py", "backend/routers/highlights.py", "backend/main.py"] +key_decisions: ["Lazy import of score_moment inside Celery task to avoid circular imports at module load", "Upsert uses named constraint uq_highlight_candidate_moment for ON CONFLICT targeting"] +patterns_established: [] +drill_down_paths: [] +observability_surfaces: [] +duration: "" +verification_result: "All 3 task verification commands and 3 slice-level verification commands exit 0. All 28 existing scorer unit tests still pass." 
+completed_at: 2026-04-04T05:36:07.845Z +blocker_discovered: false +--- + +# T03: Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py + +> Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py + +## What Happened +--- +id: T03 +parent: S04 +milestone: M021 +key_files: + - backend/pipeline/stages.py + - backend/routers/highlights.py + - backend/main.py +key_decisions: + - Lazy import of score_moment inside Celery task to avoid circular imports at module load + - Upsert uses named constraint uq_highlight_candidate_moment for ON CONFLICT targeting +duration: "" +verification_result: passed +completed_at: 2026-04-04T05:36:07.846Z +blocker_discovered: false +--- + +# T03: Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py + +**Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py** + +## What Happened + +Added stage_highlight_detection Celery task to stages.py following existing patterns (bind=True, max_retries=3, _get_sync_session, _emit_event for start/complete/error, try/except/retry/finally). Task loads KeyMoments for a video, scores each via score_moment(), and bulk-upserts into highlight_candidates using INSERT ON CONFLICT DO UPDATE on uq_highlight_candidate_moment. Created backend/routers/highlights.py with 4 endpoints: POST detect/{video_id}, POST detect-all, GET candidates (paginated, score desc), GET candidates/{id} (404 handling). Registered in main.py alphabetically. + +## Verification + +All 3 task verification commands and 3 slice-level verification commands exit 0. All 28 existing scorer unit tests still pass. 
+ +## Verification Evidence + +| # | Command | Exit Code | Verdict | Duration | +|---|---------|-----------|---------|----------| +| 1 | `PYTHONPATH=backend python -c "from backend.pipeline.stages import stage_highlight_detection; print('OK')"` | 0 | ✅ pass | 500ms | +| 2 | `PYTHONPATH=backend python -c "from backend.routers.highlights import router; print('OK')"` | 0 | ✅ pass | 400ms | +| 3 | `PYTHONPATH=backend python -c "from backend.main import app; routes = [r.path for r in app.routes]; assert any('highlights' in r for r in routes); print('Router registered')"` | 0 | ✅ pass | 500ms | +| 4 | `python -m pytest backend/pipeline/test_highlight_scorer.py -q` | 0 | ✅ pass | 20ms | + + +## Deviations + +None. + +## Known Issues + +None. + +## Files Created/Modified + +- `backend/pipeline/stages.py` +- `backend/routers/highlights.py` +- `backend/main.py` + + +## Deviations +None. + +## Known Issues +None. diff --git a/backend/main.py b/backend/main.py index fe6d30a..6141af8 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,7 +12,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from config import get_settings -from routers import admin, auth, chat, consent, creator_dashboard, creators, health, ingest, pipeline, reports, search, stats, techniques, topics, videos +from routers import admin, auth, chat, consent, creator_dashboard, creators, health, highlights, ingest, pipeline, reports, search, stats, techniques, topics, videos def _setup_logging() -> None: @@ -84,6 +84,7 @@ app.include_router(chat.router, prefix="/api/v1") app.include_router(consent.router, prefix="/api/v1") app.include_router(creator_dashboard.router, prefix="/api/v1") app.include_router(creators.router, prefix="/api/v1") +app.include_router(highlights.router, prefix="/api/v1") app.include_router(ingest.router, prefix="/api/v1") app.include_router(pipeline.router, prefix="/api/v1") app.include_router(reports.router, prefix="/api/v1") diff --git 
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str:
    """Score a video's KeyMoments and upsert one HighlightCandidate per moment.

    Loads every KeyMoment whose ``source_video_id`` equals ``video_id``
    (ordered by start time), scores each one with the heuristic scorer, and
    issues one ``INSERT ... ON CONFLICT DO UPDATE`` per moment. All upserts
    are committed together at the end.

    A moment whose scoring raises is not skipped: it is still upserted, with
    a score of 0.0, an empty breakdown and a duration derived from its own
    timestamps.

    Pipeline events are emitted under the ``highlight_detection`` stage:
    ``start`` first, then either ``complete`` (with the candidate count) or
    ``error`` (with the exception text).

    Args:
        video_id: ID of the source video whose moments are scored.
        run_id: Optional run identifier forwarded to every emitted event.

    Returns:
        ``video_id`` unchanged, for chain compatibility.

    Raises:
        Whatever ``self.retry`` raises. Any failure outside per-moment
        scoring rolls the transaction back and is handed to ``self.retry``.
    """
    # Imported here rather than at module load to avoid a circular import.
    from pipeline.highlight_scorer import score_moment

    started_at = time.monotonic()
    logger.info("Highlight detection starting for video_id=%s", video_id)
    _emit_event(video_id, "highlight_detection", "start", run_id=run_id)

    db = _get_sync_session()
    try:
        moments_query = (
            select(KeyMoment)
            .where(KeyMoment.source_video_id == video_id)
            .order_by(KeyMoment.start_time)
        )
        key_moments = db.execute(moments_query).scalars().all()

        # Nothing to score: report an empty but successful run.
        if not key_moments:
            logger.info(
                "Highlight detection: No key moments for video_id=%s, skipping.",
                video_id,
            )
            _emit_event(
                video_id,
                "highlight_detection",
                "complete",
                run_id=run_id,
                payload={"candidates": 0},
            )
            return video_id

        upserted = 0
        for km in key_moments:
            try:
                scored = score_moment(
                    start_time=km.start_time,
                    end_time=km.end_time,
                    content_type=km.content_type.value if km.content_type else None,
                    summary=km.summary,
                    plugins=km.plugins,
                    raw_transcript=km.raw_transcript,
                    # Neither the technique page nor the source video is
                    # loaded by this task, so both are passed as None.
                    source_quality=None,
                    video_content_type=None,
                )
            except Exception as score_exc:
                # Best effort: one unscorable moment must not fail the video.
                logger.warning(
                    "Highlight detection: score_moment failed for moment %s: %s",
                    km.id,
                    score_exc,
                )
                scored = {
                    "score": 0.0,
                    "score_breakdown": {},
                    "duration_secs": max(0.0, km.end_time - km.start_time),
                }

            row = {
                "key_moment_id": km.id,
                "source_video_id": km.source_video_id,
                "score": scored["score"],
                "score_breakdown": scored["score_breakdown"],
                "duration_secs": scored["duration_secs"],
            }
            insert_stmt = pg_insert(HighlightCandidate).values(row)
            incoming = insert_stmt.excluded
            # On a repeat run the existing candidate row for this moment is
            # overwritten with the freshly computed values.
            upsert_stmt = insert_stmt.on_conflict_do_update(
                constraint="uq_highlight_candidate_moment",
                set_={
                    "score": incoming.score,
                    "score_breakdown": incoming.score_breakdown,
                    "duration_secs": incoming.duration_secs,
                    "updated_at": func.now(),
                },
            )
            db.execute(upsert_stmt)
            upserted += 1

        db.commit()
        elapsed = time.monotonic() - started_at
        _emit_event(
            video_id,
            "highlight_detection",
            "complete",
            run_id=run_id,
            payload={"candidates": upserted},
        )
        logger.info(
            "Highlight detection completed for video_id=%s in %.1fs — %d candidates upserted",
            video_id,
            elapsed,
            upserted,
        )
        return video_id

    except Exception as exc:
        db.rollback()
        _emit_event(
            video_id,
            "highlight_detection",
            "error",
            run_id=run_id,
            payload={"error": str(exc)},
        )
        logger.error("Highlight detection failed for video_id=%s: %s", video_id, exc)
        raise self.retry(exc=exc)
    finally:
        db.close()
"""Highlight detection admin endpoints.

Trigger scoring, list candidates, and view score breakdowns.
"""

from __future__ import annotations

import logging
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

from database import get_session
from models import HighlightCandidate, KeyMoment, ProcessingStatus, SourceVideo
from pipeline.highlight_schemas import HighlightCandidateResponse

logger = logging.getLogger("chrysopedia.highlights")

router = APIRouter(prefix="/admin/highlights", tags=["highlights"])


# ── Trigger endpoints ────────────────────────────────────────────────────────

@router.post("/detect/{video_id}")
async def detect_highlights(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Dispatch highlight detection for a single video.

    Responds 404 when the video does not exist and 503 when the task cannot
    be queued. On success the response carries the queued task's ID.
    """
    # NOTE(review): video_id is compared to SourceVideo.id as a plain string.
    # If that column is a UUID type, a malformed ID may surface as a database
    # error instead of a 404 — confirm against the model.
    stmt = select(SourceVideo).where(SourceVideo.id == video_id)
    result = await db.execute(stmt)
    video = result.scalar_one_or_none()

    if video is None:
        raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")

    # Imported at call time, matching the other trigger endpoint.
    from pipeline.stages import stage_highlight_detection

    try:
        task = stage_highlight_detection.delay(str(video.id))
        logger.info(
            "Highlight detection dispatched for video_id=%s task_id=%s",
            video_id,
            task.id,
        )
    except Exception as exc:
        logger.warning(
            "Failed to dispatch highlight detection for video_id=%s: %s",
            video_id,
            exc,
        )
        raise HTTPException(
            status_code=503,
            detail="Highlight detection dispatch failed — Celery/Redis may be unavailable",
        ) from exc

    return {
        "status": "dispatched",
        "video_id": str(video.id),
        "task_id": task.id,
    }


@router.post("/detect-all")
async def detect_all_highlights(
    db: AsyncSession = Depends(get_session),
):
    """Dispatch highlight detection for all completed videos.

    Dispatch failures are counted rather than raised: the response reports
    how many videos were queued and how many could not be.
    """
    # Only the IDs are needed to queue tasks, so avoid loading full rows.
    stmt = select(SourceVideo.id).where(
        SourceVideo.processing_status == ProcessingStatus.complete
    )
    result = await db.execute(stmt)
    video_ids = result.scalars().all()

    from pipeline.stages import stage_highlight_detection

    dispatched = 0
    errors = 0
    for vid in video_ids:
        try:
            stage_highlight_detection.delay(str(vid))
            dispatched += 1
        except Exception as exc:
            # Best effort: keep queueing the remaining videos.
            logger.warning(
                "Failed to dispatch highlight detection for video_id=%s: %s",
                vid,
                exc,
            )
            errors += 1

    return {
        "status": "dispatched",
        "videos_dispatched": dispatched,
        "errors": errors,
    }


# ── Query endpoints ──────────────────────────────────────────────────────────

@router.get("/candidates", response_model=list[HighlightCandidateResponse])
async def list_candidates(
    db: AsyncSession = Depends(get_session),
    skip: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=200)] = 50,
):
    """List highlight candidates sorted by score descending, with pagination."""
    stmt = (
        select(HighlightCandidate)
        .options(joinedload(HighlightCandidate.key_moment))
        # Tie-break on id: with OFFSET/LIMIT, ordering by score alone lets
        # rows with equal scores repeat or go missing between pages.
        .order_by(HighlightCandidate.score.desc(), HighlightCandidate.id)
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    candidates = result.scalars().unique().all()
    return candidates


@router.get("/candidates/{candidate_id}", response_model=HighlightCandidateResponse)
async def get_candidate(
    candidate_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Get a single highlight candidate by ID with full score breakdown.

    Responds 404 when no candidate has the given ID.
    """
    stmt = (
        select(HighlightCandidate)
        .options(joinedload(HighlightCandidate.key_moment))
        .where(HighlightCandidate.id == candidate_id)
    )
    result = await db.execute(stmt)
    candidate = result.scalar_one_or_none()

    if candidate is None:
        raise HTTPException(status_code=404, detail=f"Candidate not found: {candidate_id}")

    return candidate