diff --git a/alembic/versions/004_pipeline_events.py b/alembic/versions/004_pipeline_events.py new file mode 100644 index 0000000..25de8bc --- /dev/null +++ b/alembic/versions/004_pipeline_events.py @@ -0,0 +1,37 @@ +"""Create pipeline_events table. + +Revision ID: 004_pipeline_events +Revises: 003_content_reports +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + +revision = "004_pipeline_events" +down_revision = "003_content_reports" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "pipeline_events", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.func.gen_random_uuid()), + sa.Column("video_id", UUID(as_uuid=True), nullable=False, index=True), + sa.Column("stage", sa.String(50), nullable=False), + sa.Column("event_type", sa.String(30), nullable=False), + sa.Column("prompt_tokens", sa.Integer(), nullable=True), + sa.Column("completion_tokens", sa.Integer(), nullable=True), + sa.Column("total_tokens", sa.Integer(), nullable=True), + sa.Column("model", sa.String(100), nullable=True), + sa.Column("duration_ms", sa.Integer(), nullable=True), + sa.Column("payload", JSONB(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.now(), nullable=False), + ) + # Composite index for event log queries (video + newest first) + op.create_index("ix_pipeline_events_video_created", "pipeline_events", ["video_id", "created_at"]) + + +def downgrade() -> None: + op.drop_index("ix_pipeline_events_video_created") + op.drop_table("pipeline_events") diff --git a/backend/models.py b/backend/models.py index 3935cad..714e9bd 100644 --- a/backend/models.py +++ b/backend/models.py @@ -142,6 +142,7 @@ class SourceVideo(Base): nullable=False, ) transcript_path: Mapped[str | None] = mapped_column(String(1000), nullable=True) + content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True) processing_status: Mapped[ProcessingStatus] = mapped_column( Enum(ProcessingStatus, name="processing_status", create_constraint=True), default=ProcessingStatus.pending, @@ -378,3 +379,36 @@ class ContentReport(Base): default=_now, server_default=func.now() ) resolved_at: Mapped[datetime | None] = mapped_column(nullable=True) + + +# ── Pipeline Event ─────────────────────────────────────────────────────────── + +class PipelineEvent(Base): + """Structured log entry for pipeline execution. + + Captures per-stage start/complete/error/llm_call events with + token usage and optional response payloads for debugging. + """ + __tablename__ = "pipeline_events" + + id: Mapped[uuid.UUID] = _uuid_pk() + video_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), nullable=False, index=True, + ) + stage: Mapped[str] = mapped_column( + String(50), nullable=False, doc="stage2_segmentation, stage3_extraction, etc." + ) + event_type: Mapped[str] = mapped_column( + String(30), nullable=False, doc="start, complete, error, llm_call" + ) + prompt_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) + completion_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) + total_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) + model: Mapped[str | None] = mapped_column(String(100), nullable=True) + duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True) + payload: Mapped[dict | None] = mapped_column( + JSONB, nullable=True, doc="LLM response content, error details, stage metadata" + ) + created_at: Mapped[datetime] = mapped_column( + default=_now, server_default=func.now() + ) diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index a5b9367..52522f8 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -1,30 +1,40 @@ -"""Pipeline management endpoints for manual re-trigger and status inspection.""" +"""Pipeline management endpoints — public trigger + admin dashboard. + +Public: + POST /pipeline/trigger/{video_id} Trigger pipeline for a video + +Admin: + GET /admin/pipeline/videos Video list with status + event counts + POST /admin/pipeline/trigger/{video_id} Retrigger (same as public but under admin prefix) + POST /admin/pipeline/revoke/{video_id} Revoke/cancel active tasks for a video + GET /admin/pipeline/events/{video_id} Event log for a video (paginated) + GET /admin/pipeline/worker-status Active/reserved tasks from Celery inspect +""" import logging +import uuid +from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException -from sqlalchemy import select +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy import func, select, case from sqlalchemy.ext.asyncio import AsyncSession from database import get_session -from models import SourceVideo +from models import PipelineEvent, SourceVideo, Creator logger = logging.getLogger("chrysopedia.pipeline") -router = APIRouter(prefix="/pipeline", tags=["pipeline"]) +router = APIRouter(tags=["pipeline"]) -@router.post("/trigger/{video_id}") +# ── Public trigger ─────────────────────────────────────────────────────────── + +@router.post("/pipeline/trigger/{video_id}") async def trigger_pipeline( video_id: str, db: AsyncSession = Depends(get_session), ): - """Manually trigger (or re-trigger) the LLM extraction pipeline for a video. - - Looks up the SourceVideo by ID, dispatches ``run_pipeline.delay()``, - and returns the current processing status. Returns 404 if the video - does not exist. - """ + """Manually trigger (or re-trigger) the LLM extraction pipeline for a video.""" stmt = select(SourceVideo).where(SourceVideo.id == video_id) result = await db.execute(stmt) video = result.scalar_one_or_none() @@ -32,16 +42,13 @@ async def trigger_pipeline( if video is None: raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") - # Import inside handler to avoid circular import at module level from pipeline.stages import run_pipeline try: run_pipeline.delay(str(video.id)) logger.info("Pipeline manually triggered for video_id=%s", video_id) except Exception as exc: - logger.warning( - "Failed to dispatch pipeline for video_id=%s: %s", video_id, exc - ) + logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc) raise HTTPException( status_code=503, detail="Pipeline dispatch failed — Celery/Redis may be unavailable", @@ -52,3 +59,219 @@ async def trigger_pipeline( "video_id": str(video.id), "current_processing_status": video.processing_status.value, } + + +# ── Admin: Video list ──────────────────────────────────────────────────────── + +@router.get("/admin/pipeline/videos") +async def list_pipeline_videos( + db: AsyncSession = Depends(get_session), +): + """List all videos with processing status and pipeline event counts.""" + # Subquery for event counts per video + event_counts = ( + select( + PipelineEvent.video_id, + func.count().label("event_count"), + func.sum(case( + (PipelineEvent.event_type == "llm_call", PipelineEvent.total_tokens), + else_=0 + )).label("total_tokens_used"), + func.max(PipelineEvent.created_at).label("last_event_at"), + ) + .group_by(PipelineEvent.video_id) + .subquery() + ) + + stmt = ( + select( + SourceVideo.id, + SourceVideo.filename, + SourceVideo.processing_status, + SourceVideo.content_hash, + SourceVideo.created_at, + SourceVideo.updated_at, + Creator.name.label("creator_name"), + event_counts.c.event_count, + event_counts.c.total_tokens_used, + event_counts.c.last_event_at, + ) + .join(Creator, SourceVideo.creator_id == Creator.id) + .outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id) + .order_by(SourceVideo.updated_at.desc()) + ) + + result = await db.execute(stmt) + rows = result.all() + + return { + "items": [ + { + "id": str(r.id), + "filename": r.filename, + "processing_status": r.processing_status.value if hasattr(r.processing_status, 'value') else str(r.processing_status), + "content_hash": r.content_hash, + "creator_name": r.creator_name, + "created_at": r.created_at.isoformat() if r.created_at else None, + "updated_at": r.updated_at.isoformat() if r.updated_at else None, + "event_count": r.event_count or 0, + "total_tokens_used": r.total_tokens_used or 0, + "last_event_at": r.last_event_at.isoformat() if r.last_event_at else None, + } + for r in rows + ], + "total": len(rows), + } + + +# ── Admin: Retrigger ───────────────────────────────────────────────────────── + +@router.post("/admin/pipeline/trigger/{video_id}") +async def admin_trigger_pipeline( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """Admin retrigger — same as public trigger.""" + return await trigger_pipeline(video_id, db) + + +# ── Admin: Revoke ──────────────────────────────────────────────────────────── + +@router.post("/admin/pipeline/revoke/{video_id}") +async def revoke_pipeline(video_id: str): + """Revoke/cancel active Celery tasks for a video. + + Uses Celery's revoke with terminate=True to kill running tasks. + This is best-effort — the task may have already completed. + """ + from worker import celery_app + + try: + # Get active tasks and revoke any matching this video_id + inspector = celery_app.control.inspect() + active = inspector.active() or {} + revoked_count = 0 + + for _worker, tasks in active.items(): + for task in tasks: + task_args = task.get("args", []) + if task_args and str(task_args[0]) == video_id: + celery_app.control.revoke(task["id"], terminate=True) + revoked_count += 1 + logger.info("Revoked task %s for video_id=%s", task["id"], video_id) + + return { + "status": "revoked" if revoked_count > 0 else "no_active_tasks", + "video_id": video_id, + "tasks_revoked": revoked_count, + } + except Exception as exc: + logger.warning("Failed to revoke tasks for video_id=%s: %s", video_id, exc) + raise HTTPException( + status_code=503, + detail="Failed to communicate with Celery worker", + ) from exc + + +# ── Admin: Event log ───────────────────────────────────────────────────────── + +@router.get("/admin/pipeline/events/{video_id}") +async def list_pipeline_events( + video_id: str, + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(ge=1, le=200)] = 100, + stage: Annotated[str | None, Query(description="Filter by stage name")] = None, + event_type: Annotated[str | None, Query(description="Filter by event type")] = None, + db: AsyncSession = Depends(get_session), +): + """Get pipeline events for a video, newest first.""" + stmt = select(PipelineEvent).where(PipelineEvent.video_id == video_id) + + if stage: + stmt = stmt.where(PipelineEvent.stage == stage) + if event_type: + stmt = stmt.where(PipelineEvent.event_type == event_type) + + # Count + count_stmt = select(func.count()).select_from(stmt.subquery()) + total = (await db.execute(count_stmt)).scalar() or 0 + + # Fetch + stmt = stmt.order_by(PipelineEvent.created_at.desc()) + stmt = stmt.offset(offset).limit(limit) + result = await db.execute(stmt) + events = result.scalars().all() + + return { + "items": [ + { + "id": str(e.id), + "video_id": str(e.video_id), + "stage": e.stage, + "event_type": e.event_type, + "prompt_tokens": e.prompt_tokens, + "completion_tokens": e.completion_tokens, + "total_tokens": e.total_tokens, + "model": e.model, + "duration_ms": e.duration_ms, + "payload": e.payload, + "created_at": e.created_at.isoformat() if e.created_at else None, + } + for e in events + ], + "total": total, + "offset": offset, + "limit": limit, + } + + +# ── Admin: Worker status ───────────────────────────────────────────────────── + +@router.get("/admin/pipeline/worker-status") +async def worker_status(): + """Get current Celery worker status — active, reserved, and stats.""" + from worker import celery_app + + try: + inspector = celery_app.control.inspect() + active = inspector.active() or {} + reserved = inspector.reserved() or {} + stats = inspector.stats() or {} + + workers = [] + for worker_name in set(list(active.keys()) + list(reserved.keys()) + list(stats.keys())): + worker_active = active.get(worker_name, []) + worker_reserved = reserved.get(worker_name, []) + worker_stats = stats.get(worker_name, {}) + + workers.append({ + "name": worker_name, + "active_tasks": [ + { + "id": t.get("id"), + "name": t.get("name"), + "args": t.get("args", []), + "time_start": t.get("time_start"), + } + for t in worker_active + ], + "reserved_tasks": len(worker_reserved), + "total_completed": worker_stats.get("total", {}).get("tasks.pipeline.stages.stage2_segmentation", 0) + + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage3_extraction", 0) + + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage4_classification", 0) + + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage5_synthesis", 0), + "uptime": worker_stats.get("clock", None), + "pool_size": worker_stats.get("pool", {}).get("max-concurrency") if isinstance(worker_stats.get("pool"), dict) else None, + }) + + return { + "online": len(workers) > 0, + "workers": workers, + } + except Exception as exc: + logger.warning("Failed to inspect Celery workers: %s", exc) + return { + "online": False, + "workers": [], + "error": str(exc), + } diff --git a/frontend/src/App.css b/frontend/src/App.css index 2b64b2b..7e82800 100644 --- a/frontend/src/App.css +++ b/frontend/src/App.css @@ -1028,7 +1028,7 @@ body { /* ── Search results page ──────────────────────────────────────────────────── */ .search-results-page { - max-width: 48rem; + max-width: 64rem; } .search-fallback-banner { @@ -1177,7 +1177,32 @@ body { /* ── Technique page ───────────────────────────────────────────────────────── */ .technique-page { - max-width: 48rem; + max-width: 64rem; +} + +.technique-columns { + display: grid; + grid-template-columns: 1fr 22rem; + gap: 2rem; + align-items: start; +} + +.technique-columns__main { + min-width: 0; /* prevent grid blowout */ +} + +.technique-columns__sidebar { + position: sticky; + top: 1.5rem; +} + +@media (max-width: 768px) { + .technique-columns { + grid-template-columns: 1fr; + } + .technique-columns__sidebar { + position: static; + } } .technique-404 { @@ -1631,7 +1656,7 @@ body { ══════════════════════════════════════════════════════════════════════════════ */ .creator-detail { - max-width: 48rem; + max-width: 64rem; } .creator-detail__header { @@ -2390,3 +2415,375 @@ body { padding: 0.1rem 0.35rem; border-radius: 3px; } + +/* ── Pipeline Admin ─────────────────────────────────────────────────────── */ + +.admin-pipeline { + max-width: 1100px; + margin: 0 auto; + padding: 2rem 1rem; +} + +.admin-pipeline__header { + display: flex; + justify-content: space-between; + align-items: flex-start; + gap: 1rem; + margin-bottom: 1.5rem; + flex-wrap: wrap; +} + +.admin-pipeline__header-right { + display: flex; + align-items: center; + gap: 1rem; + flex-wrap: wrap; +} + +.admin-pipeline__title { + color: var(--color-text-primary); + margin: 0 0 0.25rem; +} + +.admin-pipeline__subtitle { + color: var(--color-text-muted); + margin: 0; + font-size: 0.9rem; +} + +.admin-pipeline__list { + display: flex; + flex-direction: column; + gap: 0.75rem; +} + +/* ── Worker Status Indicator ────────────────────────────────────────────── */ + +.worker-status { + display: flex; + align-items: center; + gap: 0.5rem; + font-size: 0.8125rem; + color: var(--color-text-secondary); + padding: 0.35rem 0.75rem; + background: var(--color-bg-surface); + border: 1px solid var(--color-border); + border-radius: 6px; + white-space: nowrap; +} + +.worker-status__dot { + display: inline-block; + width: 8px; + height: 8px; + border-radius: 50%; + flex-shrink: 0; +} + +.worker-status__dot--online { + background: var(--color-badge-approved-text); + box-shadow: 0 0 6px var(--color-badge-approved-text); +} + +.worker-status__dot--offline { + background: var(--color-error); + box-shadow: 0 0 6px var(--color-error); +} + +.worker-status__dot--unknown { + background: var(--color-text-muted); +} + +.worker-status__label { + font-weight: 500; +} + +.worker-status__detail { + color: var(--color-text-muted); + font-size: 0.75rem; +} + +.worker-status--error { + border-color: var(--color-error-border); +} + +/* ── Pipeline Video Row ─────────────────────────────────────────────────── */ + +.pipeline-video { + background: var(--color-bg-surface); + border: 1px solid var(--color-border); + border-radius: 8px; + overflow: hidden; +} + +.pipeline-video__header { + display: grid; + grid-template-columns: 1fr auto auto; + gap: 0.75rem; + align-items: center; + padding: 0.75rem 1rem; + cursor: pointer; +} + +.pipeline-video__header:hover { + background: var(--color-bg-input); +} + +.pipeline-video__info { + display: flex; + flex-direction: column; + gap: 0.1rem; + min-width: 0; +} + +.pipeline-video__filename { + color: var(--color-text-primary); + font-weight: 500; + font-size: 0.9rem; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.pipeline-video__creator { + color: var(--color-text-muted); + font-size: 0.8rem; +} + +.pipeline-video__meta { + display: flex; + align-items: center; + gap: 0.625rem; + flex-wrap: wrap; +} + +.pipeline-video__stat { + color: var(--color-text-muted); + font-size: 0.8rem; + white-space: nowrap; +} + +.pipeline-video__time { + color: var(--color-text-muted); + font-size: 0.75rem; + white-space: nowrap; +} + +.pipeline-video__actions { + display: flex; + gap: 0.375rem; +} + +.pipeline-video__message { + padding: 0.375rem 1rem; + font-size: 0.8rem; +} + +.pipeline-video__message--ok { + background: var(--color-badge-approved-bg); + color: var(--color-badge-approved-text); +} + +.pipeline-video__message--err { + background: var(--color-error-bg); + color: var(--color-error); +} + +.pipeline-video__detail { + padding: 0.75rem 1rem 1rem; + border-top: 1px solid var(--color-border); +} + +.pipeline-video__detail-meta { + display: flex; + gap: 1.25rem; + font-size: 0.8rem; + color: var(--color-text-muted); + margin-bottom: 1rem; + flex-wrap: wrap; +} + +/* ── Pipeline Badges ────────────────────────────────────────────────────── */ + +.pipeline-badge { + display: inline-flex; + align-items: center; + padding: 0.15rem 0.5rem; + border-radius: 4px; + font-size: 0.75rem; + font-weight: 500; + background: var(--color-pill-bg); + color: var(--color-pill-text); + white-space: nowrap; +} + +.pipeline-badge--success { + background: var(--color-badge-approved-bg); + color: var(--color-badge-approved-text); +} + +.pipeline-badge--active { + background: var(--color-badge-edited-bg); + color: var(--color-badge-edited-text); +} + +.pipeline-badge--error { + background: var(--color-badge-rejected-bg); + color: var(--color-badge-rejected-text); +} + +.pipeline-badge--pending { + background: var(--color-badge-pending-bg); + color: var(--color-badge-pending-text); +} + +.pipeline-badge--event-start { + background: var(--color-badge-edited-bg); + color: var(--color-badge-edited-text); +} + +.pipeline-badge--event-complete { + background: var(--color-badge-approved-bg); + color: var(--color-badge-approved-text); +} + +.pipeline-badge--event-error { + background: var(--color-badge-rejected-bg); + color: var(--color-badge-rejected-text); +} + +.pipeline-badge--event-llm_call { + background: var(--color-pill-plugin-bg); + color: var(--color-pill-plugin-text); +} + +/* ── Pipeline Events ────────────────────────────────────────────────────── */ + +.pipeline-events__header { + display: flex; + align-items: center; + justify-content: space-between; + margin-bottom: 0.75rem; +} + +.pipeline-events__count { + font-size: 0.85rem; + color: var(--color-text-secondary); + font-weight: 500; +} + +.pipeline-events__empty { + font-size: 0.85rem; + color: var(--color-text-muted); + padding: 0.5rem 0; +} + +.pipeline-events__list { + display: flex; + flex-direction: column; + gap: 0.25rem; +} + +.pipeline-event { + background: var(--color-bg-page); + border: 1px solid var(--color-border); + border-radius: 6px; + padding: 0.5rem 0.75rem; +} + +.pipeline-event--error { + border-left: 3px solid var(--color-error); +} + +.pipeline-event__row { + display: flex; + align-items: center; + gap: 0.5rem; + flex-wrap: wrap; +} + +.pipeline-event__icon { + font-size: 0.85rem; + flex-shrink: 0; + width: 1.25rem; + text-align: center; +} + +.pipeline-event__stage { + color: var(--color-text-primary); + font-size: 0.8125rem; + font-weight: 500; +} + +.pipeline-event__model { + color: var(--color-text-muted); + font-size: 0.75rem; + font-family: monospace; +} + +.pipeline-event__tokens { + color: var(--color-pill-plugin-text); + font-size: 0.75rem; + font-weight: 500; +} + +.pipeline-event__duration { + color: var(--color-text-muted); + font-size: 0.75rem; +} + +.pipeline-event__time { + color: var(--color-text-muted); + font-size: 0.75rem; + margin-left: auto; + white-space: nowrap; +} + +/* ── Pipeline Events Pager ──────────────────────────────────────────────── */ + +.pipeline-events__pager { + display: flex; + align-items: center; + justify-content: center; + gap: 0.75rem; + margin-top: 0.75rem; +} + +.pipeline-events__pager-info { + font-size: 0.8rem; + color: var(--color-text-muted); +} + +/* ── Collapsible JSON ───────────────────────────────────────────────────── */ + +.json-viewer { + margin-top: 0.375rem; +} + +.json-viewer__toggle { + background: none; + border: none; + color: var(--color-accent); + font-size: 0.75rem; + cursor: pointer; + padding: 0; + font-family: inherit; +} + +.json-viewer__toggle:hover { + color: var(--color-accent-hover); +} + +.json-viewer__content { + margin: 0.375rem 0 0; + padding: 0.5rem 0.75rem; + background: var(--color-bg-transcript); + border: 1px solid var(--color-border); + border-radius: 4px; + color: var(--color-text-secondary); + font-size: 0.75rem; + line-height: 1.5; + overflow-x: auto; + max-height: 300px; + overflow-y: auto; +} diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 1dd122c..e96014c 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -8,6 +8,7 @@ import TopicsBrowse from "./pages/TopicsBrowse"; import ReviewQueue from "./pages/ReviewQueue"; import MomentDetail from "./pages/MomentDetail"; import AdminReports from "./pages/AdminReports"; +import AdminPipeline from "./pages/AdminPipeline"; import ModeToggle from "./components/ModeToggle"; export default function App() { @@ -24,6 +25,7 @@ export default function App() { Creators Review Reports + Pipeline @@ -45,6 +47,7 @@ export default function App() { } /> } /> } /> + } /> {/* Fallback */} } /> diff --git a/frontend/src/api/public-client.ts b/frontend/src/api/public-client.ts index 7683819..efce818 100644 --- a/frontend/src/api/public-client.ts +++ b/frontend/src/api/public-client.ts @@ -357,3 +357,114 @@ export async function updateReport( body: JSON.stringify(body), }); } + + +// ── Pipeline Admin ────────────────────────────────────────────────────────── + +export interface PipelineVideoItem { + id: string; + filename: string; + processing_status: string; + content_hash: string | null; + creator_name: string; + created_at: string | null; + updated_at: string | null; + event_count: number; + total_tokens_used: number; + last_event_at: string | null; +} + +export interface PipelineVideoListResponse { + items: PipelineVideoItem[]; + total: number; +} + +export interface PipelineEvent { + id: string; + video_id: string; + stage: string; + event_type: string; + prompt_tokens: number | null; + completion_tokens: number | null; + total_tokens: number | null; + model: string | null; + duration_ms: number | null; + payload: Record | null; + created_at: string | null; +} + +export interface PipelineEventListResponse { + items: PipelineEvent[]; + total: number; + offset: number; + limit: number; +} + +export interface WorkerTask { + id: string; + name: string; + args: unknown[]; + time_start: number | null; +} + +export interface WorkerInfo { + name: string; + active_tasks: WorkerTask[]; + reserved_tasks: number; + total_completed: number; + uptime: string | null; + pool_size: number | null; +} + +export interface WorkerStatusResponse { + online: boolean; + workers: WorkerInfo[]; + error?: string; +} + +export interface TriggerResponse { + status: string; + video_id: string; + current_processing_status?: string; +} + +export interface RevokeResponse { + status: string; + video_id: string; + tasks_revoked: number; +} + +export async function fetchPipelineVideos(): Promise { + return request(`${BASE}/admin/pipeline/videos`); +} + +export async function fetchPipelineEvents( + videoId: string, + params: { offset?: number; limit?: number; stage?: string; event_type?: string } = {}, +): Promise { + const qs = new URLSearchParams(); + if (params.offset !== undefined) qs.set("offset", String(params.offset)); + if (params.limit !== undefined) qs.set("limit", String(params.limit)); + if (params.stage) qs.set("stage", params.stage); + if (params.event_type) qs.set("event_type", params.event_type); + const query = qs.toString(); + return request( + `${BASE}/admin/pipeline/events/${videoId}${query ? `?${query}` : ""}`, + ); +} + +export async function fetchWorkerStatus(): Promise { + return request(`${BASE}/admin/pipeline/worker-status`); +} + +export async function triggerPipeline(videoId: string): Promise { + return request(`${BASE}/admin/pipeline/trigger/${videoId}`, { + method: "POST", + }); +} + +export async function revokePipeline(videoId: string): Promise { + return request(`${BASE}/admin/pipeline/revoke/${videoId}`, { + method: "POST", + }); +} diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx new file mode 100644 index 0000000..b4dc718 --- /dev/null +++ b/frontend/src/pages/AdminPipeline.tsx @@ -0,0 +1,422 @@ +/** + * Pipeline admin dashboard — video list with status, retrigger/revoke, + * expandable event log with token usage and collapsible JSON viewer. + */ + +import { useCallback, useEffect, useState } from "react"; +import { + fetchPipelineVideos, + fetchPipelineEvents, + fetchWorkerStatus, + triggerPipeline, + revokePipeline, + type PipelineVideoItem, + type PipelineEvent, + type WorkerStatusResponse, +} from "../api/public-client"; + +// ── Helpers ────────────────────────────────────────────────────────────────── + +function formatDate(iso: string | null): string { + if (!iso) return "—"; + return new Date(iso).toLocaleString(undefined, { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }); +} + +function formatTokens(n: number): string { + if (n === 0) return "0"; + if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M`; + if (n >= 1_000) return `${(n / 1_000).toFixed(1)}k`; + return String(n); +} + +function statusBadgeClass(status: string): string { + switch (status) { + case "completed": + case "indexed": + return "pipeline-badge--success"; + case "processing": + case "extracted": + case "classified": + case "synthesized": + return "pipeline-badge--active"; + case "failed": + case "error": + return "pipeline-badge--error"; + case "pending": + case "queued": + return "pipeline-badge--pending"; + default: + return ""; + } +} + +function eventTypeIcon(eventType: string): string { + switch (eventType) { + case "start": + return "▶"; + case "complete": + return "✓"; + case "error": + return "✗"; + case "llm_call": + return "🤖"; + default: + return "·"; + } +} + +// ── Collapsible JSON ───────────────────────────────────────────────────────── + +function JsonViewer({ data }: { data: Record | null }) { + const [open, setOpen] = useState(false); + if (!data || Object.keys(data).length === 0) return null; + + return ( +
+ + {open && ( +
+          {JSON.stringify(data, null, 2)}
+        
+ )} +
+ ); +} + +// ── Event Log ──────────────────────────────────────────────────────────────── + +function EventLog({ videoId }: { videoId: string }) { + const [events, setEvents] = useState([]); + const [total, setTotal] = useState(0); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [offset, setOffset] = useState(0); + const limit = 50; + + const load = useCallback(async () => { + setLoading(true); + setError(null); + try { + const res = await fetchPipelineEvents(videoId, { offset, limit }); + setEvents(res.items); + setTotal(res.total); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to load events"); + } finally { + setLoading(false); + } + }, [videoId, offset]); + + useEffect(() => { + void load(); + }, [load]); + + if (loading) return
Loading events…
; + if (error) return
Error: {error}
; + if (events.length === 0) return
No events recorded.
; + + const hasNext = offset + limit < total; + const hasPrev = offset > 0; + + return ( +
+
+ {total} event{total !== 1 ? "s" : ""} + +
+ +
+ {events.map((evt) => ( +
+
+ {eventTypeIcon(evt.event_type)} + {evt.stage} + + {evt.event_type} + + {evt.model && {evt.model}} + {evt.total_tokens != null && evt.total_tokens > 0 && ( + + {formatTokens(evt.total_tokens)} tok + + )} + {evt.duration_ms != null && ( + {evt.duration_ms}ms + )} + {formatDate(evt.created_at)} +
+ +
+ ))} +
+ + {(hasPrev || hasNext) && ( +
+ + + {offset + 1}–{Math.min(offset + limit, total)} of {total} + + +
+ )} +
+ ); +} + +// ── Worker Status ──────────────────────────────────────────────────────────── + +function WorkerStatus() { + const [status, setStatus] = useState(null); + const [error, setError] = useState(null); + + const load = useCallback(async () => { + try { + setError(null); + const res = await fetchWorkerStatus(); + setStatus(res); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed"); + } + }, []); + + useEffect(() => { + void load(); + const id = setInterval(() => void load(), 15_000); + return () => clearInterval(id); + }, [load]); + + if (error) { + return ( +
+ + Worker: error ({error}) +
+ ); + } + + if (!status) { + return ( +
+ + Worker: checking… +
+ ); + } + + return ( +
+ + + {status.online ? `${status.workers.length} worker${status.workers.length !== 1 ? "s" : ""} online` : "Workers offline"} + + {status.workers.map((w) => ( + + {w.active_tasks.length > 0 + ? `${w.active_tasks.length} active` + : "idle"} + {w.pool_size != null && ` · pool ${w.pool_size}`} + + ))} +
+ ); +} + +// ── Main Page ──────────────────────────────────────────────────────────────── + +export default function AdminPipeline() { + const [videos, setVideos] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [expandedId, setExpandedId] = useState(null); + const [actionLoading, setActionLoading] = useState(null); + const [actionMessage, setActionMessage] = useState<{ id: string; text: string; ok: boolean } | null>(null); + + const load = useCallback(async () => { + setLoading(true); + setError(null); + try { + const res = await fetchPipelineVideos(); + setVideos(res.items); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to load videos"); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + void load(); + }, [load]); + + const handleTrigger = async (videoId: string) => { + setActionLoading(videoId); + setActionMessage(null); + try { + const res = await triggerPipeline(videoId); + setActionMessage({ id: videoId, text: `Triggered (${res.status})`, ok: true }); + // Refresh after short delay to let status update + setTimeout(() => void load(), 2000); + } catch (err) { + setActionMessage({ + id: videoId, + text: err instanceof Error ? err.message : "Trigger failed", + ok: false, + }); + } finally { + setActionLoading(null); + } + }; + + const handleRevoke = async (videoId: string) => { + setActionLoading(videoId); + setActionMessage(null); + try { + const res = await revokePipeline(videoId); + setActionMessage({ + id: videoId, + text: res.tasks_revoked > 0 + ? `Revoked ${res.tasks_revoked} task${res.tasks_revoked !== 1 ? "s" : ""}` + : "No active tasks", + ok: true, + }); + setTimeout(() => void load(), 2000); + } catch (err) { + setActionMessage({ + id: videoId, + text: err instanceof Error ? err.message : "Revoke failed", + ok: false, + }); + } finally { + setActionLoading(null); + } + }; + + const toggleExpand = (id: string) => { + setExpandedId((prev) => (prev === id ? null : id)); + }; + + return ( +
+
+
+

