test: Added version list/detail API endpoints, Pydantic schemas, versio…

- "backend/schemas.py"
- "backend/routers/techniques.py"
- "backend/tests/test_public_api.py"

GSD-Task: S04/T02
This commit is contained in:
jlightner 2026-03-30 07:17:42 +00:00
parent a4d298502c
commit 872ffe0543
3 changed files with 224 additions and 2 deletions

View file

@ -6,12 +6,12 @@ import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from database import get_session
from models import Creator, KeyMoment, RelatedTechniqueLink, SourceVideo, TechniquePage
from models import Creator, KeyMoment, RelatedTechniqueLink, SourceVideo, TechniquePage, TechniquePageVersion
from schemas import (
CreatorInfo,
KeyMomentSummary,
@ -19,6 +19,9 @@ from schemas import (
RelatedLinkItem,
TechniquePageDetail,
TechniquePageRead,
TechniquePageVersionDetail,
TechniquePageVersionListResponse,
TechniquePageVersionSummary,
)
logger = logging.getLogger("chrysopedia.techniques")
@ -130,9 +133,77 @@ async def get_technique(
)
base = TechniquePageRead.model_validate(page)
# Count versions for this page
version_count_stmt = select(func.count()).where(
TechniquePageVersion.technique_page_id == page.id
)
version_count_result = await db.execute(version_count_stmt)
version_count = version_count_result.scalar() or 0
return TechniquePageDetail(
**base.model_dump(),
key_moments=key_moment_items,
creator_info=creator_info,
related_links=related_links,
version_count=version_count,
)
@router.get("/{slug}/versions", response_model=TechniquePageVersionListResponse)
async def list_technique_versions(
slug: str,
db: AsyncSession = Depends(get_session),
) -> TechniquePageVersionListResponse:
"""List all version snapshots for a technique page, newest first."""
# Resolve the technique page
page_stmt = select(TechniquePage).where(TechniquePage.slug == slug)
page_result = await db.execute(page_stmt)
page = page_result.scalar_one_or_none()
if page is None:
raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found")
# Fetch versions ordered by version_number DESC
versions_stmt = (
select(TechniquePageVersion)
.where(TechniquePageVersion.technique_page_id == page.id)
.order_by(TechniquePageVersion.version_number.desc())
)
versions_result = await db.execute(versions_stmt)
versions = versions_result.scalars().all()
items = [TechniquePageVersionSummary.model_validate(v) for v in versions]
return TechniquePageVersionListResponse(items=items, total=len(items))
@router.get("/{slug}/versions/{version_number}", response_model=TechniquePageVersionDetail)
async def get_technique_version(
slug: str,
version_number: int,
db: AsyncSession = Depends(get_session),
) -> TechniquePageVersionDetail:
"""Get a specific version snapshot by version number."""
# Resolve the technique page
page_stmt = select(TechniquePage).where(TechniquePage.slug == slug)
page_result = await db.execute(page_stmt)
page = page_result.scalar_one_or_none()
if page is None:
raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found")
# Fetch the specific version
version_stmt = (
select(TechniquePageVersion)
.where(
TechniquePageVersion.technique_page_id == page.id,
TechniquePageVersion.version_number == version_number,
)
)
version_result = await db.execute(version_stmt)
version = version_result.scalar_one_or_none()
if version is None:
raise HTTPException(
status_code=404,
detail=f"Version {version_number} not found for technique '{slug}'",
)
return TechniquePageVersionDetail.model_validate(version)

View file

@ -312,6 +312,34 @@ class TechniquePageDetail(TechniquePageRead):
key_moments: list[KeyMomentSummary] = Field(default_factory=list)
creator_info: CreatorInfo | None = None
related_links: list[RelatedLinkItem] = Field(default_factory=list)
version_count: int = 0
# ── Technique Page Versions ──────────────────────────────────────────────────
class TechniquePageVersionSummary(BaseModel):
"""Lightweight version entry for list responses."""
model_config = ConfigDict(from_attributes=True)
version_number: int
created_at: datetime
pipeline_metadata: dict | None = None
class TechniquePageVersionDetail(BaseModel):
"""Full version snapshot for detail responses."""
model_config = ConfigDict(from_attributes=True)
version_number: int
content_snapshot: dict
pipeline_metadata: dict | None = None
created_at: datetime
class TechniquePageVersionListResponse(BaseModel):
"""Response for version list endpoint."""
items: list[TechniquePageVersionSummary] = Field(default_factory=list)
total: int = 0
# ── Topics ───────────────────────────────────────────────────────────────────

View file

@ -401,3 +401,126 @@ async def test_creators_empty_list(client, db_engine):
data = resp.json()
assert data == []
# ── Version Tests ────────────────────────────────────────────────────────────
async def _insert_version(db_engine, technique_page_id: str, version_number: int, content_snapshot: dict, pipeline_metadata: dict | None = None):
"""Insert a TechniquePageVersion row directly for testing."""
from models import TechniquePageVersion
session_factory = async_sessionmaker(
db_engine, class_=AsyncSession, expire_on_commit=False
)
async with session_factory() as session:
v = TechniquePageVersion(
technique_page_id=uuid.UUID(technique_page_id) if isinstance(technique_page_id, str) else technique_page_id,
version_number=version_number,
content_snapshot=content_snapshot,
pipeline_metadata=pipeline_metadata,
)
session.add(v)
await session.commit()
@pytest.mark.asyncio
async def test_version_list_empty(client, db_engine):
"""GET /techniques/{slug}/versions returns empty list when page has no versions."""
seed = await _seed_full_data(db_engine)
resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}/versions")
assert resp.status_code == 200
data = resp.json()
assert data["items"] == []
assert data["total"] == 0
@pytest.mark.asyncio
async def test_version_list_with_versions(client, db_engine):
"""GET /techniques/{slug}/versions returns versions after inserting them."""
seed = await _seed_full_data(db_engine)
# Get the technique page ID by fetching the detail
detail_resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}")
page_id = detail_resp.json()["id"]
# Insert two versions
snapshot1 = {"title": "Old Reese Bass v1", "summary": "First draft"}
snapshot2 = {"title": "Old Reese Bass v2", "summary": "Second draft"}
await _insert_version(db_engine, page_id, 1, snapshot1, {"model": "gpt-4o"})
await _insert_version(db_engine, page_id, 2, snapshot2, {"model": "gpt-4o-mini"})
resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}/versions")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 2
assert len(data["items"]) == 2
# Ordered by version_number DESC
assert data["items"][0]["version_number"] == 2
assert data["items"][1]["version_number"] == 1
assert data["items"][0]["pipeline_metadata"]["model"] == "gpt-4o-mini"
assert data["items"][1]["pipeline_metadata"]["model"] == "gpt-4o"
@pytest.mark.asyncio
async def test_version_detail_returns_content_snapshot(client, db_engine):
"""GET /techniques/{slug}/versions/{version_number} returns full snapshot."""
seed = await _seed_full_data(db_engine)
detail_resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}")
page_id = detail_resp.json()["id"]
snapshot = {"title": "Old Title", "summary": "Old summary", "body_sections": {"intro": "Old intro"}}
metadata = {"model": "gpt-4o", "prompt_hash": "abc123"}
await _insert_version(db_engine, page_id, 1, snapshot, metadata)
resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}/versions/1")
assert resp.status_code == 200
data = resp.json()
assert data["version_number"] == 1
assert data["content_snapshot"] == snapshot
assert data["pipeline_metadata"] == metadata
assert "created_at" in data
@pytest.mark.asyncio
async def test_version_detail_404_for_nonexistent_version(client, db_engine):
"""GET /techniques/{slug}/versions/999 returns 404."""
seed = await _seed_full_data(db_engine)
resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}/versions/999")
assert resp.status_code == 404
assert "not found" in resp.json()["detail"].lower()
@pytest.mark.asyncio
async def test_versions_404_for_nonexistent_slug(client, db_engine):
"""GET /techniques/nonexistent-slug/versions returns 404."""
await _seed_full_data(db_engine)
resp = await client.get(f"{TECHNIQUES_URL}/nonexistent-slug-xyz/versions")
assert resp.status_code == 404
assert "not found" in resp.json()["detail"].lower()
@pytest.mark.asyncio
async def test_technique_detail_includes_version_count(client, db_engine):
"""GET /techniques/{slug} includes version_count field."""
seed = await _seed_full_data(db_engine)
# Initially version_count should be 0
resp = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}")
assert resp.status_code == 200
data = resp.json()
assert data["version_count"] == 0
# Insert a version and check again
page_id = data["id"]
await _insert_version(db_engine, page_id, 1, {"title": "Snapshot"})
resp2 = await client.get(f"{TECHNIQUES_URL}/{seed['tp1_slug']}")
assert resp2.status_code == 200
assert resp2.json()["version_count"] == 1