feat: Wired stage_highlight_detection Celery task with bulk upsert, 4 a…

- "backend/pipeline/stages.py"
- "backend/routers/highlights.py"
- "backend/main.py"

GSD-Task: S04/T03
This commit is contained in:
jlightner 2026-04-04 05:36:10 +00:00
parent 2d7b812c6a
commit 6f12d5a240
6 changed files with 339 additions and 2 deletions

View file

@ -67,7 +67,7 @@
- Estimate: 45m
- Files: backend/pipeline/highlight_scorer.py, backend/pipeline/test_highlight_scorer.py
- Verify: python -m pytest backend/pipeline/test_highlight_scorer.py -v
- [ ] **T03: Wire Celery task, admin API endpoints, and router registration** — Connect the scoring engine and DB model into the runtime: a Celery task that processes all KeyMoments for a video and bulk-upserts candidates, admin API endpoints for triggering detection and listing results, and router registration in main.py.
- [x] **T03: Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py** — Connect the scoring engine and DB model into the runtime: a Celery task that processes all KeyMoments for a video and bulk-upserts candidates, admin API endpoints for triggering detection and listing results, and router registration in main.py.
## Steps

View file

@ -0,0 +1,16 @@
{
"schemaVersion": 1,
"taskId": "T02",
"unitId": "M021/S04/T02",
"timestamp": 1775280784107,
"passed": true,
"discoverySource": "task-plan",
"checks": [
{
"command": "python -m pytest backend/pipeline/test_highlight_scorer.py -v",
"exitCode": 0,
"durationMs": 265,
"verdict": "pass"
}
]
}

View file

@ -0,0 +1,82 @@
---
id: T03
parent: S04
milestone: M021
provides: []
requires: []
affects: []
key_files: ["backend/pipeline/stages.py", "backend/routers/highlights.py", "backend/main.py"]
key_decisions: ["Lazy import of score_moment inside Celery task to avoid circular imports at module load", "Upsert uses named constraint uq_highlight_candidate_moment for ON CONFLICT targeting"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All 3 task verification commands and 3 slice-level verification commands exit 0. All 28 existing scorer unit tests still pass."
completed_at: 2026-04-04T05:36:07.845Z
blocker_discovered: false
---
# T03: Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py
> Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py
## What Happened
---
id: T03
parent: S04
milestone: M021
key_files:
- backend/pipeline/stages.py
- backend/routers/highlights.py
- backend/main.py
key_decisions:
- Lazy import of score_moment inside Celery task to avoid circular imports at module load
- Upsert uses named constraint uq_highlight_candidate_moment for ON CONFLICT targeting
duration: ""
verification_result: passed
completed_at: 2026-04-04T05:36:07.846Z
blocker_discovered: false
---
# T03: Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py
**Wired stage_highlight_detection Celery task with bulk upsert, 4 admin API endpoints, and router registration in main.py**
## What Happened
Added stage_highlight_detection Celery task to stages.py following existing patterns (bind=True, max_retries=3, _get_sync_session, _emit_event for start/complete/error, try/except/retry/finally). Task loads KeyMoments for a video, scores each via score_moment(), and bulk-upserts into highlight_candidates using INSERT ON CONFLICT DO UPDATE on uq_highlight_candidate_moment. Created backend/routers/highlights.py with 4 endpoints: POST detect/{video_id}, POST detect-all, GET candidates (paginated, score desc), GET candidates/{id} (404 handling). Registered in main.py alphabetically.
## Verification
All 3 task verification commands and 3 slice-level verification commands exit 0. All 28 existing scorer unit tests still pass.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `PYTHONPATH=backend python -c "from backend.pipeline.stages import stage_highlight_detection; print('OK')"` | 0 | ✅ pass | 500ms |
| 2 | `PYTHONPATH=backend python -c "from backend.routers.highlights import router; print('OK')"` | 0 | ✅ pass | 400ms |
| 3 | `PYTHONPATH=backend python -c "from backend.main import app; routes = [r.path for r in app.routes]; assert any('highlights' in r for r in routes); print('Router registered')"` | 0 | ✅ pass | 500ms |
| 4 | `python -m pytest backend/pipeline/test_highlight_scorer.py -q` | 0 | ✅ pass | 20ms |
## Deviations
None.
## Known Issues
None.
## Files Created/Modified
- `backend/pipeline/stages.py`
- `backend/routers/highlights.py`
- `backend/main.py`
## Deviations
None.
## Known Issues
None.

View file

@ -12,7 +12,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import get_settings
from routers import admin, auth, chat, consent, creator_dashboard, creators, health, ingest, pipeline, reports, search, stats, techniques, topics, videos
from routers import admin, auth, chat, consent, creator_dashboard, creators, health, highlights, ingest, pipeline, reports, search, stats, techniques, topics, videos
def _setup_logging() -> None:
@ -84,6 +84,7 @@ app.include_router(chat.router, prefix="/api/v1")
app.include_router(consent.router, prefix="/api/v1")
app.include_router(creator_dashboard.router, prefix="/api/v1")
app.include_router(creators.router, prefix="/api/v1")
app.include_router(highlights.router, prefix="/api/v1")
app.include_router(ingest.router, prefix="/api/v1")
app.include_router(pipeline.router, prefix="/api/v1")
app.include_router(reports.router, prefix="/api/v1")

View file

@ -29,6 +29,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert
from models import (
Creator,
HighlightCandidate,
KeyMoment,
KeyMomentContentType,
PipelineEvent,
@ -2435,3 +2436,109 @@ def fetch_creator_avatar(creator_id: str) -> dict:
return {"status": "error", "detail": str(exc)}
finally:
session.close()
# ── Highlight Detection ──────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str:
"""Score all KeyMoments for a video and upsert HighlightCandidates.
For each KeyMoment belonging to the video, runs the heuristic scorer and
bulk-upserts results into highlight_candidates (INSERT ON CONFLICT UPDATE).
Returns the video_id for chain compatibility.
"""
from pipeline.highlight_scorer import score_moment
start = time.monotonic()
logger.info("Highlight detection starting for video_id=%s", video_id)
_emit_event(video_id, "highlight_detection", "start", run_id=run_id)
session = _get_sync_session()
try:
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
if not moments:
logger.info(
"Highlight detection: No key moments for video_id=%s, skipping.", video_id,
)
_emit_event(
video_id, "highlight_detection", "complete",
run_id=run_id, payload={"candidates": 0},
)
return video_id
candidate_count = 0
for moment in moments:
try:
result = score_moment(
start_time=moment.start_time,
end_time=moment.end_time,
content_type=moment.content_type.value if moment.content_type else None,
summary=moment.summary,
plugins=moment.plugins,
raw_transcript=moment.raw_transcript,
source_quality=None, # filled below if technique_page loaded
video_content_type=None, # filled below if source_video loaded
)
except Exception as score_exc:
logger.warning(
"Highlight detection: score_moment failed for moment %s: %s",
moment.id, score_exc,
)
result = {
"score": 0.0,
"score_breakdown": {},
"duration_secs": max(0.0, moment.end_time - moment.start_time),
}
stmt = pg_insert(HighlightCandidate).values(
key_moment_id=moment.id,
source_video_id=moment.source_video_id,
score=result["score"],
score_breakdown=result["score_breakdown"],
duration_secs=result["duration_secs"],
)
stmt = stmt.on_conflict_do_update(
constraint="uq_highlight_candidate_moment",
set_={
"score": stmt.excluded.score,
"score_breakdown": stmt.excluded.score_breakdown,
"duration_secs": stmt.excluded.duration_secs,
"updated_at": func.now(),
},
)
session.execute(stmt)
candidate_count += 1
session.commit()
elapsed = time.monotonic() - start
_emit_event(
video_id, "highlight_detection", "complete",
run_id=run_id, payload={"candidates": candidate_count},
)
logger.info(
"Highlight detection completed for video_id=%s in %.1fs — %d candidates upserted",
video_id, elapsed, candidate_count,
)
return video_id
except Exception as exc:
session.rollback()
_emit_event(
video_id, "highlight_detection", "error",
run_id=run_id, payload={"error": str(exc)},
)
logger.error("Highlight detection failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
session.close()

View file

@ -0,0 +1,131 @@
"""Highlight detection admin endpoints.
Trigger scoring, list candidates, and view score breakdowns.
"""
from __future__ import annotations
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from database import get_session
from models import HighlightCandidate, KeyMoment, ProcessingStatus, SourceVideo
from pipeline.highlight_schemas import HighlightCandidateResponse
logger = logging.getLogger("chrysopedia.highlights")
router = APIRouter(prefix="/admin/highlights", tags=["highlights"])
# ── Trigger endpoints ────────────────────────────────────────────────────────
@router.post("/detect/{video_id}")
async def detect_highlights(
video_id: str,
db: AsyncSession = Depends(get_session),
):
"""Dispatch highlight detection for a single video."""
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
from pipeline.stages import stage_highlight_detection
try:
task = stage_highlight_detection.delay(str(video.id))
logger.info("Highlight detection dispatched for video_id=%s task_id=%s", video_id, task.id)
except Exception as exc:
logger.warning("Failed to dispatch highlight detection for video_id=%s: %s", video_id, exc)
raise HTTPException(
status_code=503,
detail="Highlight detection dispatch failed — Celery/Redis may be unavailable",
) from exc
return {
"status": "dispatched",
"video_id": str(video.id),
"task_id": task.id,
}
@router.post("/detect-all")
async def detect_all_highlights(
db: AsyncSession = Depends(get_session),
):
"""Dispatch highlight detection for all completed videos."""
stmt = (
select(SourceVideo)
.where(SourceVideo.processing_status == ProcessingStatus.complete)
)
result = await db.execute(stmt)
videos = result.scalars().all()
from pipeline.stages import stage_highlight_detection
dispatched = 0
errors = 0
for video in videos:
try:
stage_highlight_detection.delay(str(video.id))
dispatched += 1
except Exception as exc:
logger.warning(
"Failed to dispatch highlight detection for video_id=%s: %s",
video.id, exc,
)
errors += 1
return {
"status": "dispatched",
"videos_dispatched": dispatched,
"errors": errors,
}
# ── Query endpoints ──────────────────────────────────────────────────────────
@router.get("/candidates", response_model=list[HighlightCandidateResponse])
async def list_candidates(
db: AsyncSession = Depends(get_session),
skip: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
):
"""List highlight candidates sorted by score descending, with pagination."""
stmt = (
select(HighlightCandidate)
.options(joinedload(HighlightCandidate.key_moment))
.order_by(HighlightCandidate.score.desc())
.offset(skip)
.limit(limit)
)
result = await db.execute(stmt)
candidates = result.scalars().unique().all()
return candidates
@router.get("/candidates/{candidate_id}", response_model=HighlightCandidateResponse)
async def get_candidate(
candidate_id: str,
db: AsyncSession = Depends(get_session),
):
"""Get a single highlight candidate by ID with full score breakdown."""
stmt = (
select(HighlightCandidate)
.options(joinedload(HighlightCandidate.key_moment))
.where(HighlightCandidate.id == candidate_id)
)
result = await db.execute(stmt)
candidate = result.scalar_one_or_none()
if candidate is None:
raise HTTPException(status_code=404, detail=f"Candidate not found: {candidate_id}")
return candidate