Deleted files: - generate_stage5_variants.py (874 lines) — superseded by pipeline.quality toolkit - PROJECT_CONTEXT.md (461 lines) — stale, .gsd/PROJECT.md is the living doc - CHRYSOPEDIA-ASSESSMENT.md (654 lines) — M011 triage artifact, all findings actioned CSS cleanup (364 lines): - 20 orphaned block groups from deleted review queue/old components - Duplicate .btn base rule, .btn--warning, @keyframes stagePulse Python imports: - routers/pipeline.py: uuid, literal_column, over, text - tests/test_pipeline.py: 9 unused imports (PropertyMock, create_engine, etc.) Build verified: tsc --noEmit clean, npm run build clean (59 modules, 0 warnings).
1532 lines
56 KiB
Python
1532 lines
56 KiB
Python
"""Pipeline management endpoints — public trigger + admin dashboard.
|
|
|
|
Public:
|
|
POST /pipeline/trigger/{video_id} Trigger pipeline for a video
|
|
|
|
Admin:
|
|
GET /admin/pipeline/videos Video list with status + event counts
|
|
POST /admin/pipeline/trigger/{video_id} Retrigger (same as public but under admin prefix)
|
|
POST /admin/pipeline/revoke/{video_id} Revoke/cancel active tasks for a video
|
|
GET /admin/pipeline/events/{video_id} Event log for a video (paginated)
|
|
GET /admin/pipeline/worker-status Active/reserved tasks from Celery inspect
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import func, select, case
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from config import get_settings
|
|
from database import get_session
|
|
from models import PipelineEvent, PipelineRun, PipelineRunStatus, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus, TechniquePage, TechniquePageVideo, TechniquePageVersion
|
|
from redis_client import get_redis
|
|
from schemas import DebugModeResponse, DebugModeUpdate, TokenStageSummary, TokenSummaryResponse, AdminTechniquePageItem, AdminTechniquePageListResponse
|
|
|
|
logger = logging.getLogger("chrysopedia.pipeline")
|
|
|
|
router = APIRouter(tags=["pipeline"])
|
|
|
|
REDIS_DEBUG_MODE_KEY = "chrysopedia:debug_mode"
|
|
|
|
|
|
# ── Public trigger ───────────────────────────────────────────────────────────
|
|
|
|
@router.post("/pipeline/trigger/{video_id}")
|
|
async def trigger_pipeline(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Manually trigger (or re-trigger) the LLM extraction pipeline for a video."""
|
|
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
|
|
result = await db.execute(stmt)
|
|
video = result.scalar_one_or_none()
|
|
|
|
if video is None:
|
|
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
|
|
|
|
from pipeline.stages import run_pipeline
|
|
|
|
try:
|
|
run_pipeline.delay(str(video.id), trigger="manual")
|
|
logger.info("Pipeline manually triggered for video_id=%s", video_id)
|
|
except Exception as exc:
|
|
logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc)
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Pipeline dispatch failed — Celery/Redis may be unavailable",
|
|
) from exc
|
|
|
|
return {
|
|
"status": "triggered",
|
|
"video_id": str(video.id),
|
|
"current_processing_status": video.processing_status.value,
|
|
}
|
|
|
|
|
|
# ── Admin: Video list ────────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/videos")
|
|
async def list_pipeline_videos(
|
|
db: AsyncSession = Depends(get_session),
|
|
offset: Annotated[int, Query(ge=0)] = 0,
|
|
limit: Annotated[int, Query(ge=1, le=500)] = 200,
|
|
status: Annotated[str | None, Query()] = None,
|
|
creator_id: Annotated[str | None, Query()] = None,
|
|
):
|
|
"""List videos with processing status and pipeline event counts.
|
|
|
|
Supports optional server-side pagination and filtering. Defaults
|
|
return all videos for backward compatibility with the frontend's
|
|
existing client-side filtering.
|
|
"""
|
|
# Subquery for event counts per video
|
|
event_counts = (
|
|
select(
|
|
PipelineEvent.video_id,
|
|
func.count().label("event_count"),
|
|
func.sum(case(
|
|
(PipelineEvent.event_type == "llm_call", PipelineEvent.total_tokens),
|
|
else_=0
|
|
)).label("total_tokens_used"),
|
|
func.max(PipelineEvent.created_at).label("last_event_at"),
|
|
)
|
|
.group_by(PipelineEvent.video_id)
|
|
.subquery()
|
|
)
|
|
|
|
# Subquery for the most recent pipeline run per video
|
|
latest_run = (
|
|
select(
|
|
PipelineRun.video_id,
|
|
PipelineRun.id.label("run_id"),
|
|
PipelineRun.run_number,
|
|
PipelineRun.trigger.label("run_trigger"),
|
|
PipelineRun.status.label("run_status"),
|
|
PipelineRun.started_at.label("run_started_at"),
|
|
PipelineRun.finished_at.label("run_finished_at"),
|
|
PipelineRun.error_stage.label("run_error_stage"),
|
|
PipelineRun.total_tokens.label("run_total_tokens"),
|
|
)
|
|
.order_by(PipelineRun.video_id, PipelineRun.started_at.desc())
|
|
.distinct(PipelineRun.video_id)
|
|
.subquery()
|
|
)
|
|
|
|
# Subquery for the most recent stage start event per video (active stage indicator)
|
|
latest_stage = (
|
|
select(
|
|
PipelineEvent.video_id,
|
|
PipelineEvent.stage.label("active_stage"),
|
|
PipelineEvent.event_type.label("active_stage_status"),
|
|
PipelineEvent.created_at.label("stage_started_at"),
|
|
)
|
|
.where(PipelineEvent.event_type.in_(["start", "complete", "error"]))
|
|
.order_by(PipelineEvent.video_id, PipelineEvent.created_at.desc())
|
|
.distinct(PipelineEvent.video_id)
|
|
.subquery()
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
SourceVideo.id,
|
|
SourceVideo.filename,
|
|
SourceVideo.processing_status,
|
|
SourceVideo.content_hash,
|
|
SourceVideo.created_at,
|
|
SourceVideo.updated_at,
|
|
Creator.name.label("creator_name"),
|
|
event_counts.c.event_count,
|
|
event_counts.c.total_tokens_used,
|
|
event_counts.c.last_event_at,
|
|
latest_stage.c.active_stage,
|
|
latest_stage.c.active_stage_status,
|
|
latest_stage.c.stage_started_at,
|
|
latest_run.c.run_id,
|
|
latest_run.c.run_number,
|
|
latest_run.c.run_trigger,
|
|
latest_run.c.run_status,
|
|
latest_run.c.run_started_at,
|
|
latest_run.c.run_finished_at,
|
|
latest_run.c.run_error_stage,
|
|
latest_run.c.run_total_tokens,
|
|
)
|
|
.join(Creator, SourceVideo.creator_id == Creator.id)
|
|
.outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id)
|
|
.outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id)
|
|
.outerjoin(latest_run, SourceVideo.id == latest_run.c.video_id)
|
|
)
|
|
|
|
# Optional filters
|
|
if status:
|
|
stmt = stmt.where(SourceVideo.processing_status == status)
|
|
if creator_id:
|
|
stmt = stmt.where(SourceVideo.creator_id == creator_id)
|
|
|
|
# Total count before pagination
|
|
count_result = await db.execute(
|
|
select(func.count()).select_from(stmt.subquery())
|
|
)
|
|
total = count_result.scalar() or 0
|
|
|
|
# Apply ordering and pagination
|
|
stmt = stmt.order_by(SourceVideo.updated_at.desc()).offset(offset).limit(limit)
|
|
|
|
result = await db.execute(stmt)
|
|
rows = result.all()
|
|
|
|
return {
|
|
"items": [
|
|
{
|
|
"id": str(r.id),
|
|
"filename": r.filename,
|
|
"processing_status": r.processing_status.value if hasattr(r.processing_status, 'value') else str(r.processing_status),
|
|
"content_hash": r.content_hash,
|
|
"creator_name": r.creator_name,
|
|
"created_at": r.created_at.isoformat() if r.created_at else None,
|
|
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
|
|
"event_count": r.event_count or 0,
|
|
"total_tokens_used": r.total_tokens_used or 0,
|
|
"last_event_at": r.last_event_at.isoformat() if r.last_event_at else None,
|
|
"active_stage": r.active_stage,
|
|
"active_stage_status": r.active_stage_status,
|
|
"stage_started_at": r.stage_started_at.isoformat() if r.stage_started_at else None,
|
|
"latest_run": {
|
|
"id": str(r.run_id),
|
|
"run_number": r.run_number,
|
|
"trigger": r.run_trigger.value if hasattr(r.run_trigger, 'value') else r.run_trigger,
|
|
"status": r.run_status.value if hasattr(r.run_status, 'value') else r.run_status,
|
|
"started_at": r.run_started_at.isoformat() if r.run_started_at else None,
|
|
"finished_at": r.run_finished_at.isoformat() if r.run_finished_at else None,
|
|
"error_stage": r.run_error_stage,
|
|
"total_tokens": r.run_total_tokens or 0,
|
|
} if r.run_id else None,
|
|
}
|
|
for r in rows
|
|
],
|
|
"total": total,
|
|
"offset": offset,
|
|
"limit": limit,
|
|
}
|
|
|
|
|
|
# ── Admin: Technique Pages ───────────────────────────────────────────────────
|
|
|
|
@router.get(
|
|
"/admin/pipeline/technique-pages",
|
|
response_model=AdminTechniquePageListResponse,
|
|
)
|
|
async def list_admin_technique_pages(
|
|
multi_source_only: bool = False,
|
|
creator: Annotated[str | None, Query(description="Filter by creator slug")] = None,
|
|
sort: Annotated[str, Query(description="Sort: recent, alpha, creator")] = "recent",
|
|
offset: Annotated[int, Query(ge=0)] = 0,
|
|
limit: Annotated[int, Query(ge=1, le=200)] = 50,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""List technique pages with source video counts, version counts, and creator info.
|
|
|
|
Supports filtering by multi-source pages only and by creator slug.
|
|
"""
|
|
# Correlated subquery: source video count per page
|
|
video_count_sq = (
|
|
select(func.count())
|
|
.select_from(TechniquePageVideo)
|
|
.where(TechniquePageVideo.technique_page_id == TechniquePage.id)
|
|
.correlate(TechniquePage)
|
|
.scalar_subquery()
|
|
.label("source_video_count")
|
|
)
|
|
|
|
# Correlated subquery: version count per page
|
|
version_count_sq = (
|
|
select(func.count())
|
|
.select_from(TechniquePageVersion)
|
|
.where(TechniquePageVersion.technique_page_id == TechniquePage.id)
|
|
.correlate(TechniquePage)
|
|
.scalar_subquery()
|
|
.label("version_count")
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
TechniquePage.id,
|
|
TechniquePage.title,
|
|
TechniquePage.slug,
|
|
TechniquePage.topic_category,
|
|
TechniquePage.body_sections_format,
|
|
TechniquePage.created_at,
|
|
TechniquePage.updated_at,
|
|
Creator.name.label("creator_name"),
|
|
Creator.slug.label("creator_slug"),
|
|
video_count_sq,
|
|
version_count_sq,
|
|
)
|
|
.join(Creator, TechniquePage.creator_id == Creator.id)
|
|
)
|
|
|
|
# Filters
|
|
if multi_source_only:
|
|
stmt = stmt.where(video_count_sq > 1)
|
|
if creator:
|
|
stmt = stmt.where(Creator.slug == creator)
|
|
|
|
# Count total before pagination
|
|
count_stmt = select(func.count()).select_from(stmt.subquery())
|
|
total = (await db.execute(count_stmt)).scalar() or 0
|
|
|
|
# Sort
|
|
if sort == "alpha":
|
|
stmt = stmt.order_by(TechniquePage.title.asc())
|
|
elif sort == "creator":
|
|
stmt = stmt.order_by(Creator.name.asc(), TechniquePage.title.asc())
|
|
else: # "recent" default
|
|
stmt = stmt.order_by(TechniquePage.updated_at.desc())
|
|
|
|
stmt = stmt.offset(offset).limit(limit)
|
|
result = await db.execute(stmt)
|
|
rows = result.all()
|
|
|
|
items = [
|
|
AdminTechniquePageItem(
|
|
id=r.id,
|
|
title=r.title,
|
|
slug=r.slug,
|
|
creator_name=r.creator_name,
|
|
creator_slug=r.creator_slug,
|
|
topic_category=r.topic_category,
|
|
body_sections_format=r.body_sections_format,
|
|
source_video_count=r.source_video_count or 0,
|
|
version_count=r.version_count or 0,
|
|
created_at=r.created_at,
|
|
updated_at=r.updated_at,
|
|
)
|
|
for r in rows
|
|
]
|
|
|
|
return AdminTechniquePageListResponse(
|
|
items=items,
|
|
total=total,
|
|
offset=offset,
|
|
limit=limit,
|
|
)
|
|
|
|
|
|
# ── Admin: Retrigger ─────────────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/trigger/{video_id}")
|
|
async def admin_trigger_pipeline(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Admin retrigger — same as public trigger."""
|
|
return await trigger_pipeline(video_id, db)
|
|
|
|
|
|
# ── Admin: Clean Retrigger ───────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/clean-retrigger/{video_id}")
|
|
async def clean_retrigger_pipeline(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Wipe prior pipeline output for a video, then retrigger.
|
|
|
|
Deletes: pipeline_events, key_moments, transcript_segments,
|
|
and associated Qdrant vectors. Resets processing_status to 'not_started'.
|
|
Does NOT delete technique_pages — the pipeline re-synthesizes via upsert.
|
|
"""
|
|
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
|
|
result = await db.execute(stmt)
|
|
video = result.scalar_one_or_none()
|
|
|
|
if video is None:
|
|
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
|
|
|
|
# Delete pipeline events
|
|
await db.execute(
|
|
PipelineEvent.__table__.delete().where(PipelineEvent.video_id == video_id)
|
|
)
|
|
# Delete key moments
|
|
await db.execute(
|
|
KeyMoment.__table__.delete().where(KeyMoment.source_video_id == video_id)
|
|
)
|
|
# Note: transcript_segments are NOT deleted — they are the pipeline's input
|
|
# data created during ingest, not pipeline output. Deleting them would leave
|
|
# the pipeline with nothing to process.
|
|
|
|
# Reset status
|
|
video.processing_status = ProcessingStatus.not_started
|
|
await db.commit()
|
|
|
|
deleted_counts = {
|
|
"pipeline_events": "cleared",
|
|
"key_moments": "cleared",
|
|
}
|
|
|
|
# Best-effort Qdrant cleanup (non-blocking)
|
|
try:
|
|
settings = get_settings()
|
|
from pipeline.qdrant_client import QdrantManager
|
|
qdrant = QdrantManager(settings)
|
|
qdrant.delete_by_video_id(str(video_id))
|
|
deleted_counts["qdrant_vectors"] = "cleared"
|
|
except Exception as exc:
|
|
logger.warning("Qdrant cleanup failed for video_id=%s: %s", video_id, exc)
|
|
deleted_counts["qdrant_vectors"] = f"skipped: {exc}"
|
|
|
|
# Clear Redis classification/prior-pages cache to prevent stale data
|
|
try:
|
|
import redis as redis_lib
|
|
settings = get_settings()
|
|
r = redis_lib.Redis.from_url(settings.redis_url)
|
|
r.delete(f"chrysopedia:classification:{video_id}")
|
|
r.delete(f"chrysopedia:prior_pages:{video_id}")
|
|
deleted_counts["redis_cache"] = "cleared"
|
|
logger.info("Redis cache cleared for video_id=%s", video_id)
|
|
except Exception as exc:
|
|
logger.warning("Redis cache cleanup failed for video_id=%s: %s", video_id, exc)
|
|
deleted_counts["redis_cache"] = f"skipped: {exc}"
|
|
|
|
# Clear durable classification_data column too
|
|
video.classification_data = None
|
|
await db.commit()
|
|
|
|
# Now trigger the pipeline
|
|
from pipeline.stages import run_pipeline
|
|
try:
|
|
run_pipeline.delay(str(video.id), trigger="clean_reprocess")
|
|
logger.info("Clean retrigger dispatched for video_id=%s", video_id)
|
|
except Exception as exc:
|
|
logger.warning("Failed to dispatch pipeline after cleanup for video_id=%s: %s", video_id, exc)
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Cleanup succeeded but pipeline dispatch failed — Celery/Redis may be unavailable",
|
|
) from exc
|
|
|
|
return {
|
|
"status": "clean_retriggered",
|
|
"video_id": str(video.id),
|
|
"cleaned": deleted_counts,
|
|
}
|
|
|
|
|
|
# ── Admin: Re-run Single Stage ──────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/rerun-stage/{video_id}/{stage_name}")
|
|
async def rerun_stage(
|
|
video_id: str,
|
|
stage_name: str,
|
|
prompt_override: str | None = None,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Re-run a single pipeline stage without running predecessors.
|
|
|
|
Designed for fast prompt iteration — especially stage 5 synthesis.
|
|
Optionally accepts a prompt_override string that temporarily replaces
|
|
the on-disk prompt template for this run only.
|
|
|
|
Valid stage names: stage2_segmentation, stage3_extraction,
|
|
stage4_classification, stage5_synthesis, stage6_embed_and_index.
|
|
"""
|
|
from pipeline.stages import _PIPELINE_STAGES, run_single_stage
|
|
|
|
# Validate stage name
|
|
if stage_name not in _PIPELINE_STAGES:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid stage '{stage_name}'. Valid: {_PIPELINE_STAGES}",
|
|
)
|
|
|
|
# Validate video exists
|
|
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
|
|
result = await db.execute(stmt)
|
|
video = result.scalar_one_or_none()
|
|
if video is None:
|
|
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
|
|
|
|
# Dispatch single-stage re-run
|
|
try:
|
|
run_single_stage.delay(
|
|
str(video.id),
|
|
stage_name,
|
|
trigger="stage_rerun",
|
|
prompt_override=prompt_override,
|
|
)
|
|
logger.info(
|
|
"Single-stage re-run dispatched: video_id=%s, stage=%s, prompt_override=%s",
|
|
video_id, stage_name, "yes" if prompt_override else "no",
|
|
)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"Failed to dispatch single-stage re-run for video_id=%s: %s", video_id, exc,
|
|
)
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Stage re-run dispatch failed — Celery/Redis may be unavailable",
|
|
) from exc
|
|
|
|
return {
|
|
"status": "stage_rerun_dispatched",
|
|
"video_id": str(video.id),
|
|
"stage": stage_name,
|
|
"prompt_override": bool(prompt_override),
|
|
}
|
|
|
|
|
|
# ── Admin: Chunking Inspector ───────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/chunking/{video_id}")
|
|
async def get_chunking_data(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Return chunking/grouping data for a video — topic boundaries, classifications,
|
|
and synthesis group breakdowns.
|
|
|
|
Helps diagnose whether bad synthesis output is a prompt problem or a data shape problem.
|
|
"""
|
|
from config import get_settings
|
|
|
|
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
|
|
result = await db.execute(stmt)
|
|
video = result.scalar_one_or_none()
|
|
if video is None:
|
|
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
|
|
|
|
settings = get_settings()
|
|
|
|
# 1. Topic boundaries (stage 2 output): segments grouped by topic_label
|
|
segments = (
|
|
await db.execute(
|
|
select(TranscriptSegment)
|
|
.where(TranscriptSegment.source_video_id == video_id)
|
|
.order_by(TranscriptSegment.segment_index)
|
|
)
|
|
).scalars().all()
|
|
|
|
topic_boundaries: list[dict] = []
|
|
current_label = None
|
|
current_group: list[dict] = []
|
|
|
|
for seg in segments:
|
|
label = seg.topic_label or "unlabeled"
|
|
if label != current_label:
|
|
if current_group:
|
|
topic_boundaries.append({
|
|
"topic_label": current_label,
|
|
"segment_count": len(current_group),
|
|
"start_time": current_group[0]["start_time"],
|
|
"end_time": current_group[-1]["end_time"],
|
|
"start_index": current_group[0]["segment_index"],
|
|
"end_index": current_group[-1]["segment_index"],
|
|
})
|
|
current_label = label
|
|
current_group = []
|
|
|
|
current_group.append({
|
|
"start_time": seg.start_time,
|
|
"end_time": seg.end_time,
|
|
"segment_index": seg.segment_index,
|
|
})
|
|
|
|
if current_group:
|
|
topic_boundaries.append({
|
|
"topic_label": current_label,
|
|
"segment_count": len(current_group),
|
|
"start_time": current_group[0]["start_time"],
|
|
"end_time": current_group[-1]["end_time"],
|
|
"start_index": current_group[0]["segment_index"],
|
|
"end_index": current_group[-1]["segment_index"],
|
|
})
|
|
|
|
# 2. Key moments (stage 3 output)
|
|
moments = (
|
|
await db.execute(
|
|
select(KeyMoment)
|
|
.where(KeyMoment.source_video_id == video_id)
|
|
.order_by(KeyMoment.start_time)
|
|
)
|
|
).scalars().all()
|
|
|
|
key_moments = [
|
|
{
|
|
"id": str(m.id),
|
|
"title": m.title,
|
|
"content_type": m.content_type.value,
|
|
"start_time": m.start_time,
|
|
"end_time": m.end_time,
|
|
"plugins": m.plugins or [],
|
|
"technique_page_id": str(m.technique_page_id) if m.technique_page_id else None,
|
|
}
|
|
for m in moments
|
|
]
|
|
|
|
# 3. Classification data (stage 4 output)
|
|
classification: list[dict] = []
|
|
cls_source = "missing"
|
|
|
|
if video.classification_data:
|
|
classification = video.classification_data
|
|
cls_source = "postgresql"
|
|
else:
|
|
try:
|
|
import redis as redis_lib
|
|
r = redis_lib.Redis.from_url(settings.redis_url)
|
|
raw = r.get(f"chrysopedia:classification:{video_id}")
|
|
if raw:
|
|
classification = __import__("json").loads(raw)
|
|
cls_source = "redis"
|
|
except Exception:
|
|
pass
|
|
|
|
# 4. Synthesis groups (how stage 5 would group moments)
|
|
cls_by_moment_id = {c["moment_id"]: c for c in classification}
|
|
synthesis_groups: dict[str, dict] = {}
|
|
|
|
for m in moments:
|
|
cls_info = cls_by_moment_id.get(str(m.id), {})
|
|
category = cls_info.get("topic_category", "Uncategorized").strip().title()
|
|
if category not in synthesis_groups:
|
|
synthesis_groups[category] = {
|
|
"category": category,
|
|
"moment_count": 0,
|
|
"moment_ids": [],
|
|
"exceeds_chunk_threshold": False,
|
|
}
|
|
synthesis_groups[category]["moment_count"] += 1
|
|
synthesis_groups[category]["moment_ids"].append(str(m.id))
|
|
|
|
chunk_size = settings.synthesis_chunk_size
|
|
for group in synthesis_groups.values():
|
|
group["exceeds_chunk_threshold"] = group["moment_count"] > chunk_size
|
|
group["chunks_needed"] = max(1, -(-group["moment_count"] // chunk_size)) # ceil division
|
|
|
|
return {
|
|
"video_id": str(video.id),
|
|
"total_segments": len(segments),
|
|
"total_moments": len(moments),
|
|
"classification_source": cls_source,
|
|
"synthesis_chunk_size": chunk_size,
|
|
"topic_boundaries": topic_boundaries,
|
|
"key_moments": key_moments,
|
|
"classification": classification,
|
|
"synthesis_groups": list(synthesis_groups.values()),
|
|
}
|
|
|
|
|
|
# ── Admin: Revoke ────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/revoke/{video_id}")
|
|
async def revoke_pipeline(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Revoke/cancel active Celery tasks for a video.
|
|
|
|
Uses Celery's revoke with terminate=True to kill running tasks.
|
|
Also marks the latest running pipeline_run as cancelled.
|
|
This is best-effort — the task may have already completed.
|
|
"""
|
|
from worker import celery_app
|
|
|
|
try:
|
|
# Get active tasks and revoke any matching this video_id
|
|
inspector = celery_app.control.inspect()
|
|
active = inspector.active() or {}
|
|
revoked_count = 0
|
|
|
|
for _worker, tasks in active.items():
|
|
for task in tasks:
|
|
task_args = task.get("args", [])
|
|
if task_args and str(task_args[0]) == video_id:
|
|
celery_app.control.revoke(task["id"], terminate=True)
|
|
revoked_count += 1
|
|
logger.info("Revoked task %s for video_id=%s", task["id"], video_id)
|
|
|
|
# Mark any running pipeline_runs as cancelled
|
|
running_runs = await db.execute(
|
|
select(PipelineRun).where(
|
|
PipelineRun.video_id == video_id,
|
|
PipelineRun.status == PipelineRunStatus.running,
|
|
)
|
|
)
|
|
for run in running_runs.scalars().all():
|
|
run.status = PipelineRunStatus.cancelled
|
|
run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
|
|
await db.commit()
|
|
|
|
return {
|
|
"status": "revoked" if revoked_count > 0 else "no_active_tasks",
|
|
"video_id": video_id,
|
|
"tasks_revoked": revoked_count,
|
|
}
|
|
except Exception as exc:
|
|
logger.warning("Failed to revoke tasks for video_id=%s: %s", video_id, exc)
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Failed to communicate with Celery worker",
|
|
) from exc
|
|
|
|
|
|
# ── Admin: Recent activity feed ──────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/recent-activity")
|
|
async def recent_pipeline_activity(
|
|
limit: Annotated[int, Query(ge=1, le=20)] = 10,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Get the most recent pipeline stage completions and errors with video context."""
|
|
stmt = (
|
|
select(
|
|
PipelineEvent.id,
|
|
PipelineEvent.video_id,
|
|
PipelineEvent.stage,
|
|
PipelineEvent.event_type,
|
|
PipelineEvent.total_tokens,
|
|
PipelineEvent.duration_ms,
|
|
PipelineEvent.created_at,
|
|
SourceVideo.filename,
|
|
Creator.name.label("creator_name"),
|
|
)
|
|
.join(SourceVideo, PipelineEvent.video_id == SourceVideo.id)
|
|
.join(Creator, SourceVideo.creator_id == Creator.id)
|
|
.where(PipelineEvent.event_type.in_(["complete", "error"]))
|
|
.order_by(PipelineEvent.created_at.desc())
|
|
.limit(limit)
|
|
)
|
|
result = await db.execute(stmt)
|
|
rows = result.all()
|
|
|
|
return {
|
|
"items": [
|
|
{
|
|
"id": str(r.id),
|
|
"video_id": str(r.video_id),
|
|
"filename": r.filename,
|
|
"creator_name": r.creator_name,
|
|
"stage": r.stage,
|
|
"event_type": r.event_type,
|
|
"total_tokens": r.total_tokens,
|
|
"duration_ms": r.duration_ms,
|
|
"created_at": r.created_at.isoformat() if r.created_at else None,
|
|
}
|
|
for r in rows
|
|
],
|
|
}
|
|
|
|
|
|
# ── Admin: Pipeline runs ─────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/runs/{video_id}")
|
|
async def list_pipeline_runs(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""List all pipeline runs for a video, newest first."""
|
|
# Count events per run
|
|
event_counts = (
|
|
select(
|
|
PipelineEvent.run_id,
|
|
func.count().label("event_count"),
|
|
)
|
|
.where(PipelineEvent.run_id.isnot(None))
|
|
.group_by(PipelineEvent.run_id)
|
|
.subquery()
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
PipelineRun,
|
|
event_counts.c.event_count,
|
|
)
|
|
.outerjoin(event_counts, PipelineRun.id == event_counts.c.run_id)
|
|
.where(PipelineRun.video_id == video_id)
|
|
.order_by(PipelineRun.started_at.desc())
|
|
)
|
|
result = await db.execute(stmt)
|
|
rows = result.all()
|
|
|
|
# Also count legacy events (run_id IS NULL) for this video
|
|
legacy_count_result = await db.execute(
|
|
select(func.count())
|
|
.select_from(PipelineEvent)
|
|
.where(PipelineEvent.video_id == video_id, PipelineEvent.run_id.is_(None))
|
|
)
|
|
legacy_count = legacy_count_result.scalar() or 0
|
|
|
|
items = []
|
|
for run, evt_count in rows:
|
|
items.append({
|
|
"id": str(run.id),
|
|
"run_number": run.run_number,
|
|
"trigger": run.trigger.value if hasattr(run.trigger, 'value') else str(run.trigger),
|
|
"status": run.status.value if hasattr(run.status, 'value') else str(run.status),
|
|
"started_at": run.started_at.isoformat() if run.started_at else None,
|
|
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
|
|
"error_stage": run.error_stage,
|
|
"total_tokens": run.total_tokens or 0,
|
|
"event_count": evt_count or 0,
|
|
})
|
|
|
|
return {
|
|
"items": items,
|
|
"legacy_event_count": legacy_count,
|
|
}
|
|
|
|
|
|
# ── Admin: Event log ─────────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/events/{video_id}")
|
|
async def list_pipeline_events(
|
|
video_id: str,
|
|
offset: Annotated[int, Query(ge=0)] = 0,
|
|
limit: Annotated[int, Query(ge=1, le=200)] = 100,
|
|
stage: Annotated[str | None, Query(description="Filter by stage name")] = None,
|
|
event_type: Annotated[str | None, Query(description="Filter by event type")] = None,
|
|
run_id: Annotated[str | None, Query(description="Filter by pipeline run ID")] = None,
|
|
order: Annotated[str, Query(description="Sort order: asc or desc")] = "desc",
|
|
db: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Get pipeline events for a video. Default: newest first (desc)."""
|
|
stmt = select(PipelineEvent).where(PipelineEvent.video_id == video_id)
|
|
|
|
if run_id:
|
|
stmt = stmt.where(PipelineEvent.run_id == run_id)
|
|
if stage:
|
|
stmt = stmt.where(PipelineEvent.stage == stage)
|
|
if event_type:
|
|
stmt = stmt.where(PipelineEvent.event_type == event_type)
|
|
|
|
# Validate order param
|
|
if order not in ("asc", "desc"):
|
|
raise HTTPException(status_code=400, detail="order must be 'asc' or 'desc'")
|
|
|
|
# Count
|
|
count_stmt = select(func.count()).select_from(stmt.subquery())
|
|
total = (await db.execute(count_stmt)).scalar() or 0
|
|
|
|
# Fetch
|
|
order_clause = PipelineEvent.created_at.asc() if order == "asc" else PipelineEvent.created_at.desc()
|
|
stmt = stmt.order_by(order_clause)
|
|
stmt = stmt.offset(offset).limit(limit)
|
|
result = await db.execute(stmt)
|
|
events = result.scalars().all()
|
|
|
|
return {
|
|
"items": [
|
|
{
|
|
"id": str(e.id),
|
|
"video_id": str(e.video_id),
|
|
"stage": e.stage,
|
|
"event_type": e.event_type,
|
|
"prompt_tokens": e.prompt_tokens,
|
|
"completion_tokens": e.completion_tokens,
|
|
"total_tokens": e.total_tokens,
|
|
"model": e.model,
|
|
"duration_ms": e.duration_ms,
|
|
"payload": e.payload,
|
|
"created_at": e.created_at.isoformat() if e.created_at else None,
|
|
"system_prompt_text": e.system_prompt_text,
|
|
"user_prompt_text": e.user_prompt_text,
|
|
"response_text": e.response_text,
|
|
}
|
|
for e in events
|
|
],
|
|
"total": total,
|
|
"offset": offset,
|
|
"limit": limit,
|
|
}
|
|
|
|
|
|
# ── Admin: Debug mode ─────────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/debug-mode", response_model=DebugModeResponse)
|
|
async def get_debug_mode() -> DebugModeResponse:
|
|
"""Get the current pipeline debug mode (on/off)."""
|
|
settings = get_settings()
|
|
try:
|
|
redis = await get_redis()
|
|
try:
|
|
value = await redis.get(REDIS_DEBUG_MODE_KEY)
|
|
if value is not None:
|
|
return DebugModeResponse(debug_mode=value.lower() == "true")
|
|
finally:
|
|
await redis.aclose()
|
|
except Exception as exc:
|
|
logger.warning("Redis unavailable for debug mode read, using config default: %s", exc)
|
|
|
|
return DebugModeResponse(debug_mode=settings.debug_mode)
|
|
|
|
|
|
@router.put("/admin/pipeline/debug-mode", response_model=DebugModeResponse)
|
|
async def set_debug_mode(body: DebugModeUpdate) -> DebugModeResponse:
|
|
"""Set the pipeline debug mode (on/off)."""
|
|
try:
|
|
redis = await get_redis()
|
|
try:
|
|
await redis.set(REDIS_DEBUG_MODE_KEY, str(body.debug_mode))
|
|
finally:
|
|
await redis.aclose()
|
|
except Exception as exc:
|
|
logger.error("Failed to set debug mode in Redis: %s", exc)
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail=f"Redis unavailable: {exc}",
|
|
)
|
|
|
|
logger.info("Pipeline debug mode set to %s", body.debug_mode)
|
|
return DebugModeResponse(debug_mode=body.debug_mode)
|
|
|
|
|
|
# ── Admin: Token summary ─────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/token-summary/{video_id}", response_model=TokenSummaryResponse)
|
|
async def get_token_summary(
|
|
video_id: str,
|
|
db: AsyncSession = Depends(get_session),
|
|
) -> TokenSummaryResponse:
|
|
"""Get per-stage token usage summary for a video."""
|
|
stmt = (
|
|
select(
|
|
PipelineEvent.stage,
|
|
func.count().label("call_count"),
|
|
func.coalesce(func.sum(PipelineEvent.prompt_tokens), 0).label("total_prompt_tokens"),
|
|
func.coalesce(func.sum(PipelineEvent.completion_tokens), 0).label("total_completion_tokens"),
|
|
func.coalesce(func.sum(PipelineEvent.total_tokens), 0).label("total_tokens"),
|
|
)
|
|
.where(PipelineEvent.video_id == video_id)
|
|
.where(PipelineEvent.event_type == "llm_call")
|
|
.group_by(PipelineEvent.stage)
|
|
.order_by(PipelineEvent.stage)
|
|
)
|
|
|
|
result = await db.execute(stmt)
|
|
rows = result.all()
|
|
|
|
stages = [
|
|
TokenStageSummary(
|
|
stage=r.stage,
|
|
call_count=r.call_count,
|
|
total_prompt_tokens=r.total_prompt_tokens,
|
|
total_completion_tokens=r.total_completion_tokens,
|
|
total_tokens=r.total_tokens,
|
|
)
|
|
for r in rows
|
|
]
|
|
grand_total = sum(s.total_tokens for s in stages)
|
|
|
|
return TokenSummaryResponse(
|
|
video_id=video_id,
|
|
stages=stages,
|
|
grand_total_tokens=grand_total,
|
|
)
|
|
|
|
|
|
# ── Admin: Stale Pages ──────────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/stale-pages")
async def get_stale_pages(
    db: AsyncSession = Depends(get_session),
):
    """Detect technique pages synthesized with an older prompt than the current one.

    Compares the SHA-256 hash (first 12 hex chars) of the current
    stage5_synthesis.txt against the prompt hashes stored in
    TechniquePageVersion.pipeline_metadata. Pages whose latest version has
    no metadata, or a different hash, count as stale.

    Uses a single query with a window function to fetch the latest version per
    page joined to creators, eliminating N+1 queries.

    Raises:
        HTTPException: 500 if stage5_synthesis.txt is missing on disk.
    """
    # TechniquePage / TechniquePageVersion are module-level imports; the
    # previous redundant local re-import was removed.
    import hashlib
    from pathlib import Path as _Path

    settings = get_settings()
    prompt_path = _Path(settings.prompts_path) / "stage5_synthesis.txt"

    if not prompt_path.exists():
        raise HTTPException(status_code=500, detail="stage5_synthesis.txt not found")

    # Truncated to 12 hex chars — presumably matching the short hashes the
    # pipeline writes into pipeline_metadata (TODO confirm against stage 5).
    current_hash = hashlib.sha256(
        prompt_path.read_text(encoding="utf-8").encode()
    ).hexdigest()[:12]

    # Subquery: latest version per technique page via row_number window.
    latest_version = (
        select(
            TechniquePageVersion.technique_page_id,
            TechniquePageVersion.pipeline_metadata,
            func.row_number().over(
                partition_by=TechniquePageVersion.technique_page_id,
                order_by=TechniquePageVersion.version_number.desc(),
            ).label("rn"),
        )
        .subquery("latest_version")
    )

    # Main query: pages + creator name + latest version metadata in one shot.
    # outerjoin so pages without any version still appear (counted as stale).
    rows = (
        await db.execute(
            select(
                TechniquePage.slug,
                TechniquePage.creator_id,
                Creator.name.label("creator_name"),
                latest_version.c.pipeline_metadata,
            )
            .join(Creator, Creator.id == TechniquePage.creator_id)
            .outerjoin(
                latest_version,
                (latest_version.c.technique_page_id == TechniquePage.id)
                & (latest_version.c.rn == 1),
            )
        )
    ).all()

    total = len(rows)
    stale_count = 0
    fresh_count = 0
    stale_by_creator: dict[str, dict] = {}

    for slug, _creator_id, creator_name, meta in rows:
        page_hash = None
        if meta:
            # "stage5_prompt_hash" is a fallback key — presumably a legacy
            # metadata field name; verify against older pipeline versions.
            page_hash = meta.get("prompt_hash", meta.get("stage5_prompt_hash"))

        if page_hash == current_hash:
            fresh_count += 1
        else:
            stale_count += 1
            name = creator_name or "Unknown"
            if name not in stale_by_creator:
                stale_by_creator[name] = {"creator": name, "stale_count": 0, "page_slugs": []}
            stale_by_creator[name]["stale_count"] += 1
            stale_by_creator[name]["page_slugs"].append(slug)

    return {
        "current_prompt_hash": current_hash,
        "total_pages": total,
        "stale_pages": stale_count,
        "fresh_pages": fresh_count,
        "stale_by_creator": list(stale_by_creator.values()),
    }
|
|
|
|
|
|
# ── Admin: Bulk Re-Synthesize ───────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/bulk-resynthesize")
async def bulk_resynthesize(
    video_ids: list[str] | None = None,
    stage: str = "stage5_synthesis",
    db: AsyncSession = Depends(get_session),
):
    """Re-run a single stage on multiple videos without a full pipeline reset.

    When video_ids is None, every video with processing_status='complete' is
    targeted. One Celery task is dispatched per video; per-video dispatch
    failures are collected and reported instead of aborting the batch.

    Raises:
        HTTPException: 400 when the stage name is not a known pipeline stage.
    """
    from pipeline.stages import _PIPELINE_STAGES, run_single_stage

    if stage not in _PIPELINE_STAGES:
        raise HTTPException(status_code=400, detail=f"Invalid stage: {stage}")

    if video_ids is None:
        rows = await db.execute(
            select(SourceVideo.id).where(
                SourceVideo.processing_status == ProcessingStatus.complete
            )
        )
        video_ids = [str(r[0]) for r in rows.all()]

    if not video_ids:
        return {"status": "no_videos", "dispatched": 0, "skipped": 0, "total": 0}

    dispatched = 0
    skipped: list[dict] = []
    for video_id in video_ids:
        try:
            run_single_stage.delay(video_id, stage, trigger="stage_rerun")
        except Exception as exc:
            logger.warning("Bulk re-synth dispatch failed for video_id=%s: %s", video_id, exc)
            skipped.append({"video_id": video_id, "reason": str(exc)})
        else:
            dispatched += 1

    logger.info(
        "[BULK-RESYNTH] Dispatched %d/%d %s re-runs",
        dispatched, len(video_ids), stage,
    )

    return {
        "status": "dispatched",
        "stage": stage,
        "total": len(video_ids),
        "dispatched": dispatched,
        "skipped": skipped or None,
    }
|
|
|
|
|
|
# ── Admin: Wipe All Output ──────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/wipe-all-output")
async def wipe_all_output(
    db: AsyncSession = Depends(get_session),
):
    """Wipe ALL pipeline output while preserving raw input data.

    Deletes: technique_page_versions, related_technique_links, key_moments,
    technique_pages, pipeline_events, pipeline_runs, Qdrant vectors, Redis cache.
    Resets: all video processing_status to 'not_started', classification_data to null.
    Preserves: creators, source_videos (metadata), transcript_segments.

    Returns a ``deleted`` mapping with per-table row counts plus the outcome
    of the best-effort Qdrant and Redis cleanup steps.
    """
    from models import TechniquePageVersion, TechniquePage, RelatedTechniqueLink, PipelineRun

    # Per-step outcome: int rowcounts for DB deletes, strings for the
    # best-effort external cleanups.
    counts = {}

    # Order matters due to FK constraints: versions → links → moments → pages → events → runs

    # 1. Technique page versions
    result = await db.execute(TechniquePageVersion.__table__.delete())
    counts["technique_page_versions"] = result.rowcount

    # 2. Related technique links
    result = await db.execute(RelatedTechniqueLink.__table__.delete())
    counts["related_technique_links"] = result.rowcount

    # 3. Key moments (FK to technique_pages, must clear before pages)
    result = await db.execute(KeyMoment.__table__.delete())
    counts["key_moments"] = result.rowcount

    # 4. Technique pages
    result = await db.execute(TechniquePage.__table__.delete())
    counts["technique_pages"] = result.rowcount

    # 5. Pipeline events
    result = await db.execute(PipelineEvent.__table__.delete())
    counts["pipeline_events"] = result.rowcount

    # 6. Pipeline runs
    result = await db.execute(PipelineRun.__table__.delete())
    counts["pipeline_runs"] = result.rowcount

    # 7. Reset all video statuses and classification data
    from sqlalchemy import update
    result = await db.execute(
        update(SourceVideo).values(
            processing_status=ProcessingStatus.not_started,
            classification_data=None,
        )
    )
    counts["videos_reset"] = result.rowcount

    # Commit the DB wipe before touching external stores, so a Qdrant/Redis
    # failure below cannot roll back the database part.
    await db.commit()

    # 8. Clear Qdrant vectors (best-effort)
    try:
        settings = get_settings()
        from pipeline.qdrant_client import QdrantManager
        qdrant = QdrantManager(settings)
        # NOTE(review): ensure_collection() right before delete_collection()
        # looks redundant — confirm whether it has a needed side effect.
        qdrant.ensure_collection()
        # Delete entire collection and recreate empty
        from qdrant_client.models import Distance, VectorParams
        qdrant.client.delete_collection(settings.qdrant_collection)
        qdrant.client.create_collection(
            collection_name=settings.qdrant_collection,
            vectors_config=VectorParams(
                size=settings.embedding_dimensions,
                distance=Distance.COSINE,
            ),
        )
        counts["qdrant"] = "collection_recreated"
    except Exception as exc:
        logger.warning("Qdrant wipe failed: %s", exc)
        counts["qdrant"] = f"skipped: {exc}"

    # 9. Clear Redis pipeline cache keys (best-effort)
    try:
        import redis as redis_lib
        settings = get_settings()
        # NOTE(review): this synchronous client is never closed — relies on
        # GC for connection cleanup; confirm that is acceptable here.
        r = redis_lib.Redis.from_url(settings.redis_url)
        cursor = 0
        deleted_keys = 0
        # Incremental SCAN instead of KEYS so Redis is never blocked by one
        # large scan of the keyspace.
        while True:
            cursor, keys = r.scan(cursor, match="chrysopedia:*", count=100)
            if keys:
                r.delete(*keys)
                deleted_keys += len(keys)
            if cursor == 0:
                break
        counts["redis_keys_deleted"] = deleted_keys
    except Exception as exc:
        logger.warning("Redis wipe failed: %s", exc)
        counts["redis"] = f"skipped: {exc}"

    logger.info("[WIPE] All pipeline output wiped: %s", counts)

    return {
        "status": "wiped",
        "deleted": counts,
    }
|
|
|
|
|
|
# ── Admin: Prompt Optimization ──────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/optimize-prompt/{stage}")
async def trigger_optimization(
    stage: int,
    video_id: str = Query(..., description="Video UUID to use as fixture source"),
    iterations: int = Query(5, description="Number of optimization iterations"),
    variants_per_iter: int = Query(2, description="Variants per iteration"),
):
    """Trigger an automated prompt optimization run as a background Celery task.

    Exports a fixture from the specified video, then runs the optimization loop
    with generate-score-select cycles. Progress is tracked in
    quality/results/progress_stage{N}.json.

    Raises:
        HTTPException: 400 for a stage with no scorer config; 503 when the
            task cannot be handed to the broker.
    """
    from pipeline.quality.scorer import STAGE_CONFIGS

    if stage not in STAGE_CONFIGS:
        raise HTTPException(status_code=400, detail=f"Unsupported stage {stage}. Valid: {sorted(STAGE_CONFIGS)}")

    # Dispatch as a Celery task
    from worker import celery_app

    # NOTE(review): defining the task inside the request handler registers it
    # only in the API process, at request time. The worker process presumably
    # never imports this closure, so .delay() may publish a task name the
    # worker cannot resolve — confirm, and consider moving this task to
    # module scope in the worker's imported modules.
    @celery_app.task
    def _run_optimization(video_id: str, stage: int, iterations: int, variants_per_iter: int) -> dict:
        """Background task: export a fixture, run the optimization loop, write results."""
        import tempfile
        from pipeline.export_fixture import export_fixture
        from pipeline.quality.optimizer import OptimizationLoop
        from pipeline.quality.__main__ import write_results_json
        from config import get_settings as _get_settings
        from pipeline.llm_client import LLMClient as _LLMClient

        settings = _get_settings()

        # Export fixture to a temp file. delete=False keeps the closed file
        # on disk for export_fixture and the loop to read.
        # NOTE(review): the temp file is never removed afterwards — confirm
        # whether that is intentional (e.g. for debugging).
        tmp = tempfile.NamedTemporaryFile(suffix=".json", prefix="opt_fixture_", delete=False)
        tmp.close()
        exit_code = export_fixture(settings.database_url, settings.redis_url, video_id, tmp.name)
        if exit_code != 0:
            # Returned (not raised) so the task result carries the failure.
            return {"error": f"Fixture export failed (exit code {exit_code})"}

        client = _LLMClient(settings)
        loop = OptimizationLoop(
            client=client,
            stage=stage,
            fixture_path=tmp.name,
            iterations=iterations,
            variants_per_iter=variants_per_iter,
            output_dir="backend/pipeline/quality/results/",
        )
        result = loop.run()

        # Persist the run summary next to the per-iteration progress files.
        json_path = write_results_json(
            result=result, output_dir="backend/pipeline/quality/results/",
            stage=stage, iterations=iterations, variants_per_iter=variants_per_iter,
            fixture_path=tmp.name,
        )
        return {
            "best_score": result.best_score.composite,
            "results_path": json_path,
            "iterations": iterations,
        }

    try:
        task = _run_optimization.delay(video_id, stage, iterations, variants_per_iter)
        logger.info(
            "[OPTIMIZE] Dispatched optimization: stage=%d, video_id=%s, iterations=%d, task_id=%s",
            stage, video_id, iterations, task.id,
        )
    except Exception as exc:
        raise HTTPException(status_code=503, detail=f"Failed to dispatch optimization: {exc}") from exc

    return {
        "status": "dispatched",
        "task_id": task.id,
        "stage": stage,
        "video_id": video_id,
        "iterations": iterations,
        "variants_per_iter": variants_per_iter,
    }
|
|
|
|
|
|
# ── Admin: Reindex All ─────────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/reindex-all")
async def reindex_all(
    db: AsyncSession = Depends(get_session),
):
    """Re-run stage 6 (embed & index) for all videos with processing_status='complete'.

    Use after changing embedding text composition or Qdrant payload fields
    to regenerate all vectors and payloads without re-running the full pipeline.
    """
    completed = await db.execute(
        select(SourceVideo.id).where(
            SourceVideo.processing_status == ProcessingStatus.complete
        )
    )
    video_ids = [str(r[0]) for r in completed.all()]

    if not video_ids:
        return {
            "status": "no_videos",
            "message": "No videos with processing_status='complete' found.",
            "dispatched": 0,
        }

    from pipeline.stages import stage6_embed_and_index

    dispatched = 0
    errors: list[dict] = []
    for video_id in video_ids:
        try:
            stage6_embed_and_index.delay(video_id)
        except Exception as exc:
            logger.warning("Failed to dispatch reindex for video_id=%s: %s", video_id, exc)
            errors.append({"video_id": video_id, "error": str(exc)})
        else:
            dispatched += 1

    logger.info(
        "Reindex-all dispatched %d/%d stage6 tasks.",
        dispatched, len(video_ids),
    )

    return {
        "status": "dispatched",
        "dispatched": dispatched,
        "total_complete_videos": len(video_ids),
        "errors": errors or None,
    }
|
|
|
|
|
|
# ── Admin: Embedding status ──────────────────────────────────────────────────
|
|
|
|
@router.get("/admin/pipeline/embed-status/{video_id}")
async def get_embed_status(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Get embedding/indexing status for a video's technique pages.

    Returns which technique pages have been embedded, the Qdrant vector count
    for the video (None if Qdrant is unreachable), and the last stage 6 event
    timestamp, if any.
    """
    # KeyMoment and get_settings are module-level imports — the redundant
    # local re-imports were dropped.

    # Technique pages linked to this video via its key moments.
    moments = (await db.execute(
        select(KeyMoment.technique_page_id)
        .where(KeyMoment.source_video_id == video_id)
        .where(KeyMoment.technique_page_id.isnot(None))
        .distinct()
    )).scalars().all()

    page_ids = set(moments)
    pages_info = []
    if page_ids:
        rows = (await db.execute(
            select(TechniquePage.id, TechniquePage.title, TechniquePage.slug)
            .where(TechniquePage.id.in_(page_ids))
        )).all()
        pages_info = [{"id": str(r.id), "title": r.title, "slug": r.slug} for r in rows]

    # Most recent stage 6 pipeline event for this video.
    last_embed_event = (await db.execute(
        select(PipelineEvent.created_at, PipelineEvent.event_type)
        .where(PipelineEvent.video_id == video_id)
        .where(PipelineEvent.stage == "stage6_embed_and_index")
        .order_by(PipelineEvent.created_at.desc())
        .limit(1)
    )).first()

    # Best-effort exact Qdrant vector count filtered by source_video_id.
    qdrant_count = None
    try:
        from pipeline.qdrant_client import QdrantManager
        from qdrant_client.models import Filter, FieldCondition, MatchValue

        settings = get_settings()
        qdrant = QdrantManager(settings)
        # NOTE(review): reaches into private attrs (_client, _collection)
        # while wipe_all_output uses a public .client — confirm which surface
        # QdrantManager actually supports.
        result = qdrant._client.count(
            collection_name=qdrant._collection,
            count_filter=Filter(
                must=[FieldCondition(key="source_video_id", match=MatchValue(value=video_id))]
            ),
            exact=True,
        )
        qdrant_count = result.count
    except Exception:
        # Qdrant being down must not break the status endpoint.
        pass

    return {
        "video_id": video_id,
        "technique_pages": pages_info,
        "technique_page_count": len(pages_info),
        "qdrant_vector_count": qdrant_count,
        "last_embed_event": {
            "created_at": last_embed_event.created_at.isoformat(),
            "event_type": last_embed_event.event_type,
        } if last_embed_event else None,
    }
|
|
|
|
|
|
# ── Admin: Creator profile editing ───────────────────────────────────────────
|
|
|
|
@router.put("/admin/pipeline/creators/{creator_id}")
async def update_creator_profile(
    creator_id: str,
    body: dict,
    db: AsyncSession = Depends(get_session),
):
    """Update creator profile fields (bio, social_links, featured, avatar_url).

    Only provided fields are updated — omitted fields are left unchanged.

    Raises:
        HTTPException: 404 if the creator does not exist; 400 if the body
            contains none of the updatable fields.
    """
    creator = (await db.execute(
        select(Creator).where(Creator.id == creator_id)
    )).scalar_one_or_none()
    if not creator:
        raise HTTPException(status_code=404, detail="Creator not found")

    # Tuple (not a set) so the "fields" list in the response has a stable
    # order; set iteration varies with hash randomization across processes.
    updatable = ("bio", "social_links", "featured", "avatar_url")
    updated = [field for field in updatable if field in body]
    if not updated:
        raise HTTPException(status_code=400, detail="No updatable fields provided")

    for field in updated:
        setattr(creator, field, body[field])

    # A manually supplied (non-empty) avatar URL marks the avatar as manual
    # so automated fetchers won't overwrite it.
    if body.get("avatar_url"):
        creator.avatar_source = "manual"

    await db.commit()
    return {"status": "updated", "creator": creator.name, "fields": updated}
|
|
|
|
|
|
# ── Admin: Avatar fetching ───────────────────────────────────────────────────
|
|
|
|
@router.post("/admin/pipeline/creators/{creator_id}/fetch-avatar")
async def fetch_creator_avatar(
    creator_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Trigger avatar lookup from TheAudioDB for a single creator."""
    from pipeline.stages import fetch_creator_avatar as _task

    # Verify the creator exists before dispatching the background lookup.
    row = await db.execute(select(Creator).where(Creator.id == creator_id))
    creator = row.scalar_one_or_none()
    if creator is None:
        raise HTTPException(status_code=404, detail="Creator not found")

    _task.delay(creator_id)
    return {"status": "dispatched", "creator": creator.name, "creator_id": creator_id}
|
|
|
|
|
|
@router.post("/admin/pipeline/creators/fetch-all-avatars")
async def fetch_all_avatars(
    db: AsyncSession = Depends(get_session),
):
    """Trigger avatar lookup for all creators missing avatars.

    Targets creators whose avatar_url is unset or whose avatar was
    auto-generated; dispatches one Celery task per creator and returns the
    dispatch count.
    """
    from pipeline.stages import fetch_creator_avatar as _task

    # Only the id is needed to dispatch — the name column was fetched but
    # never used before.
    result = await db.execute(
        select(Creator.id).where(
            (Creator.avatar_url.is_(None)) | (Creator.avatar_source == "generated")
        )
    )
    creator_ids = result.scalars().all()

    for creator_id in creator_ids:
        _task.delay(str(creator_id))

    return {"status": "dispatched", "count": len(creator_ids)}
|
|
|
|
|
|
# ── Admin: Worker status ─────────────────────────────────────────────────────
|
|
|
|
# Redis key and TTL for the cached worker-status payload returned by the
# /admin/pipeline/worker-status endpoint.
WORKER_STATUS_CACHE_KEY = "chrysopedia:worker_status"
WORKER_STATUS_CACHE_TTL = 10  # seconds
|
|
|
|
|
|
def _inspect_workers():
    """Synchronous Celery inspect — runs in a thread to avoid blocking the event loop.

    Returns a dict with an ``online`` flag and a list of per-worker summaries
    (active tasks, reserved count, completed pipeline-stage totals, uptime
    clock, and pool size).
    """
    from worker import celery_app

    inspector = celery_app.control.inspect(timeout=0.5)
    active = inspector.active() or {}
    reserved = inspector.reserved() or {}
    stats = inspector.stats() or {}

    # Stage task names whose completion totals are summed per worker.
    counted_tasks = (
        "tasks.pipeline.stages.stage2_segmentation",
        "tasks.pipeline.stages.stage3_extraction",
        "tasks.pipeline.stages.stage4_classification",
        "tasks.pipeline.stages.stage5_synthesis",
    )

    workers = []
    # Union of key views: every worker that shows up in any inspect response.
    for worker_name in active.keys() | reserved.keys() | stats.keys():
        info = stats.get(worker_name, {})
        totals = info.get("total", {})
        pool = info.get("pool")

        active_entries = []
        for task in active.get(worker_name, []):
            active_entries.append({
                "id": task.get("id"),
                "name": task.get("name"),
                "args": task.get("args", []),
                "time_start": task.get("time_start"),
            })

        workers.append({
            "name": worker_name,
            "active_tasks": active_entries,
            "reserved_tasks": len(reserved.get(worker_name, [])),
            "total_completed": sum(totals.get(t, 0) for t in counted_tasks),
            "uptime": info.get("clock", None),
            "pool_size": pool.get("max-concurrency") if isinstance(pool, dict) else None,
        })

    return {"online": len(workers) > 0, "workers": workers}
|
|
|
|
|
|
@router.get("/admin/pipeline/worker-status")
async def worker_status():
    """Get current Celery worker status — active, reserved, and stats.

    Results are cached in Redis for 10 seconds to avoid repeated slow
    Celery inspect round-trips. The synchronous inspect calls run in a
    thread so they never block the async event loop.
    """
    # Try Redis cache first (best-effort: any Redis failure falls through to
    # a live inspect instead of erroring the endpoint).
    try:
        redis = await get_redis()
        try:
            cached = await redis.get(WORKER_STATUS_CACHE_KEY)
        finally:
            # Close even when .get() raises — previously the connection
            # leaked on that path.
            await redis.aclose()
        if cached:
            return json.loads(cached)
    except Exception:
        pass

    # Cache miss — run the blocking inspect in a worker thread.
    try:
        result = await asyncio.to_thread(_inspect_workers)
    except Exception as exc:
        logger.warning("Failed to inspect Celery workers: %s", exc)
        return {"online": False, "workers": [], "error": str(exc)}

    # Write-back to the Redis cache (best-effort).
    try:
        redis = await get_redis()
        try:
            await redis.set(WORKER_STATUS_CACHE_KEY, json.dumps(result), ex=WORKER_STATUS_CACHE_TTL)
        finally:
            # Ensure the connection is released even if .set() raises.
            await redis.aclose()
    except Exception:
        pass

    return result
|