test: Added GET /videos/{video_id} and GET /videos/{video_id}/transcrip…

- "backend/routers/videos.py"
- "backend/schemas.py"
- "backend/tests/test_video_detail.py"

GSD-Task: S01/T01
This commit is contained in:
jlightner 2026-04-03 23:42:43 +00:00
parent 51e01f8b7c
commit fe493d0647
3 changed files with 208 additions and 3 deletions

View file

@ -1,15 +1,23 @@
"""Source video endpoints for Chrysopedia API."""
import logging
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from database import get_session
from models import SourceVideo
from schemas import SourceVideoRead, VideoListResponse
from models import SourceVideo, TranscriptSegment
from schemas import (
SourceVideoDetail,
SourceVideoRead,
TranscriptForPlayerResponse,
TranscriptSegmentRead,
VideoListResponse,
)
logger = logging.getLogger("chrysopedia.videos")
@ -44,3 +52,53 @@ async def list_videos(
offset=offset,
limit=limit,
)
@router.get("/{video_id}", response_model=SourceVideoDetail)
async def get_video_detail(
video_id: uuid.UUID,
db: AsyncSession = Depends(get_session),
) -> SourceVideoDetail:
"""Get a single video with creator info for the player page."""
stmt = (
select(SourceVideo)
.where(SourceVideo.id == video_id)
.options(selectinload(SourceVideo.creator))
)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail="Video not found")
detail = SourceVideoDetail.model_validate(video)
detail.creator_name = video.creator.name if video.creator else ""
detail.creator_slug = video.creator.slug if video.creator else ""
logger.debug("Video detail %s (creator=%s)", video_id, detail.creator_name)
return detail
@router.get("/{video_id}/transcript", response_model=TranscriptForPlayerResponse)
async def get_video_transcript(
video_id: uuid.UUID,
db: AsyncSession = Depends(get_session),
) -> TranscriptForPlayerResponse:
"""Get all transcript segments for a video, ordered by segment_index."""
# Verify video exists
video_stmt = select(SourceVideo.id).where(SourceVideo.id == video_id)
video_result = await db.execute(video_stmt)
if video_result.scalar_one_or_none() is None:
raise HTTPException(status_code=404, detail="Video not found")
stmt = (
select(TranscriptSegment)
.where(TranscriptSegment.source_video_id == video_id)
.order_by(TranscriptSegment.segment_index)
)
result = await db.execute(stmt)
segments = result.scalars().all()
logger.debug("Transcript for %s: %d segments", video_id, len(segments))
return TranscriptForPlayerResponse(
video_id=video_id,
segments=[TranscriptSegmentRead.model_validate(s) for s in segments],
total=len(segments),
)

View file

@ -386,6 +386,20 @@ class TopicListResponse(BaseModel):
total: int = 0
class SourceVideoDetail(SourceVideoRead):
"""Single video detail with creator info for player page."""
creator_name: str = ""
creator_slug: str = ""
video_url: str | None = None
class TranscriptForPlayerResponse(BaseModel):
"""Transcript segments for the video player sidebar."""
video_id: uuid.UUID
segments: list[TranscriptSegmentRead] = Field(default_factory=list)
total: int = 0
class VideoListResponse(BaseModel):
"""Paginated list of source videos."""
items: list[SourceVideoRead] = Field(default_factory=list)

View file

@ -0,0 +1,133 @@
"""Tests for video detail and transcript endpoints.
Covers:
- GET /api/v1/videos/{video_id} single video with creator info
- GET /api/v1/videos/{video_id}/transcript ordered segments
"""
import uuid
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from models import ContentType, Creator, ProcessingStatus, SourceVideo, TranscriptSegment
@pytest_asyncio.fixture()
async def video_with_creator(db_engine):
"""Create a Creator + SourceVideo pair. Returns dict with IDs."""
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
creator = Creator(
name="TestProducer",
slug="testproducer",
folder_name="TestProducer",
genres=["electronic"],
)
session.add(creator)
await session.flush()
video = SourceVideo(
creator_id=creator.id,
filename="synth-tutorial.mp4",
file_path="TestProducer/synth-tutorial.mp4",
duration_seconds=720,
content_type=ContentType.tutorial,
processing_status=ProcessingStatus.complete,
)
session.add(video)
await session.flush()
result = {
"creator_id": creator.id,
"creator_name": creator.name,
"creator_slug": creator.slug,
"video_id": video.id,
}
await session.commit()
return result
@pytest_asyncio.fixture()
async def video_with_segments(db_engine, video_with_creator):
"""Add transcript segments to the video. Returns video_with_creator + segment count."""
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with factory() as session:
segments = [
TranscriptSegment(
source_video_id=video_with_creator["video_id"],
start_time=float(i * 10),
end_time=float(i * 10 + 9),
text=f"Segment {i} text content.",
segment_index=i,
)
for i in range(5)
]
session.add_all(segments)
await session.commit()
return {**video_with_creator, "segment_count": 5}
# ── Video Detail ─────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_video_detail_success(client, video_with_creator):
vid = video_with_creator["video_id"]
resp = await client.get(f"/api/v1/videos/{vid}")
assert resp.status_code == 200
data = resp.json()
assert data["id"] == str(vid)
assert data["filename"] == "synth-tutorial.mp4"
assert data["creator_name"] == "TestProducer"
assert data["creator_slug"] == "testproducer"
# video_url is always None for now
assert data["video_url"] is None
@pytest.mark.asyncio
async def test_get_video_detail_404(client):
fake_id = uuid.uuid4()
resp = await client.get(f"/api/v1/videos/{fake_id}")
assert resp.status_code == 404
assert "not found" in resp.json()["detail"].lower()
# ── Transcript ───────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_transcript_success(client, video_with_segments):
vid = video_with_segments["video_id"]
resp = await client.get(f"/api/v1/videos/{vid}/transcript")
assert resp.status_code == 200
data = resp.json()
assert data["video_id"] == str(vid)
assert data["total"] == 5
assert len(data["segments"]) == 5
# Verify ordering by segment_index
indices = [s["segment_index"] for s in data["segments"]]
assert indices == [0, 1, 2, 3, 4]
@pytest.mark.asyncio
async def test_get_transcript_404(client):
fake_id = uuid.uuid4()
resp = await client.get(f"/api/v1/videos/{fake_id}/transcript")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_get_transcript_empty(client, video_with_creator):
"""Video exists but has no segments — returns empty list."""
vid = video_with_creator["video_id"]
resp = await client.get(f"/api/v1/videos/{vid}/transcript")
assert resp.status_code == 200
data = resp.json()
assert data["video_id"] == str(vid)
assert data["total"] == 0
assert data["segments"] == []