chrysopedia/backend/routers/techniques.py
jlightner 61546bf25b perf: eliminate N+1 queries in stale-pages, add videos pagination, cache related techniques
- Rewrote stale-pages endpoint to use a single query with row_number
  window function instead of per-page queries for latest version + creator
- Added optional offset/limit/status/creator_id params to videos endpoint
  (backward compatible — defaults return all results)
- Added 1-hour Redis cache to _find_dynamic_related technique scoring
2026-04-03 05:50:53 +00:00

411 lines
14 KiB
Python

"""Technique page endpoints — list and detail with eager-loaded relations."""
from __future__ import annotations
import json
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from database import get_session
from models import Creator, KeyMoment, RelatedTechniqueLink, SourceVideo, TechniquePage, TechniquePageVersion, TechniquePageVideo
from redis_client import get_redis
from schemas import (
CreatorInfo,
KeyMomentSummary,
PaginatedResponse,
RelatedLinkItem,
SourceVideoSummary,
TechniquePageDetail,
TechniquePageRead,
TechniquePageVersionDetail,
TechniquePageVersionListResponse,
TechniquePageVersionSummary,
)
# Module-level logger shared by the technique endpoints below.
logger = logging.getLogger("chrysopedia.techniques")
# Expiry in seconds (1 hour) passed to Redis when caching dynamic related
# links — related links only change when pages are created.
RELATED_CACHE_TTL = 3600
# Every route declared in this module is registered under the /techniques prefix.
router = APIRouter(prefix="/techniques", tags=["techniques"])
def _score_related_candidate(
    page: TechniquePage,
    cand: TechniquePage,
    current_tags: set[str],
) -> tuple[int, str]:
    """Return ``(score, reason)`` describing how closely *cand* relates to *page*.

    A score of 0 means the candidate shares nothing with the page.
    """
    score = 0
    reasons: list[str] = []
    same_creator = cand.creator_id == page.creator_id
    same_category = cand.topic_category == page.topic_category
    if same_creator and same_category:
        score += 3
        reasons.append("Same creator, same topic")
    elif same_creator:
        score += 2
        reasons.append("Same creator")
    elif same_category:
        score += 2
        reasons.append(f"Also about {page.topic_category}")
    if current_tags:
        shared = current_tags & set(cand.topic_tags or ())
        if shared:
            score += len(shared)
            reasons.append(f"Shared tags: {', '.join(sorted(shared))}")
    return score, "; ".join(reasons)


async def _read_related_cache(
    cache_key: str,
    exclude_slugs: set[str],
    limit: int,
) -> list[RelatedLinkItem] | None:
    """Return cached related links, or ``None`` on a cache miss or any failure.

    Cached entries are filtered against *exclude_slugs* and truncated to
    *limit* before being returned. Failures (Redis errors, malformed payloads)
    are logged and reported as a miss so the caller recomputes from the DB.
    """
    try:
        redis = await get_redis()
        try:
            cached = await redis.get(cache_key)
        finally:
            # Release the client even when the GET itself raised.
            await redis.aclose()
        if not cached:
            return None
        items = json.loads(cached)
        filtered = [i for i in items if i["target_slug"] not in exclude_slugs]
        return [RelatedLinkItem(**i) for i in filtered[:limit]]
    except Exception as exc:
        logger.warning("Related cache read failed for %s: %s", cache_key, exc)
        return None


async def _write_related_cache(
    cache_key: str,
    results: list[RelatedLinkItem],
) -> None:
    """Store *results* under *cache_key* with the module TTL (best-effort)."""
    try:
        payload = json.dumps([r.model_dump() for r in results])
        redis = await get_redis()
        try:
            await redis.set(cache_key, payload, ex=RELATED_CACHE_TTL)
        finally:
            # Release the client even when the SET itself raised.
            await redis.aclose()
    except Exception as exc:
        # Caching is an optimisation only; never fail the request over it.
        logger.warning("Related cache write failed for %s: %s", cache_key, exc)


