"""Technique page endpoints — list and detail with eager-loaded relations.""" from __future__ import annotations import logging from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from database import get_session from models import Creator, KeyMoment, RelatedTechniqueLink, SourceVideo, TechniquePage, TechniquePageVersion from schemas import ( CreatorInfo, KeyMomentSummary, PaginatedResponse, RelatedLinkItem, TechniquePageDetail, TechniquePageRead, TechniquePageVersionDetail, TechniquePageVersionListResponse, TechniquePageVersionSummary, ) logger = logging.getLogger("chrysopedia.techniques") router = APIRouter(prefix="/techniques", tags=["techniques"]) async def _find_dynamic_related( db: AsyncSession, page: TechniquePage, exclude_slugs: set[str], limit: int, ) -> list[RelatedLinkItem]: """Score and return dynamically related technique pages. Scoring: - Same creator + same topic_category: +3 - Same creator, different category: +2 - Same topic_category, different creator: +2 - Each overlapping topic_tag: +1 """ exclude_ids = {page.id} # Base: all other technique pages, eagerly load creator for name stmt = ( select(TechniquePage) .options(selectinload(TechniquePage.creator)) .where(TechniquePage.id != page.id) ) if exclude_slugs: stmt = stmt.where(TechniquePage.slug.notin_(exclude_slugs)) result = await db.execute(stmt) candidates = result.scalars().all() if not candidates: return [] current_tags = set(page.topic_tags) if page.topic_tags else set() scored: list[tuple[int, str, TechniquePage]] = [] for cand in candidates: score = 0 reasons: list[str] = [] same_creator = cand.creator_id == page.creator_id same_category = cand.topic_category == page.topic_category if same_creator and same_category: score += 3 reasons.append("Same creator, same topic") elif same_creator: score += 2 reasons.append("Same creator") elif same_category: score += 2 reasons.append(f"Also about {page.topic_category}") # Tag overlap scoring if current_tags: cand_tags = set(cand.topic_tags) if cand.topic_tags else set() shared = current_tags & cand_tags if shared: score += len(shared) reasons.append(f"Shared tags: {', '.join(sorted(shared))}") if score > 0: scored.append((score, "; ".join(reasons), cand)) # Sort descending by score, then by title for determinism scored.sort(key=lambda x: (-x[0], x[2].title)) results: list[RelatedLinkItem] = [] for score, reason, cand in scored[:limit]: creator_name = cand.creator.name if cand.creator else "" results.append( RelatedLinkItem( target_title=cand.title, target_slug=cand.slug, relationship="dynamic", creator_name=creator_name, topic_category=cand.topic_category, reason=reason, ) ) return results @router.get("", response_model=PaginatedResponse) async def list_techniques( category: Annotated[str | None, Query()] = None, creator_slug: Annotated[str | None, Query()] = None, sort: Annotated[str, Query()] = "recent", offset: Annotated[int, Query(ge=0)] = 0, limit: Annotated[int, Query(ge=1, le=100)] = 50, db: AsyncSession = Depends(get_session), ) -> PaginatedResponse: """List technique pages with optional category/creator filtering.""" # Correlated subquery for key moment count (same pattern as creators.py) key_moment_count_sq = ( select(func.count()) .where(KeyMoment.technique_page_id == TechniquePage.id) .correlate(TechniquePage) .scalar_subquery() ) # Build base query with filters base_stmt = select(TechniquePage.id) if category: base_stmt = base_stmt.where(TechniquePage.topic_category == category) if creator_slug: base_stmt = base_stmt.join(Creator, TechniquePage.creator_id == Creator.id).where( Creator.slug == creator_slug ) # Count total before pagination count_stmt = select(func.count()).select_from(base_stmt.subquery()) count_result = await db.execute(count_stmt) total = count_result.scalar() or 0 # Main query with subquery column stmt = select( TechniquePage, key_moment_count_sq.label("key_moment_count"), ) if category: stmt = stmt.where(TechniquePage.topic_category == category) if creator_slug: stmt = stmt.join(Creator, TechniquePage.creator_id == Creator.id).where( Creator.slug == creator_slug ) stmt = stmt.options(selectinload(TechniquePage.creator)) if sort == "random": stmt = stmt.order_by(func.random()) elif sort == "oldest": stmt = stmt.order_by(TechniquePage.created_at.asc()) elif sort == "alpha": stmt = stmt.order_by(TechniquePage.title.asc()) elif sort == "creator": # Need a join for creator name ordering; avoid duplicate join if creator_slug filter already joined if not creator_slug: stmt = stmt.join(Creator, TechniquePage.creator_id == Creator.id, isouter=True) stmt = stmt.order_by(Creator.name.asc(), TechniquePage.title.asc()) else: # Default: "recent" — newest first stmt = stmt.order_by(TechniquePage.created_at.desc()) stmt = stmt.offset(offset).limit(limit) result = await db.execute(stmt) rows = result.all() items = [] for row in rows: p = row[0] km_count = row[1] or 0 item = TechniquePageRead.model_validate(p) if p.creator: item.creator_name = p.creator.name item.creator_slug = p.creator.slug item.key_moment_count = km_count items.append(item) return PaginatedResponse( items=items, total=total, offset=offset, limit=limit, ) @router.get("/random") async def random_technique( db: AsyncSession = Depends(get_session), ) -> dict: """Return the slug of a single random technique page.""" stmt = select(TechniquePage.slug).order_by(func.random()).limit(1) result = await db.execute(stmt) slug = result.scalar_one_or_none() if slug is None: raise HTTPException(status_code=404, detail="No techniques available") return {"slug": slug} @router.get("/{slug}", response_model=TechniquePageDetail) async def get_technique( slug: str, db: AsyncSession = Depends(get_session), ) -> TechniquePageDetail: """Get full technique page detail with key moments, creator, and related links.""" stmt = ( select(TechniquePage) .where(TechniquePage.slug == slug) .options( selectinload(TechniquePage.key_moments).selectinload(KeyMoment.source_video), selectinload(TechniquePage.creator), selectinload(TechniquePage.outgoing_links).selectinload( RelatedTechniqueLink.target_page ), selectinload(TechniquePage.incoming_links).selectinload( RelatedTechniqueLink.source_page ), ) ) result = await db.execute(stmt) page = result.scalar_one_or_none() if page is None: raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found") # Build key moments (ordered by start_time) key_moments = sorted(page.key_moments, key=lambda km: km.start_time) key_moment_items = [] for km in key_moments: item = KeyMomentSummary.model_validate(km) item.video_filename = km.source_video.filename if km.source_video else "" key_moment_items.append(item) # Build creator info creator_info = None if page.creator: creator_info = CreatorInfo( name=page.creator.name, slug=page.creator.slug, genres=page.creator.genres, ) # Build related links (outgoing + incoming) related_links: list[RelatedLinkItem] = [] for link in page.outgoing_links: if link.target_page: related_links.append( RelatedLinkItem( target_title=link.target_page.title, target_slug=link.target_page.slug, relationship=link.relationship.value if hasattr(link.relationship, 'value') else str(link.relationship), ) ) for link in page.incoming_links: if link.source_page: related_links.append( RelatedLinkItem( target_title=link.source_page.title, target_slug=link.source_page.slug, relationship=link.relationship.value if hasattr(link.relationship, 'value') else str(link.relationship), ) ) # Supplement with dynamic related techniques (up to 4 total) curated_slugs = {link.target_slug for link in related_links} max_related = 4 if len(related_links) < max_related: remaining = max_related - len(related_links) try: dynamic_links = await _find_dynamic_related( db, page, curated_slugs, remaining ) related_links.extend(dynamic_links) except Exception: logger.warning( "Dynamic related query failed for %s, continuing with curated only", slug, exc_info=True, ) base = TechniquePageRead.model_validate(page) # Count versions for this page version_count_stmt = select(func.count()).where( TechniquePageVersion.technique_page_id == page.id ) version_count_result = await db.execute(version_count_stmt) version_count = version_count_result.scalar() or 0 return TechniquePageDetail( **base.model_dump(), key_moments=key_moment_items, creator_info=creator_info, related_links=related_links, version_count=version_count, ) @router.get("/{slug}/versions", response_model=TechniquePageVersionListResponse) async def list_technique_versions( slug: str, db: AsyncSession = Depends(get_session), ) -> TechniquePageVersionListResponse: """List all version snapshots for a technique page, newest first.""" # Resolve the technique page page_stmt = select(TechniquePage).where(TechniquePage.slug == slug) page_result = await db.execute(page_stmt) page = page_result.scalar_one_or_none() if page is None: raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found") # Fetch versions ordered by version_number DESC versions_stmt = ( select(TechniquePageVersion) .where(TechniquePageVersion.technique_page_id == page.id) .order_by(TechniquePageVersion.version_number.desc()) ) versions_result = await db.execute(versions_stmt) versions = versions_result.scalars().all() items = [TechniquePageVersionSummary.model_validate(v) for v in versions] return TechniquePageVersionListResponse(items=items, total=len(items)) @router.get("/{slug}/versions/{version_number}", response_model=TechniquePageVersionDetail) async def get_technique_version( slug: str, version_number: int, db: AsyncSession = Depends(get_session), ) -> TechniquePageVersionDetail: """Get a specific version snapshot by version number.""" # Resolve the technique page page_stmt = select(TechniquePage).where(TechniquePage.slug == slug) page_result = await db.execute(page_stmt) page = page_result.scalar_one_or_none() if page is None: raise HTTPException(status_code=404, detail=f"Technique '{slug}' not found") # Fetch the specific version version_stmt = ( select(TechniquePageVersion) .where( TechniquePageVersion.technique_page_id == page.id, TechniquePageVersion.version_number == version_number, ) ) version_result = await db.execute(version_stmt) version = version_result.scalar_one_or_none() if version is None: raise HTTPException( status_code=404, detail=f"Version {version_number} not found for technique '{slug}'", ) return TechniquePageVersionDetail.model_validate(version)