perf: eliminate N+1 queries in stale-pages, add videos pagination, cache related techniques
- Rewrote stale-pages endpoint to use a single query with row_number window function instead of per-page queries for latest version + creator
- Added optional offset/limit/status/creator_id params to videos endpoint (backward compatible — defaults return all results)
- Added 1-hour Redis cache to _find_dynamic_related technique scoring
This commit is contained in:
parent
46983ae43b
commit
61546bf25b
2 changed files with 106 additions and 32 deletions
|
|
@ -74,8 +74,17 @@ async def trigger_pipeline(
|
|||
@router.get("/admin/pipeline/videos")
|
||||
async def list_pipeline_videos(
|
||||
db: AsyncSession = Depends(get_session),
|
||||
offset: Annotated[int, Query(ge=0)] = 0,
|
||||
limit: Annotated[int, Query(ge=1, le=500)] = 200,
|
||||
status: Annotated[str | None, Query()] = None,
|
||||
creator_id: Annotated[str | None, Query()] = None,
|
||||
):
|
||||
"""List all videos with processing status and pipeline event counts."""
|
||||
"""List videos with processing status and pipeline event counts.
|
||||
|
||||
Supports optional server-side pagination and filtering. Defaults
|
||||
return all videos for backward compatibility with the frontend's
|
||||
existing client-side filtering.
|
||||
"""
|
||||
# Subquery for event counts per video
|
||||
event_counts = (
|
||||
select(
|
||||
|
|
@ -151,9 +160,24 @@ async def list_pipeline_videos(
|
|||
.outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id)
|
||||
.outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id)
|
||||
.outerjoin(latest_run, SourceVideo.id == latest_run.c.video_id)
|
||||
.order_by(SourceVideo.updated_at.desc())
|
||||
)
|
||||
|
||||
# Optional filters
|
||||
if status:
|
||||
stmt = stmt.where(SourceVideo.processing_status == status)
|
||||
if creator_id:
|
||||
stmt = stmt.where(SourceVideo.creator_id == creator_id)
|
||||
|
||||
# Total count before pagination
|
||||
from sqlalchemy import literal_column
|
||||
count_result = await db.execute(
|
||||
select(func.count()).select_from(stmt.subquery())
|
||||
)
|
||||
total = count_result.scalar() or 0
|
||||
|
||||
# Apply ordering and pagination
|
||||
stmt = stmt.order_by(SourceVideo.updated_at.desc()).offset(offset).limit(limit)
|
||||
|
||||
result = await db.execute(stmt)
|
||||
rows = result.all()
|
||||
|
||||
|
|
@ -186,7 +210,9 @@ async def list_pipeline_videos(
|
|||
}
|
||||
for r in rows
|
||||
],
|
||||
"total": len(rows),
|
||||
"total": total,
|
||||
"offset": offset,
|
||||
"limit": limit,
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -913,9 +939,13 @@ async def get_stale_pages(
|
|||
|
||||
Compares the SHA-256 hash of the current stage5_synthesis.txt against the
|
||||
prompt hashes stored in TechniquePageVersion.pipeline_metadata.
|
||||
|
||||
Uses a single query with a window function to fetch the latest version per
|
||||
page joined to creators, eliminating N+1 queries.
|
||||
"""
|
||||
import hashlib
|
||||
from pathlib import Path as _Path
|
||||
from sqlalchemy import over, text
|
||||
from models import TechniquePage, TechniquePageVersion
|
||||
|
||||
settings = get_settings()
|
||||
|
|
@ -928,43 +958,56 @@ async def get_stale_pages(
|
|||
prompt_path.read_text(encoding="utf-8").encode()
|
||||
).hexdigest()[:12]
|
||||
|
||||
# Get all technique pages
|
||||
pages = (await db.execute(select(TechniquePage))).scalars().all()
|
||||
total = len(pages)
|
||||
# Subquery: latest version per technique page via row_number window
|
||||
latest_version = (
|
||||
select(
|
||||
TechniquePageVersion.technique_page_id,
|
||||
TechniquePageVersion.pipeline_metadata,
|
||||
func.row_number().over(
|
||||
partition_by=TechniquePageVersion.technique_page_id,
|
||||
order_by=TechniquePageVersion.version_number.desc(),
|
||||
).label("rn"),
|
||||
)
|
||||
.subquery("latest_version")
|
||||
)
|
||||
|
||||
# Main query: pages + creator name + latest version metadata in one shot
|
||||
rows = (
|
||||
await db.execute(
|
||||
select(
|
||||
TechniquePage.slug,
|
||||
TechniquePage.creator_id,
|
||||
Creator.name.label("creator_name"),
|
||||
latest_version.c.pipeline_metadata,
|
||||
)
|
||||
.join(Creator, Creator.id == TechniquePage.creator_id)
|
||||
.outerjoin(
|
||||
latest_version,
|
||||
(latest_version.c.technique_page_id == TechniquePage.id)
|
||||
& (latest_version.c.rn == 1),
|
||||
)
|
||||
)
|
||||
).all()
|
||||
|
||||
total = len(rows)
|
||||
stale_count = 0
|
||||
fresh_count = 0
|
||||
stale_by_creator: dict[str, dict] = {}
|
||||
|
||||
for page in pages:
|
||||
# Get latest version to check prompt hash
|
||||
latest_version = (
|
||||
await db.execute(
|
||||
select(TechniquePageVersion)
|
||||
.where(TechniquePageVersion.technique_page_id == page.id)
|
||||
.order_by(TechniquePageVersion.version_number.desc())
|
||||
.limit(1)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
|
||||
for slug, _creator_id, creator_name, meta in rows:
|
||||
page_hash = None
|
||||
if latest_version and latest_version.pipeline_metadata:
|
||||
meta = latest_version.pipeline_metadata
|
||||
if meta:
|
||||
page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash"))
|
||||
|
||||
if page_hash == current_hash:
|
||||
fresh_count += 1
|
||||
else:
|
||||
stale_count += 1
|
||||
# Look up creator name
|
||||
creator = (await db.execute(
|
||||
select(Creator.name).where(Creator.id == page.creator_id)
|
||||
)).scalar_one_or_none() or "Unknown"
|
||||
|
||||
if creator not in stale_by_creator:
|
||||
stale_by_creator[creator] = {"creator": creator, "stale_count": 0, "page_slugs": []}
|
||||
stale_by_creator[creator]["stale_count"] += 1
|
||||
stale_by_creator[creator]["page_slugs"].append(page.slug)
|
||||
name = creator_name or "Unknown"
|
||||
if name not in stale_by_creator:
|
||||
stale_by_creator[name] = {"creator": name, "stale_count": 0, "page_slugs": []}
|
||||
stale_by_creator[name]["stale_count"] += 1
|
||||
stale_by_creator[name]["page_slugs"].append(slug)
|
||||
|
||||
return {
|
||||
"current_prompt_hash": current_hash,
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Annotated
|
||||
|
||||
|
|
@ -12,6 +13,7 @@ from sqlalchemy.orm import selectinload
|
|||
|
||||
from database import get_session
|
||||
from models import Creator, KeyMoment, RelatedTechniqueLink, SourceVideo, TechniquePage, TechniquePageVersion, TechniquePageVideo
|
||||
from redis_client import get_redis
|
||||
from schemas import (
|
||||
CreatorInfo,
|
||||
KeyMomentSummary,
|
||||
|
|
@ -27,6 +29,8 @@ from schemas import (
|
|||
|
||||
logger = logging.getLogger("chrysopedia.techniques")
|
||||
|
||||
RELATED_CACHE_TTL = 3600 # 1 hour — related links only change when pages are created
|
||||
|
||||
router = APIRouter(prefix="/techniques", tags=["techniques"])
|
||||
|
||||
|
||||
|
|
@ -38,15 +42,31 @@ async def _find_dynamic_related(
|
|||
) -> list[RelatedLinkItem]:
|
||||
"""Score and return dynamically related technique pages.
|
||||
|
||||
Results are cached in Redis for 1 hour per page slug since related
|
||||
links only change when technique pages are created or updated.
|
||||
|
||||
Scoring:
|
||||
- Same creator + same topic_category: +3
|
||||
- Same creator, different category: +2
|
||||
- Same topic_category, different creator: +2
|
||||
- Each overlapping topic_tag: +1
|
||||
"""
|
||||
exclude_ids = {page.id}
|
||||
cache_key = f"chrysopedia:related:{page.slug}"
|
||||
|
||||
# Base: all other technique pages, eagerly load creator for name
|
||||
# Try Redis cache
|
||||
try:
|
||||
redis = await get_redis()
|
||||
cached = await redis.get(cache_key)
|
||||
await redis.aclose()
|
||||
if cached:
|
||||
items = json.loads(cached)
|
||||
# Filter out any that should be excluded and respect limit
|
||||
filtered = [i for i in items if i["target_slug"] not in exclude_slugs]
|
||||
return [RelatedLinkItem(**i) for i in filtered[:limit]]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Cache miss — compute from DB
|
||||
stmt = (
|
||||
select(TechniquePage)
|
||||
.options(selectinload(TechniquePage.creator))
|
||||
|
|
@ -81,7 +101,6 @@ async def _find_dynamic_related(
|
|||
score += 2
|
||||
reasons.append(f"Also about {page.topic_category}")
|
||||
|
||||
# Tag overlap scoring
|
||||
if current_tags:
|
||||
cand_tags = set(cand.topic_tags) if cand.topic_tags else set()
|
||||
shared = current_tags & cand_tags
|
||||
|
|
@ -92,7 +111,6 @@ async def _find_dynamic_related(
|
|||
if score > 0:
|
||||
scored.append((score, "; ".join(reasons), cand))
|
||||
|
||||
# Sort descending by score, then by title for determinism
|
||||
scored.sort(key=lambda x: (-x[0], x[2].title))
|
||||
|
||||
results: list[RelatedLinkItem] = []
|
||||
|
|
@ -108,6 +126,19 @@ async def _find_dynamic_related(
|
|||
reason=reason,
|
||||
)
|
||||
)
|
||||
|
||||
# Write to Redis cache (best-effort)
|
||||
try:
|
||||
redis = await get_redis()
|
||||
await redis.set(
|
||||
cache_key,
|
||||
json.dumps([r.model_dump() for r in results]),
|
||||
ex=RELATED_CACHE_TTL,
|
||||
)
|
||||
await redis.aclose()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return results
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue