- "backend/routers/videos.py" - "backend/schemas.py" - "backend/tests/test_video_detail.py" GSD-Task: S01/T01
104 lines
3.6 KiB
Python
104 lines
3.6 KiB
Python
"""Source video endpoints for Chrysopedia API."""
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from database import get_session
|
|
from models import SourceVideo, TranscriptSegment
|
|
from schemas import (
|
|
SourceVideoDetail,
|
|
SourceVideoRead,
|
|
TranscriptForPlayerResponse,
|
|
TranscriptSegmentRead,
|
|
VideoListResponse,
|
|
)
|
|
|
|
logger = logging.getLogger("chrysopedia.videos")
|
|
|
|
router = APIRouter(prefix="/videos", tags=["videos"])
|
|
|
|
|
|
@router.get("", response_model=VideoListResponse)
async def list_videos(
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=100)] = 50,
    creator_id: str | None = None,
    db: AsyncSession = Depends(get_session),
) -> VideoListResponse:
    """List source videos, newest first, with optional filtering by creator.

    Args:
        offset: Number of matching rows to skip (validated as >= 0).
        limit: Maximum number of rows to return (validated as 1-100).
        creator_id: When non-empty, only videos whose ``creator_id`` equals
            this value are returned. ``None`` or ``""`` disables the filter.
        db: Async database session supplied by ``get_session``.

    Returns:
        A ``VideoListResponse`` carrying the requested page of videos, the
        total number of matching videos (ignoring offset/limit), and the
        ``offset``/``limit`` that were applied.
    """
    filtered_stmt = select(SourceVideo)
    if creator_id:
        filtered_stmt = filtered_stmt.where(SourceVideo.creator_id == creator_id)

    # Total count (before offset/limit). Counted over the *unordered*
    # statement: sorting rows inside a COUNT subquery is wasted work.
    count_stmt = select(func.count()).select_from(filtered_stmt.subquery())
    count_result = await db.execute(count_stmt)
    total = count_result.scalar() or 0

    # created_at alone is not a unique sort key, and the database may return
    # tied rows in any order. The id tie-breaker makes the ordering total so
    # that consecutive pages cannot repeat or skip a video.
    page_stmt = (
        filtered_stmt.order_by(SourceVideo.created_at.desc(), SourceVideo.id)
        .offset(offset)
        .limit(limit)
    )
    result = await db.execute(page_stmt)
    videos = result.scalars().all()
    logger.debug("Listed %d videos (offset=%d, limit=%d)", len(videos), offset, limit)
    return VideoListResponse(
        items=[SourceVideoRead.model_validate(v) for v in videos],
        total=total,
        offset=offset,
        limit=limit,
    )
|
|
|
|
|
|
@router.get("/{video_id}", response_model=SourceVideoDetail)
async def get_video_detail(
    video_id: uuid.UUID,
    db: AsyncSession = Depends(get_session),
) -> SourceVideoDetail:
    """Return a single video together with its creator's name and slug.

    Used by the player page. A video that has no creator gets empty strings
    for both creator fields.

    Raises:
        HTTPException: 404 with detail "Video not found" when no video has
            the given id.
    """
    query = select(SourceVideo).where(SourceVideo.id == video_id)
    # Eager-load the creator relationship so it is available when read below.
    query = query.options(selectinload(SourceVideo.creator))

    row = (await db.execute(query)).scalar_one_or_none()
    if row is None:
        raise HTTPException(status_code=404, detail="Video not found")

    payload = SourceVideoDetail.model_validate(row)

    # The creator fields are filled in by hand after validation.
    creator = row.creator
    if creator:
        payload.creator_name = creator.name
        payload.creator_slug = creator.slug
    else:
        payload.creator_name = ""
        payload.creator_slug = ""

    logger.debug("Video detail %s (creator=%s)", video_id, payload.creator_name)
    return payload
|
|
|
|
|
|
@router.get("/{video_id}/transcript", response_model=TranscriptForPlayerResponse)
async def get_video_transcript(
    video_id: uuid.UUID,
    db: AsyncSession = Depends(get_session),
) -> TranscriptForPlayerResponse:
    """Return every transcript segment of a video, sorted by segment_index.

    Raises:
        HTTPException: 404 with detail "Video not found" when no video has
            the given id. A video that exists but has no segments yields an
            empty list with ``total == 0`` instead.
    """
    # Existence check first, so an unknown id is reported as 404 rather than
    # as an empty transcript. Only the id column is selected.
    exists_query = select(SourceVideo.id).where(SourceVideo.id == video_id)
    found_id = (await db.execute(exists_query)).scalar_one_or_none()
    if found_id is None:
        raise HTTPException(status_code=404, detail="Video not found")

    segment_query = select(TranscriptSegment).where(
        TranscriptSegment.source_video_id == video_id
    )
    segment_query = segment_query.order_by(TranscriptSegment.segment_index)
    rows = (await db.execute(segment_query)).scalars().all()

    logger.debug("Transcript for %s: %d segments", video_id, len(rows))

    serialized = [TranscriptSegmentRead.model_validate(row) for row in rows]
    return TranscriptForPlayerResponse(
        video_id=video_id,
        segments=serialized,
        total=len(rows),
    )
|