async def _find_dynamic_related(
    db: AsyncSession,
    page: TechniquePage,
    exclude_slugs: set[str],
    limit: int,
) -> list[RelatedLinkItem]:
    """Score and return dynamically related technique pages.

    Results are cached in Redis for 1 hour per page slug since related
    links only change when technique pages are created or updated.

    NOTE: the cache key depends only on the page slug, while the cached
    payload is the result computed for the *exclude_slugs* and *limit* of the
    call that populated it. A later call with a larger *limit* or a smaller
    exclusion set can therefore receive fewer items until the entry expires.

    Scoring:
    - Same creator + same topic_category: +3
    - Same creator, different category: +2
    - Same topic_category, different creator: +2
    - Each overlapping topic_tag: +1

    Args:
        db: Session used to load candidate pages on a cache miss.
        page: The page whose related techniques are wanted.
        exclude_slugs: Slugs that must not appear in the result.
        limit: Maximum number of links to return.

    Returns:
        Up to *limit* links, ordered by descending score, then by title.
    """
    cache_key = f"chrysopedia:related:{page.slug}"

    cached_links = await _read_related_cache(cache_key, exclude_slugs, limit)
    if cached_links is not None:
        return cached_links

    # Cache miss — compute from DB
    stmt = (
        select(TechniquePage)
        .options(selectinload(TechniquePage.creator))
        .where(TechniquePage.id != page.id)
    )
    if exclude_slugs:
        stmt = stmt.where(TechniquePage.slug.notin_(exclude_slugs))
    result = await db.execute(stmt)
    candidates = result.scalars().all()
    if not candidates:
        return []

    current_tags = set(page.topic_tags) if page.topic_tags else set()
    scored: list[tuple[int, str, TechniquePage]] = []
    for cand in candidates:
        score, reason = _score_related_candidate(page, cand, current_tags)
        if score > 0:
            scored.append((score, reason, cand))
    # Highest score first; title breaks ties so the order is stable.
    scored.sort(key=lambda x: (-x[0], x[2].title))

    results = [
        RelatedLinkItem(
            target_title=cand.title,
            target_slug=cand.slug,
            relationship="dynamic",
            creator_name=cand.creator.name if cand.creator else "",
            topic_category=cand.topic_category,
            reason=reason,
        )
        for _score, reason, cand in scored[:limit]
    ]

    await _write_related_cache(cache_key, results)
    return results
@router.get("", response_model=PaginatedResponse)
async def list_techniques(
    category: Annotated[str | None, Query()] = None,
    creator_slug: Annotated[str | None, Query()] = None,
    sort: Annotated[str, Query()] = "recent",
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=100)] = 50,
    db: AsyncSession = Depends(get_session),
) -> PaginatedResponse:
    """List technique pages with optional category/creator filtering.

    Args:
        category: When given, keep only pages with this topic_category.
        creator_slug: When given, keep only pages whose creator has this slug.
        sort: One of "recent" (newest first), "oldest", "alpha" (by title),
            "creator" (by creator name, then title) or "random". Any other
            value falls back to "recent".
        offset: Number of rows to skip.
        limit: Maximum number of rows to return (1-100).
        db: Database session.

    Returns:
        A page of items plus the total number of rows matching the filters.
    """

    def apply_filters(query):
        # One place for the filters so the count query and the page query
        # can never disagree about which rows match.
        if category:
            query = query.where(TechniquePage.topic_category == category)
        if creator_slug:
            query = query.join(
                Creator, TechniquePage.creator_id == Creator.id
            ).where(Creator.slug == creator_slug)
        return query

    # Correlated subquery for the per-page key moment count
    key_moment_count_sq = (
        select(func.count())
        .where(KeyMoment.technique_page_id == TechniquePage.id)
        .correlate(TechniquePage)
        .scalar_subquery()
    )

    # Count total before pagination
    count_stmt = select(func.count()).select_from(
        apply_filters(select(TechniquePage.id)).subquery()
    )
    count_result = await db.execute(count_stmt)
    total = count_result.scalar() or 0

    # Main query with subquery column
    stmt = apply_filters(
        select(
            TechniquePage,
            key_moment_count_sq.label("key_moment_count"),
        )
    ).options(selectinload(TechniquePage.creator))

    # Every deterministic sort ends with the primary key: without a unique
    # final sort key, rows that tie on the leading keys may be repeated or
    # skipped across offset/limit pages.
    if sort == "random":
        stmt = stmt.order_by(func.random())
    elif sort == "oldest":
        stmt = stmt.order_by(TechniquePage.created_at.asc(), TechniquePage.id.asc())
    elif sort == "alpha":
        stmt = stmt.order_by(TechniquePage.title.asc(), TechniquePage.id.asc())
    elif sort == "creator":
        # Need a join for creator name ordering; the creator_slug filter has
        # already joined Creator, so avoid joining it a second time.
        if not creator_slug:
            stmt = stmt.join(
                Creator, TechniquePage.creator_id == Creator.id, isouter=True
            )
        stmt = stmt.order_by(
            Creator.name.asc(),
            TechniquePage.title.asc(),
            TechniquePage.id.asc(),
        )
    else:
        # Default: "recent" — newest first
        stmt = stmt.order_by(TechniquePage.created_at.desc(), TechniquePage.id.desc())

    stmt = stmt.offset(offset).limit(limit)
    result = await db.execute(stmt)

    items = []
    for page, km_count in result.all():
        item = TechniquePageRead.model_validate(page)
        if page.creator:
            item.creator_name = page.creator.name
            item.creator_slug = page.creator.slug
        item.key_moment_count = km_count or 0
        items.append(item)

    return PaginatedResponse(
        items=items,
        total=total,
        offset=offset,
        limit=limit,
    )
