diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py
index 521da12..bd478ec 100644
--- a/backend/routers/pipeline.py
+++ b/backend/routers/pipeline.py
@@ -74,8 +74,17 @@ async def trigger_pipeline(
 @router.get("/admin/pipeline/videos")
 async def list_pipeline_videos(
     db: AsyncSession = Depends(get_session),
+    offset: Annotated[int, Query(ge=0)] = 0,
+    limit: Annotated[int, Query(ge=1, le=500)] = 200,
+    status: Annotated[str | None, Query()] = None,
+    creator_id: Annotated[str | None, Query()] = None,
 ):
-    """List all videos with processing status and pipeline event counts."""
+    """List videos with processing status and pipeline event counts.
+
+    Supports server-side pagination and optional filtering by processing
+    status and creator. Defaults to the first 200 videos ordered by most
+    recently updated; clients page through the rest with offset/limit.
+    """
     # Subquery for event counts per video
     event_counts = (
         select(
@@ -151,9 +160,23 @@ async def list_pipeline_videos(
         .outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id)
         .outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id)
         .outerjoin(latest_run, SourceVideo.id == latest_run.c.video_id)
-        .order_by(SourceVideo.updated_at.desc())
     )
 
+    # Optional filters
+    if status:
+        stmt = stmt.where(SourceVideo.processing_status == status)
+    if creator_id:
+        stmt = stmt.where(SourceVideo.creator_id == creator_id)
+
+    # Total count before pagination
+    count_result = await db.execute(
+        select(func.count()).select_from(stmt.subquery())
+    )
+    total = count_result.scalar() or 0
+
+    # Apply ordering and pagination
+    stmt = stmt.order_by(SourceVideo.updated_at.desc()).offset(offset).limit(limit)
+
     result = await db.execute(stmt)
     rows = result.all()
 
@@ -186,7 +209,9 @@ async def list_pipeline_videos(
             }
             for r in rows
         ],
-        "total": len(rows),
+        "total": total,
+        "offset": offset,
+        "limit": limit,
     }
 
 
@@ -913,9 +938,12 @@ async def get_stale_pages(
 
     Compares the SHA-256 hash of the current stage5_synthesis.txt against
     the prompt hashes stored in TechniquePageVersion.pipeline_metadata.
+
+    Uses a single query with a window function to fetch the latest version
+    per page joined to creators, eliminating N+1 queries.
     """
     import hashlib
     from pathlib import Path as _Path
     from models import TechniquePage, TechniquePageVersion
 
     settings = get_settings()
@@ -928,43 +956,56 @@ async def get_stale_pages(
         prompt_path.read_text(encoding="utf-8").encode()
     ).hexdigest()[:12]
 
-    # Get all technique pages
-    pages = (await db.execute(select(TechniquePage))).scalars().all()
-    total = len(pages)
+    # Subquery: latest version per technique page via row_number window
+    latest_version = (
+        select(
+            TechniquePageVersion.technique_page_id,
+            TechniquePageVersion.pipeline_metadata,
+            func.row_number().over(
+                partition_by=TechniquePageVersion.technique_page_id,
+                order_by=TechniquePageVersion.version_number.desc(),
+            ).label("rn"),
+        )
+        .subquery("latest_version")
+    )
+    # Main query: pages + creator name + latest version metadata in one shot
+    rows = (
+        await db.execute(
+            select(
+                TechniquePage.slug,
+                TechniquePage.creator_id,
+                Creator.name.label("creator_name"),
+                latest_version.c.pipeline_metadata,
+            )
+            .join(Creator, Creator.id == TechniquePage.creator_id)
+            .outerjoin(
+                latest_version,
+                (latest_version.c.technique_page_id == TechniquePage.id)
+                & (latest_version.c.rn == 1),
+            )
+        )
+    ).all()
+
+    total = len(rows)
 
     stale_count = 0
     fresh_count = 0
     stale_by_creator: dict[str, dict] = {}
 
-    for page in pages:
-        # Get latest version to check prompt hash
-        latest_version = (
-            await db.execute(
-                select(TechniquePageVersion)
-                .where(TechniquePageVersion.technique_page_id == page.id)
-                .order_by(TechniquePageVersion.version_number.desc())
-                .limit(1)
-            )
-        ).scalar_one_or_none()
-
+    for slug, _creator_id, creator_name, meta in rows:
         page_hash = None
-        if latest_version and latest_version.pipeline_metadata:
-            meta = latest_version.pipeline_metadata
+        if meta:
             page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash"))
 
         if page_hash == current_hash:
             fresh_count += 1
         else:
             stale_count += 1
-            # Look up creator name
-            creator = (await db.execute(
-                select(Creator.name).where(Creator.id == page.creator_id)
-            )).scalar_one_or_none() or "Unknown"
-
-            if creator not in stale_by_creator:
-                stale_by_creator[creator] = {"creator": creator, "stale_count": 0, "page_slugs": []}
-            stale_by_creator[creator]["stale_count"] += 1
-            stale_by_creator[creator]["page_slugs"].append(page.slug)
+            name = creator_name or "Unknown"
+            if name not in stale_by_creator:
+                stale_by_creator[name] = {"creator": name, "stale_count": 0, "page_slugs": []}
+            stale_by_creator[name]["stale_count"] += 1
+            stale_by_creator[name]["page_slugs"].append(slug)
 
     return {
         "current_prompt_hash": current_hash,
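A note on the count-then-paginate pattern in `list_pipeline_videos`: the total is computed from the filtered statement before ORDER BY/OFFSET/LIMIT are applied, so the response can report how many rows match the filters. A minimal sketch of the same pattern, using a hypothetical `Video` model and a synchronous in-memory SQLite engine so it runs standalone (the real endpoint is async):

```python
# Count-then-paginate sketch. Video is a hypothetical stand-in for
# SourceVideo; the endpoint's async session is replaced by a sync one.
from sqlalchemy import String, create_engine, func, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Video(Base):
    __tablename__ = "videos"
    id: Mapped[int] = mapped_column(primary_key=True)
    processing_status: Mapped[str] = mapped_column(String(32))


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(Video(processing_status=s) for s in ("done", "done", "failed"))
    session.commit()

    stmt = select(Video).where(Video.processing_status == "done")

    # Total of the *filtered* set, before OFFSET/LIMIT; mirrors
    # select(func.count()).select_from(stmt.subquery()) in the endpoint.
    total = session.execute(
        select(func.count()).select_from(stmt.subquery())
    ).scalar_one()

    page = session.execute(stmt.order_by(Video.id).offset(0).limit(2)).scalars().all()
    print(total, [v.id for v in page])  # 2 [1, 2]
```

This costs one extra round trip per request, which is fine for an admin endpoint; if it ever mattered, the count could be folded into the page query as a `count(*) OVER ()` window column instead.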
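The `get_stale_pages` rewrite relies on the standard latest-row-per-group trick: rank each page's versions with `row_number()` partitioned by page, then keep only rank 1. A sketch of just that pattern, again with a hypothetical table and in-memory SQLite (which supports window functions since 3.25):

```python
# Latest-row-per-group via row_number(). PageVersion is a hypothetical
# stand-in for TechniquePageVersion.
from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class PageVersion(Base):
    __tablename__ = "page_versions"
    id: Mapped[int] = mapped_column(primary_key=True)
    page_id: Mapped[int]
    version_number: Mapped[int]


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        PageVersion(page_id=p, version_number=v) for p, v in [(1, 1), (1, 2), (2, 1)]
    )
    session.commit()

    ranked = select(
        PageVersion.page_id,
        PageVersion.version_number,
        func.row_number()
        .over(
            partition_by=PageVersion.page_id,
            order_by=PageVersion.version_number.desc(),
        )
        .label("rn"),
    ).subquery()

    # rn == 1 keeps exactly the newest version of each page, in one query.
    latest = session.execute(
        select(ranked.c.page_id, ranked.c.version_number)
        .where(ranked.c.rn == 1)
        .order_by(ranked.c.page_id)
    ).all()
    print(latest)  # [(1, 2), (2, 1)]
```

The outer join against `rn == 1` in the real query then attaches at most one version row per page, which is what removes the per-page SELECT from the old loop.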
""" import hashlib from pathlib import Path as _Path + from sqlalchemy import over, text from models import TechniquePage, TechniquePageVersion settings = get_settings() @@ -928,43 +958,56 @@ async def get_stale_pages( prompt_path.read_text(encoding="utf-8").encode() ).hexdigest()[:12] - # Get all technique pages - pages = (await db.execute(select(TechniquePage))).scalars().all() - total = len(pages) + # Subquery: latest version per technique page via row_number window + latest_version = ( + select( + TechniquePageVersion.technique_page_id, + TechniquePageVersion.pipeline_metadata, + func.row_number().over( + partition_by=TechniquePageVersion.technique_page_id, + order_by=TechniquePageVersion.version_number.desc(), + ).label("rn"), + ) + .subquery("latest_version") + ) + # Main query: pages + creator name + latest version metadata in one shot + rows = ( + await db.execute( + select( + TechniquePage.slug, + TechniquePage.creator_id, + Creator.name.label("creator_name"), + latest_version.c.pipeline_metadata, + ) + .join(Creator, Creator.id == TechniquePage.creator_id) + .outerjoin( + latest_version, + (latest_version.c.technique_page_id == TechniquePage.id) + & (latest_version.c.rn == 1), + ) + ) + ).all() + + total = len(rows) stale_count = 0 fresh_count = 0 stale_by_creator: dict[str, dict] = {} - for page in pages: - # Get latest version to check prompt hash - latest_version = ( - await db.execute( - select(TechniquePageVersion) - .where(TechniquePageVersion.technique_page_id == page.id) - .order_by(TechniquePageVersion.version_number.desc()) - .limit(1) - ) - ).scalar_one_or_none() - + for slug, _creator_id, creator_name, meta in rows: page_hash = None - if latest_version and latest_version.pipeline_metadata: - meta = latest_version.pipeline_metadata + if meta: page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash")) if page_hash == current_hash: fresh_count += 1 else: stale_count += 1 - # Look up creator name - creator = (await db.execute( - select(Creator.name).where(Creator.id == page.creator_id) - )).scalar_one_or_none() or "Unknown" - - if creator not in stale_by_creator: - stale_by_creator[creator] = {"creator": creator, "stale_count": 0, "page_slugs": []} - stale_by_creator[creator]["stale_count"] += 1 - stale_by_creator[creator]["page_slugs"].append(page.slug) + name = creator_name or "Unknown" + if name not in stale_by_creator: + stale_by_creator[name] = {"creator": name, "stale_count": 0, "page_slugs": []} + stale_by_creator[name]["stale_count"] += 1 + stale_by_creator[name]["page_slugs"].append(slug) return { "current_prompt_hash": current_hash, diff --git a/backend/routers/techniques.py b/backend/routers/techniques.py index 63a6b39..47b045f 100644 --- a/backend/routers/techniques.py +++ b/backend/routers/techniques.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json import logging from typing import Annotated @@ -12,6 +13,7 @@ from sqlalchemy.orm import selectinload from database import get_session from models import Creator, KeyMoment, RelatedTechniqueLink, SourceVideo, TechniquePage, TechniquePageVersion, TechniquePageVideo +from redis_client import get_redis from schemas import ( CreatorInfo, KeyMomentSummary, @@ -27,6 +29,8 @@ from schemas import ( logger = logging.getLogger("chrysopedia.techniques") +RELATED_CACHE_TTL = 3600 # 1 hour — related links only change when pages are created + router = APIRouter(prefix="/techniques", tags=["techniques"]) @@ -38,15 +42,31 @@ async def _find_dynamic_related( ) -> list[RelatedLinkItem]: """Score and return 
dynamically related technique pages. + Results are cached in Redis for 1 hour per page slug since related + links only change when technique pages are created or updated. + Scoring: - Same creator + same topic_category: +3 - Same creator, different category: +2 - Same topic_category, different creator: +2 - Each overlapping topic_tag: +1 """ - exclude_ids = {page.id} + cache_key = f"chrysopedia:related:{page.slug}" - # Base: all other technique pages, eagerly load creator for name + # Try Redis cache + try: + redis = await get_redis() + cached = await redis.get(cache_key) + await redis.aclose() + if cached: + items = json.loads(cached) + # Filter out any that should be excluded and respect limit + filtered = [i for i in items if i["target_slug"] not in exclude_slugs] + return [RelatedLinkItem(**i) for i in filtered[:limit]] + except Exception: + pass + + # Cache miss — compute from DB stmt = ( select(TechniquePage) .options(selectinload(TechniquePage.creator)) @@ -81,7 +101,6 @@ async def _find_dynamic_related( score += 2 reasons.append(f"Also about {page.topic_category}") - # Tag overlap scoring if current_tags: cand_tags = set(cand.topic_tags) if cand.topic_tags else set() shared = current_tags & cand_tags @@ -92,7 +111,6 @@ async def _find_dynamic_related( if score > 0: scored.append((score, "; ".join(reasons), cand)) - # Sort descending by score, then by title for determinism scored.sort(key=lambda x: (-x[0], x[2].title)) results: list[RelatedLinkItem] = [] @@ -108,6 +126,19 @@ async def _find_dynamic_related( reason=reason, ) ) + + # Write to Redis cache (best-effort) + try: + redis = await get_redis() + await redis.set( + cache_key, + json.dumps([r.model_dump() for r in results]), + ex=RELATED_CACHE_TTL, + ) + await redis.aclose() + except Exception: + pass + return results
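The caching added to `_find_dynamic_related` is a best-effort read-through: try Redis, fall back to the database on any cache error, and write back with a TTL. A standalone sketch of that shape, assuming a Redis server on localhost:6379 and the redis-py asyncio client; `cached_related` and `compute` are hypothetical stand-ins for the handler and its scoring query:

```python
# Best-effort read-through cache sketch (assumes redis-py >= 5 and a
# local Redis; in the router this role is played by get_redis()).
import asyncio
import json

import redis.asyncio as aioredis

RELATED_CACHE_TTL = 3600  # seconds


async def cached_related(slug: str, compute) -> list[dict]:
    cache_key = f"chrysopedia:related:{slug}"
    client = aioredis.Redis()
    try:
        cached = await client.get(cache_key)
        if cached:
            return json.loads(cached)
        results = await compute()
        # Write-back is best-effort too: a cache failure must never
        # fail the request, hence the broad except below.
        await client.set(cache_key, json.dumps(results), ex=RELATED_CACHE_TTL)
        return results
    except Exception:
        return await compute()
    finally:
        await client.aclose()


async def main() -> None:
    async def compute() -> list[dict]:
        return [{"target_slug": "demo-page", "reason": "Same creator"}]

    print(await cached_related("some-slug", compute))  # first call: cache miss
    print(await cached_related("some-slug", compute))  # second call: from Redis


asyncio.run(main())
```

Expiry here is purely TTL-based: the change accepts up to an hour of staleness after a page is created or updated rather than adding explicit invalidation, a reasonable trade for links that change rarely. If fresher results are ever needed, deleting the `chrysopedia:related:{slug}` keys on page writes would be the natural extension.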