Pipeline Management

+

+ {videos.length} video{videos.length !== 1 ? "s" : ""} +

+
+
+ + +
+
+ + {loading ? ( +
Loading videos…
+ ) : error ? ( +
Error: {error}
+ ) : videos.length === 0 ? ( +
No videos in pipeline.
+ ) : ( +
+ {videos.map((video) => ( +
+
toggleExpand(video.id)} + > +
+ + {video.filename} + + {video.creator_name} +
+ +
+ + {video.processing_status} + + + {video.event_count} events + + + {formatTokens(video.total_tokens_used)} tokens + + + {formatDate(video.last_event_at)} + +
+ +
e.stopPropagation()}> + + +
+
+ + {actionMessage?.id === video.id && ( +
+ {actionMessage.text} +
+ )} + + {expandedId === video.id && ( +
+
+ ID: {video.id.slice(0, 8)}… + {video.content_hash && ( + + Hash: {video.content_hash.slice(0, 12)}… + + )} + Created: {formatDate(video.created_at)} + Updated: {formatDate(video.updated_at)} +
+ +
+ )} +
+ ))} +
+ )} +
+ ); +} diff --git a/frontend/src/pages/TechniquePage.tsx b/frontend/src/pages/TechniquePage.tsx index 43bfe45..e892913 100644 --- a/frontend/src/pages/TechniquePage.tsx +++ b/frontend/src/pages/TechniquePage.tsx @@ -371,7 +371,9 @@ export default function TechniquePage() { /> )} - {/* Summary */} +
+
+ {/* Summary */} {displaySummary && (

{displaySummary}

@@ -401,7 +403,9 @@ export default function TechniquePage() {
)} - {/* Key moments (always from live data — not versioned) */} +
+
+ {/* Key moments (always from live data — not versioned) */} {technique.key_moments.length > 0 && (

Key Moments

@@ -500,6 +504,8 @@ export default function TechniquePage() {
)} +
+
); } diff --git a/frontend/tsconfig.app.tsbuildinfo b/frontend/tsconfig.app.tsbuildinfo index 38e7477..997c8ec 100644 --- a/frontend/tsconfig.app.tsbuildinfo +++ b/frontend/tsconfig.app.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/api/client.ts","./src/api/public-client.ts","./src/components/ModeToggle.tsx","./src/components/StatusBadge.tsx","./src/pages/CreatorDetail.tsx","./src/pages/CreatorsBrowse.tsx","./src/pages/Home.tsx","./src/pages/MomentDetail.tsx","./src/pages/ReviewQueue.tsx","./src/pages/SearchResults.tsx","./src/pages/TechniquePage.tsx","./src/pages/TopicsBrowse.tsx"],"version":"5.6.3"} \ No newline at end of file +{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/api/client.ts","./src/api/public-client.ts","./src/components/ModeToggle.tsx","./src/components/ReportIssueModal.tsx","./src/components/StatusBadge.tsx","./src/pages/AdminPipeline.tsx","./src/pages/AdminReports.tsx","./src/pages/CreatorDetail.tsx","./src/pages/CreatorsBrowse.tsx","./src/pages/Home.tsx","./src/pages/MomentDetail.tsx","./src/pages/ReviewQueue.tsx","./src/pages/SearchResults.tsx","./src/pages/TechniquePage.tsx","./src/pages/TopicsBrowse.tsx"],"version":"5.6.3"} \ No newline at end of file