@router.get("/random")
async def random_technique(
    db: AsyncSession = Depends(get_session),
) -> dict:
    """Pick one technique page at random and return its slug.

    Responds with 404 when there is no technique page to choose from.
    """
    random_slug_query = select(TechniquePage.slug).order_by(func.random()).limit(1)
    picked = (await db.execute(random_slug_query)).scalar_one_or_none()
    if picked is not None:
        return {"slug": picked}
    raise HTTPException(status_code=404, detail="No techniques available")
@router.get("/{slug}", response_model=TechniquePageDetail)
async def get_technique(
    slug: str,
    db: AsyncSession = Depends(get_session),
) -> TechniquePageDetail:
    """Return the full detail view of the technique page identified by *slug*.

    The response combines the page itself with its key moments (sorted by
    start time), creator info, related links, version count and source
    videos. Curated links (outgoing first, then incoming) are topped up with
    dynamically scored ones until there are four in total. Responds with 404
    when no page has the given slug.
    """

    def enum_text(value):
        # Use .value when the object has one; otherwise stringify it.
        return value.value if hasattr(value, "value") else str(value)

    eager_loads = (
        selectinload(TechniquePage.key_moments).selectinload(KeyMoment.source_video),
        selectinload(TechniquePage.creator),
        selectinload(TechniquePage.outgoing_links).selectinload(
            RelatedTechniqueLink.target_page
        ),
        selectinload(TechniquePage.incoming_links).selectinload(
            RelatedTechniqueLink.source_page
        ),
        selectinload(TechniquePage.source_video_links).selectinload(
            TechniquePageVideo.source_video
        ),
    )
    page_query = (
        select(TechniquePage)
        .where(TechniquePage.slug == slug)
        .options(*eager_loads)
    )
    page = (await db.execute(page_query)).scalar_one_or_none()
    if page is None:
        raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found")

    # Key moments in chronological order, each tagged with its video filename
    key_moment_items = []
    for moment in sorted(page.key_moments, key=lambda km: km.start_time):
        summary = KeyMomentSummary.model_validate(moment)
        summary.video_filename = (
            moment.source_video.filename if moment.source_video else ""
        )
        key_moment_items.append(summary)

    # Creator info stays None when the page has no creator
    creator = page.creator
    creator_info = (
        CreatorInfo(name=creator.name, slug=creator.slug, genres=creator.genres)
        if creator
        else None
    )

    # Curated links: pages this one points to, then pages pointing at it
    related_links: list[RelatedLinkItem] = [
        RelatedLinkItem(
            target_title=link.target_page.title,
            target_slug=link.target_page.slug,
            relationship=enum_text(link.relationship),
        )
        for link in page.outgoing_links
        if link.target_page
    ]
    related_links += [
        RelatedLinkItem(
            target_title=link.source_page.title,
            target_slug=link.source_page.slug,
            relationship=enum_text(link.relationship),
        )
        for link in page.incoming_links
        if link.source_page
    ]

    # Fill any remaining slots (four in total) with dynamic suggestions
    max_related = 4
    slots_left = max_related - len(related_links)
    if slots_left > 0:
        already_linked = {item.target_slug for item in related_links}
        try:
            suggestions = await _find_dynamic_related(
                db, page, already_linked, slots_left
            )
            related_links.extend(suggestions)
        except Exception:
            logger.warning(
                "Dynamic related query failed for %s, continuing with curated only",
                slug,
                exc_info=True,
            )

    base = TechniquePageRead.model_validate(page)

    # Number of stored version snapshots for this page
    version_count_query = select(func.count()).where(
        TechniquePageVersion.technique_page_id == page.id
    )
    version_count = (await db.execute(version_count_query)).scalar() or 0

    # Source videos come from the association rows; skip rows with no video
    source_videos = []
    for video_link in page.source_video_links:
        video = video_link.source_video
        if video is None:
            continue
        source_videos.append(
            SourceVideoSummary(
                id=video.id,
                filename=video.filename,
                content_type=enum_text(video.content_type),
                added_at=video_link.added_at,
            )
        )

    return TechniquePageDetail(
        **base.model_dump(),
        key_moments=key_moment_items,
        creator_info=creator_info,
        related_links=related_links,
        version_count=version_count,
        source_videos=source_videos,
    )
