chrysopedia/backend/routers/videos.py
jlightner 87cb667848 test: Added GET /videos/{video_id} and GET /videos/{video_id}/transcrip…
- "backend/routers/videos.py"
- "backend/schemas.py"
- "backend/tests/test_video_detail.py"

GSD-Task: S01/T01
2026-04-03 23:42:43 +00:00

104 lines
3.6 KiB
Python

"""Source video endpoints for Chrysopedia API."""
import logging
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from database import get_session
from models import SourceVideo, TranscriptSegment
from schemas import (
SourceVideoDetail,
SourceVideoRead,
TranscriptForPlayerResponse,
TranscriptSegmentRead,
VideoListResponse,
)
logger = logging.getLogger("chrysopedia.videos")
router = APIRouter(prefix="/videos", tags=["videos"])
@router.get("", response_model=VideoListResponse)
async def list_videos(
offset: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=100)] = 50,
creator_id: str | None = None,
db: AsyncSession = Depends(get_session),
) -> VideoListResponse:
"""List source videos with optional filtering by creator."""
base_stmt = select(SourceVideo).order_by(SourceVideo.created_at.desc())
if creator_id:
base_stmt = base_stmt.where(SourceVideo.creator_id == creator_id)
# Total count (before offset/limit)
count_stmt = select(func.count()).select_from(base_stmt.subquery())
count_result = await db.execute(count_stmt)
total = count_result.scalar() or 0
stmt = base_stmt.offset(offset).limit(limit)
result = await db.execute(stmt)
videos = result.scalars().all()
logger.debug("Listed %d videos (offset=%d, limit=%d)", len(videos), offset, limit)
return VideoListResponse(
items=[SourceVideoRead.model_validate(v) for v in videos],
total=total,
offset=offset,
limit=limit,
)
@router.get("/{video_id}", response_model=SourceVideoDetail)
async def get_video_detail(
video_id: uuid.UUID,
db: AsyncSession = Depends(get_session),
) -> SourceVideoDetail:
"""Get a single video with creator info for the player page."""
stmt = (
select(SourceVideo)
.where(SourceVideo.id == video_id)
.options(selectinload(SourceVideo.creator))
)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail="Video not found")
detail = SourceVideoDetail.model_validate(video)
detail.creator_name = video.creator.name if video.creator else ""
detail.creator_slug = video.creator.slug if video.creator else ""
logger.debug("Video detail %s (creator=%s)", video_id, detail.creator_name)
return detail
@router.get("/{video_id}/transcript", response_model=TranscriptForPlayerResponse)
async def get_video_transcript(
video_id: uuid.UUID,
db: AsyncSession = Depends(get_session),
) -> TranscriptForPlayerResponse:
"""Get all transcript segments for a video, ordered by segment_index."""
# Verify video exists
video_stmt = select(SourceVideo.id).where(SourceVideo.id == video_id)
video_result = await db.execute(video_stmt)
if video_result.scalar_one_or_none() is None:
raise HTTPException(status_code=404, detail="Video not found")
stmt = (
select(TranscriptSegment)
.where(TranscriptSegment.source_video_id == video_id)
.order_by(TranscriptSegment.segment_index)
)
result = await db.execute(stmt)
segments = result.scalars().all()
logger.debug("Transcript for %s: %d segments", video_id, len(segments))
return TranscriptForPlayerResponse(
video_id=video_id,
segments=[TranscriptSegmentRead.model_validate(s) for s in segments],
total=len(segments),
)