@router.get("/{slug}/versions", response_model=TechniquePageVersionListResponse)
async def list_technique_versions(
    slug: str,
    db: AsyncSession = Depends(get_session),
) -> TechniquePageVersionListResponse:
    """Return every version snapshot of a technique page, highest version first.

    Responds with 404 when no technique page has the given slug.
    """
    # Look up the page first so an unknown slug yields a 404, not an empty list
    page_lookup = await db.execute(
        select(TechniquePage).where(TechniquePage.slug == slug)
    )
    page = page_lookup.scalar_one_or_none()
    if page is None:
        raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found")

    newest_first = (
        select(TechniquePageVersion)
        .where(TechniquePageVersion.technique_page_id == page.id)
        .order_by(TechniquePageVersion.version_number.desc())
    )
    snapshots = (await db.execute(newest_first)).scalars().all()

    items = list(map(TechniquePageVersionSummary.model_validate, snapshots))
    return TechniquePageVersionListResponse(items=items, total=len(items))
@router.get("/{slug}/versions/{version_number}", response_model=TechniquePageVersionDetail)
async def get_technique_version(
    slug: str,
    version_number: int,
    db: AsyncSession = Depends(get_session),
) -> TechniquePageVersionDetail:
    """Return one version snapshot of a technique page, looked up by number.

    Responds with 404 when either the page or the requested version is missing.
    """
    # Step 1: the technique page must exist
    page_lookup = await db.execute(
        select(TechniquePage).where(TechniquePage.slug == slug)
    )
    page = page_lookup.scalar_one_or_none()
    if page is None:
        raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found")

    # Step 2: the requested version must exist for that page
    snapshot_query = (
        select(TechniquePageVersion)
        .where(TechniquePageVersion.technique_page_id == page.id)
        .where(TechniquePageVersion.version_number == version_number)
    )
    snapshot = (await db.execute(snapshot_query)).scalar_one_or_none()
    if snapshot is not None:
        return TechniquePageVersionDetail.model_validate(snapshot)

    raise HTTPException(
        status_code=404,
        detail=f"Version {version_number} not found for technique '{slug}'",
    )