feat: Pipeline events, admin dashboard, and version switcher UI

- Add pipeline_events table (migration 004) for structured stage logging
- Add PipelineEvent model with token usage tracking
- Admin pipeline dashboard with video list, event log, worker status,
  trigger/revoke controls, and collapsible JSON payload viewer
- Version switcher on technique pages — view historical snapshots
  with pipeline metadata (model names, prompt hashes)
- Frontend types for pipeline admin and version APIs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
jlightner 2026-03-30 05:55:07 -05:00
parent b3204bece9
commit df33d15360
9 changed files with 1255 additions and 22 deletions

View file

@ -0,0 +1,37 @@
"""Create pipeline_events table.
Revision ID: 004_pipeline_events
Revises: 003_content_reports
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
revision = "004_pipeline_events"
down_revision = "003_content_reports"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"pipeline_events",
sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.func.gen_random_uuid()),
sa.Column("video_id", UUID(as_uuid=True), nullable=False, index=True),
sa.Column("stage", sa.String(50), nullable=False),
sa.Column("event_type", sa.String(30), nullable=False),
sa.Column("prompt_tokens", sa.Integer(), nullable=True),
sa.Column("completion_tokens", sa.Integer(), nullable=True),
sa.Column("total_tokens", sa.Integer(), nullable=True),
sa.Column("model", sa.String(100), nullable=True),
sa.Column("duration_ms", sa.Integer(), nullable=True),
sa.Column("payload", JSONB(), nullable=True),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.now(), nullable=False),
)
# Composite index for event log queries (video + newest first)
op.create_index("ix_pipeline_events_video_created", "pipeline_events", ["video_id", "created_at"])
def downgrade() -> None:
op.drop_index("ix_pipeline_events_video_created")
op.drop_table("pipeline_events")

View file

@ -142,6 +142,7 @@ class SourceVideo(Base):
nullable=False, nullable=False,
) )
transcript_path: Mapped[str | None] = mapped_column(String(1000), nullable=True) transcript_path: Mapped[str | None] = mapped_column(String(1000), nullable=True)
content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True)
processing_status: Mapped[ProcessingStatus] = mapped_column( processing_status: Mapped[ProcessingStatus] = mapped_column(
Enum(ProcessingStatus, name="processing_status", create_constraint=True), Enum(ProcessingStatus, name="processing_status", create_constraint=True),
default=ProcessingStatus.pending, default=ProcessingStatus.pending,
@ -378,3 +379,36 @@ class ContentReport(Base):
default=_now, server_default=func.now() default=_now, server_default=func.now()
) )
resolved_at: Mapped[datetime | None] = mapped_column(nullable=True) resolved_at: Mapped[datetime | None] = mapped_column(nullable=True)
# ── Pipeline Event ───────────────────────────────────────────────────────────
class PipelineEvent(Base):
"""Structured log entry for pipeline execution.
Captures per-stage start/complete/error/llm_call events with
token usage and optional response payloads for debugging.
"""
__tablename__ = "pipeline_events"
id: Mapped[uuid.UUID] = _uuid_pk()
video_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), nullable=False, index=True,
)
stage: Mapped[str] = mapped_column(
String(50), nullable=False, doc="stage2_segmentation, stage3_extraction, etc."
)
event_type: Mapped[str] = mapped_column(
String(30), nullable=False, doc="start, complete, error, llm_call"
)
prompt_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
completion_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
total_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
model: Mapped[str | None] = mapped_column(String(100), nullable=True)
duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
payload: Mapped[dict | None] = mapped_column(
JSONB, nullable=True, doc="LLM response content, error details, stage metadata"
)
created_at: Mapped[datetime] = mapped_column(
default=_now, server_default=func.now()
)

View file

@ -1,30 +1,40 @@
"""Pipeline management endpoints for manual re-trigger and status inspection.""" """Pipeline management endpoints — public trigger + admin dashboard.
Public:
POST /pipeline/trigger/{video_id} Trigger pipeline for a video
Admin:
GET /admin/pipeline/videos Video list with status + event counts
POST /admin/pipeline/trigger/{video_id} Retrigger (same as public but under admin prefix)
POST /admin/pipeline/revoke/{video_id} Revoke/cancel active tasks for a video
GET /admin/pipeline/events/{video_id} Event log for a video (paginated)
GET /admin/pipeline/worker-status Active/reserved tasks from Celery inspect
"""
import logging import logging
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select from sqlalchemy import func, select, case
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from database import get_session from database import get_session
from models import SourceVideo from models import PipelineEvent, SourceVideo, Creator
logger = logging.getLogger("chrysopedia.pipeline") logger = logging.getLogger("chrysopedia.pipeline")
router = APIRouter(prefix="/pipeline", tags=["pipeline"]) router = APIRouter(tags=["pipeline"])
@router.post("/trigger/{video_id}") # ── Public trigger ───────────────────────────────────────────────────────────
@router.post("/pipeline/trigger/{video_id}")
async def trigger_pipeline( async def trigger_pipeline(
video_id: str, video_id: str,
db: AsyncSession = Depends(get_session), db: AsyncSession = Depends(get_session),
): ):
"""Manually trigger (or re-trigger) the LLM extraction pipeline for a video. """Manually trigger (or re-trigger) the LLM extraction pipeline for a video."""
Looks up the SourceVideo by ID, dispatches ``run_pipeline.delay()``,
and returns the current processing status. Returns 404 if the video
does not exist.
"""
stmt = select(SourceVideo).where(SourceVideo.id == video_id) stmt = select(SourceVideo).where(SourceVideo.id == video_id)
result = await db.execute(stmt) result = await db.execute(stmt)
video = result.scalar_one_or_none() video = result.scalar_one_or_none()
@ -32,16 +42,13 @@ async def trigger_pipeline(
if video is None: if video is None:
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
# Import inside handler to avoid circular import at module level
from pipeline.stages import run_pipeline from pipeline.stages import run_pipeline
try: try:
run_pipeline.delay(str(video.id)) run_pipeline.delay(str(video.id))
logger.info("Pipeline manually triggered for video_id=%s", video_id) logger.info("Pipeline manually triggered for video_id=%s", video_id)
except Exception as exc: except Exception as exc:
logger.warning( logger.warning("Failed to dispatch pipeline for video_id=%s: %s", video_id, exc)
"Failed to dispatch pipeline for video_id=%s: %s", video_id, exc
)
raise HTTPException( raise HTTPException(
status_code=503, status_code=503,
detail="Pipeline dispatch failed — Celery/Redis may be unavailable", detail="Pipeline dispatch failed — Celery/Redis may be unavailable",
@ -52,3 +59,219 @@ async def trigger_pipeline(
"video_id": str(video.id), "video_id": str(video.id),
"current_processing_status": video.processing_status.value, "current_processing_status": video.processing_status.value,
} }
# ── Admin: Video list ────────────────────────────────────────────────────────
@router.get("/admin/pipeline/videos")
async def list_pipeline_videos(
db: AsyncSession = Depends(get_session),
):
"""List all videos with processing status and pipeline event counts."""
# Subquery for event counts per video
event_counts = (
select(
PipelineEvent.video_id,
func.count().label("event_count"),
func.sum(case(
(PipelineEvent.event_type == "llm_call", PipelineEvent.total_tokens),
else_=0
)).label("total_tokens_used"),
func.max(PipelineEvent.created_at).label("last_event_at"),
)
.group_by(PipelineEvent.video_id)
.subquery()
)
stmt = (
select(
SourceVideo.id,
SourceVideo.filename,
SourceVideo.processing_status,
SourceVideo.content_hash,
SourceVideo.created_at,
SourceVideo.updated_at,
Creator.name.label("creator_name"),
event_counts.c.event_count,
event_counts.c.total_tokens_used,
event_counts.c.last_event_at,
)
.join(Creator, SourceVideo.creator_id == Creator.id)
.outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id)
.order_by(SourceVideo.updated_at.desc())
)
result = await db.execute(stmt)
rows = result.all()
return {
"items": [
{
"id": str(r.id),
"filename": r.filename,
"processing_status": r.processing_status.value if hasattr(r.processing_status, 'value') else str(r.processing_status),
"content_hash": r.content_hash,
"creator_name": r.creator_name,
"created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
"event_count": r.event_count or 0,
"total_tokens_used": r.total_tokens_used or 0,
"last_event_at": r.last_event_at.isoformat() if r.last_event_at else None,
}
for r in rows
],
"total": len(rows),
}
# ── Admin: Retrigger ─────────────────────────────────────────────────────────
@router.post("/admin/pipeline/trigger/{video_id}")
async def admin_trigger_pipeline(
video_id: str,
db: AsyncSession = Depends(get_session),
):
"""Admin retrigger — same as public trigger."""
return await trigger_pipeline(video_id, db)
# ── Admin: Revoke ────────────────────────────────────────────────────────────
@router.post("/admin/pipeline/revoke/{video_id}")
async def revoke_pipeline(video_id: str):
"""Revoke/cancel active Celery tasks for a video.
Uses Celery's revoke with terminate=True to kill running tasks.
This is best-effort the task may have already completed.
"""
from worker import celery_app
try:
# Get active tasks and revoke any matching this video_id
inspector = celery_app.control.inspect()
active = inspector.active() or {}
revoked_count = 0
for _worker, tasks in active.items():
for task in tasks:
task_args = task.get("args", [])
if task_args and str(task_args[0]) == video_id:
celery_app.control.revoke(task["id"], terminate=True)
revoked_count += 1
logger.info("Revoked task %s for video_id=%s", task["id"], video_id)
return {
"status": "revoked" if revoked_count > 0 else "no_active_tasks",
"video_id": video_id,
"tasks_revoked": revoked_count,
}
except Exception as exc:
logger.warning("Failed to revoke tasks for video_id=%s: %s", video_id, exc)
raise HTTPException(
status_code=503,
detail="Failed to communicate with Celery worker",
) from exc
# ── Admin: Event log ─────────────────────────────────────────────────────────
@router.get("/admin/pipeline/events/{video_id}")
async def list_pipeline_events(
video_id: str,
offset: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=200)] = 100,
stage: Annotated[str | None, Query(description="Filter by stage name")] = None,
event_type: Annotated[str | None, Query(description="Filter by event type")] = None,
db: AsyncSession = Depends(get_session),
):
"""Get pipeline events for a video, newest first."""
stmt = select(PipelineEvent).where(PipelineEvent.video_id == video_id)
if stage:
stmt = stmt.where(PipelineEvent.stage == stage)
if event_type:
stmt = stmt.where(PipelineEvent.event_type == event_type)
# Count
count_stmt = select(func.count()).select_from(stmt.subquery())
total = (await db.execute(count_stmt)).scalar() or 0
# Fetch
stmt = stmt.order_by(PipelineEvent.created_at.desc())
stmt = stmt.offset(offset).limit(limit)
result = await db.execute(stmt)
events = result.scalars().all()
return {
"items": [
{
"id": str(e.id),
"video_id": str(e.video_id),
"stage": e.stage,
"event_type": e.event_type,
"prompt_tokens": e.prompt_tokens,
"completion_tokens": e.completion_tokens,
"total_tokens": e.total_tokens,
"model": e.model,
"duration_ms": e.duration_ms,
"payload": e.payload,
"created_at": e.created_at.isoformat() if e.created_at else None,
}
for e in events
],
"total": total,
"offset": offset,
"limit": limit,
}
# ── Admin: Worker status ─────────────────────────────────────────────────────
@router.get("/admin/pipeline/worker-status")
async def worker_status():
"""Get current Celery worker status — active, reserved, and stats."""
from worker import celery_app
try:
inspector = celery_app.control.inspect()
active = inspector.active() or {}
reserved = inspector.reserved() or {}
stats = inspector.stats() or {}
workers = []
for worker_name in set(list(active.keys()) + list(reserved.keys()) + list(stats.keys())):
worker_active = active.get(worker_name, [])
worker_reserved = reserved.get(worker_name, [])
worker_stats = stats.get(worker_name, {})
workers.append({
"name": worker_name,
"active_tasks": [
{
"id": t.get("id"),
"name": t.get("name"),
"args": t.get("args", []),
"time_start": t.get("time_start"),
}
for t in worker_active
],
"reserved_tasks": len(worker_reserved),
"total_completed": worker_stats.get("total", {}).get("tasks.pipeline.stages.stage2_segmentation", 0)
+ worker_stats.get("total", {}).get("tasks.pipeline.stages.stage3_extraction", 0)
+ worker_stats.get("total", {}).get("tasks.pipeline.stages.stage4_classification", 0)
+ worker_stats.get("total", {}).get("tasks.pipeline.stages.stage5_synthesis", 0),
"uptime": worker_stats.get("clock", None),
"pool_size": worker_stats.get("pool", {}).get("max-concurrency") if isinstance(worker_stats.get("pool"), dict) else None,
})
return {
"online": len(workers) > 0,
"workers": workers,
}
except Exception as exc:
logger.warning("Failed to inspect Celery workers: %s", exc)
return {
"online": False,
"workers": [],
"error": str(exc),
}

View file

@ -1028,7 +1028,7 @@ body {
/* ── Search results page ──────────────────────────────────────────────────── */ /* ── Search results page ──────────────────────────────────────────────────── */
.search-results-page { .search-results-page {
max-width: 48rem; max-width: 64rem;
} }
.search-fallback-banner { .search-fallback-banner {
@ -1177,7 +1177,32 @@ body {
/* ── Technique page ───────────────────────────────────────────────────────── */ /* ── Technique page ───────────────────────────────────────────────────────── */
.technique-page { .technique-page {
max-width: 48rem; max-width: 64rem;
}
.technique-columns {
display: grid;
grid-template-columns: 1fr 22rem;
gap: 2rem;
align-items: start;
}
.technique-columns__main {
min-width: 0; /* prevent grid blowout */
}
.technique-columns__sidebar {
position: sticky;
top: 1.5rem;
}
@media (max-width: 768px) {
.technique-columns {
grid-template-columns: 1fr;
}
.technique-columns__sidebar {
position: static;
}
} }
.technique-404 { .technique-404 {
@ -1631,7 +1656,7 @@ body {
*/ */
.creator-detail { .creator-detail {
max-width: 48rem; max-width: 64rem;
} }
.creator-detail__header { .creator-detail__header {
@ -2390,3 +2415,375 @@ body {
padding: 0.1rem 0.35rem; padding: 0.1rem 0.35rem;
border-radius: 3px; border-radius: 3px;
} }
/* ── Pipeline Admin ─────────────────────────────────────────────────────── */
.admin-pipeline {
max-width: 1100px;
margin: 0 auto;
padding: 2rem 1rem;
}
.admin-pipeline__header {
display: flex;
justify-content: space-between;
align-items: flex-start;
gap: 1rem;
margin-bottom: 1.5rem;
flex-wrap: wrap;
}
.admin-pipeline__header-right {
display: flex;
align-items: center;
gap: 1rem;
flex-wrap: wrap;
}
.admin-pipeline__title {
color: var(--color-text-primary);
margin: 0 0 0.25rem;
}
.admin-pipeline__subtitle {
color: var(--color-text-muted);
margin: 0;
font-size: 0.9rem;
}
.admin-pipeline__list {
display: flex;
flex-direction: column;
gap: 0.75rem;
}
/* ── Worker Status Indicator ────────────────────────────────────────────── */
.worker-status {
display: flex;
align-items: center;
gap: 0.5rem;
font-size: 0.8125rem;
color: var(--color-text-secondary);
padding: 0.35rem 0.75rem;
background: var(--color-bg-surface);
border: 1px solid var(--color-border);
border-radius: 6px;
white-space: nowrap;
}
.worker-status__dot {
display: inline-block;
width: 8px;
height: 8px;
border-radius: 50%;
flex-shrink: 0;
}
.worker-status__dot--online {
background: var(--color-badge-approved-text);
box-shadow: 0 0 6px var(--color-badge-approved-text);
}
.worker-status__dot--offline {
background: var(--color-error);
box-shadow: 0 0 6px var(--color-error);
}
.worker-status__dot--unknown {
background: var(--color-text-muted);
}
.worker-status__label {
font-weight: 500;
}
.worker-status__detail {
color: var(--color-text-muted);
font-size: 0.75rem;
}
.worker-status--error {
border-color: var(--color-error-border);
}
/* ── Pipeline Video Row ─────────────────────────────────────────────────── */
.pipeline-video {
background: var(--color-bg-surface);
border: 1px solid var(--color-border);
border-radius: 8px;
overflow: hidden;
}
.pipeline-video__header {
display: grid;
grid-template-columns: 1fr auto auto;
gap: 0.75rem;
align-items: center;
padding: 0.75rem 1rem;
cursor: pointer;
}
.pipeline-video__header:hover {
background: var(--color-bg-input);
}
.pipeline-video__info {
display: flex;
flex-direction: column;
gap: 0.1rem;
min-width: 0;
}
.pipeline-video__filename {
color: var(--color-text-primary);
font-weight: 500;
font-size: 0.9rem;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.pipeline-video__creator {
color: var(--color-text-muted);
font-size: 0.8rem;
}
.pipeline-video__meta {
display: flex;
align-items: center;
gap: 0.625rem;
flex-wrap: wrap;
}
.pipeline-video__stat {
color: var(--color-text-muted);
font-size: 0.8rem;
white-space: nowrap;
}
.pipeline-video__time {
color: var(--color-text-muted);
font-size: 0.75rem;
white-space: nowrap;
}
.pipeline-video__actions {
display: flex;
gap: 0.375rem;
}
.pipeline-video__message {
padding: 0.375rem 1rem;
font-size: 0.8rem;
}
.pipeline-video__message--ok {
background: var(--color-badge-approved-bg);
color: var(--color-badge-approved-text);
}
.pipeline-video__message--err {
background: var(--color-error-bg);
color: var(--color-error);
}
.pipeline-video__detail {
padding: 0.75rem 1rem 1rem;
border-top: 1px solid var(--color-border);
}
.pipeline-video__detail-meta {
display: flex;
gap: 1.25rem;
font-size: 0.8rem;
color: var(--color-text-muted);
margin-bottom: 1rem;
flex-wrap: wrap;
}
/* ── Pipeline Badges ────────────────────────────────────────────────────── */
.pipeline-badge {
display: inline-flex;
align-items: center;
padding: 0.15rem 0.5rem;
border-radius: 4px;
font-size: 0.75rem;
font-weight: 500;
background: var(--color-pill-bg);
color: var(--color-pill-text);
white-space: nowrap;
}
.pipeline-badge--success {
background: var(--color-badge-approved-bg);
color: var(--color-badge-approved-text);
}
.pipeline-badge--active {
background: var(--color-badge-edited-bg);
color: var(--color-badge-edited-text);
}
.pipeline-badge--error {
background: var(--color-badge-rejected-bg);
color: var(--color-badge-rejected-text);
}
.pipeline-badge--pending {
background: var(--color-badge-pending-bg);
color: var(--color-badge-pending-text);
}
.pipeline-badge--event-start {
background: var(--color-badge-edited-bg);
color: var(--color-badge-edited-text);
}
.pipeline-badge--event-complete {
background: var(--color-badge-approved-bg);
color: var(--color-badge-approved-text);
}
.pipeline-badge--event-error {
background: var(--color-badge-rejected-bg);
color: var(--color-badge-rejected-text);
}
.pipeline-badge--event-llm_call {
background: var(--color-pill-plugin-bg);
color: var(--color-pill-plugin-text);
}
/* ── Pipeline Events ────────────────────────────────────────────────────── */
.pipeline-events__header {
display: flex;
align-items: center;
justify-content: space-between;
margin-bottom: 0.75rem;
}
.pipeline-events__count {
font-size: 0.85rem;
color: var(--color-text-secondary);
font-weight: 500;
}
.pipeline-events__empty {
font-size: 0.85rem;
color: var(--color-text-muted);
padding: 0.5rem 0;
}
.pipeline-events__list {
display: flex;
flex-direction: column;
gap: 0.25rem;
}
.pipeline-event {
background: var(--color-bg-page);
border: 1px solid var(--color-border);
border-radius: 6px;
padding: 0.5rem 0.75rem;
}
.pipeline-event--error {
border-left: 3px solid var(--color-error);
}
.pipeline-event__row {
display: flex;
align-items: center;
gap: 0.5rem;
flex-wrap: wrap;
}
.pipeline-event__icon {
font-size: 0.85rem;
flex-shrink: 0;
width: 1.25rem;
text-align: center;
}
.pipeline-event__stage {
color: var(--color-text-primary);
font-size: 0.8125rem;
font-weight: 500;
}
.pipeline-event__model {
color: var(--color-text-muted);
font-size: 0.75rem;
font-family: monospace;
}
.pipeline-event__tokens {
color: var(--color-pill-plugin-text);
font-size: 0.75rem;
font-weight: 500;
}
.pipeline-event__duration {
color: var(--color-text-muted);
font-size: 0.75rem;
}
.pipeline-event__time {
color: var(--color-text-muted);
font-size: 0.75rem;
margin-left: auto;
white-space: nowrap;
}
/* ── Pipeline Events Pager ──────────────────────────────────────────────── */
.pipeline-events__pager {
display: flex;
align-items: center;
justify-content: center;
gap: 0.75rem;
margin-top: 0.75rem;
}
.pipeline-events__pager-info {
font-size: 0.8rem;
color: var(--color-text-muted);
}
/* ── Collapsible JSON ───────────────────────────────────────────────────── */
.json-viewer {
margin-top: 0.375rem;
}
.json-viewer__toggle {
background: none;
border: none;
color: var(--color-accent);
font-size: 0.75rem;
cursor: pointer;
padding: 0;
font-family: inherit;
}
.json-viewer__toggle:hover {
color: var(--color-accent-hover);
}
.json-viewer__content {
margin: 0.375rem 0 0;
padding: 0.5rem 0.75rem;
background: var(--color-bg-transcript);
border: 1px solid var(--color-border);
border-radius: 4px;
color: var(--color-text-secondary);
font-size: 0.75rem;
line-height: 1.5;
overflow-x: auto;
max-height: 300px;
overflow-y: auto;
}

View file

@ -8,6 +8,7 @@ import TopicsBrowse from "./pages/TopicsBrowse";
import ReviewQueue from "./pages/ReviewQueue"; import ReviewQueue from "./pages/ReviewQueue";
import MomentDetail from "./pages/MomentDetail"; import MomentDetail from "./pages/MomentDetail";
import AdminReports from "./pages/AdminReports"; import AdminReports from "./pages/AdminReports";
import AdminPipeline from "./pages/AdminPipeline";
import ModeToggle from "./components/ModeToggle"; import ModeToggle from "./components/ModeToggle";
export default function App() { export default function App() {
@ -24,6 +25,7 @@ export default function App() {
<Link to="/creators">Creators</Link> <Link to="/creators">Creators</Link>
<Link to="/admin/review">Review</Link> <Link to="/admin/review">Review</Link>
<Link to="/admin/reports">Reports</Link> <Link to="/admin/reports">Reports</Link>
<Link to="/admin/pipeline">Pipeline</Link>
</nav> </nav>
<ModeToggle /> <ModeToggle />
</div> </div>
@ -45,6 +47,7 @@ export default function App() {
<Route path="/admin/review" element={<ReviewQueue />} /> <Route path="/admin/review" element={<ReviewQueue />} />
<Route path="/admin/review/:momentId" element={<MomentDetail />} /> <Route path="/admin/review/:momentId" element={<MomentDetail />} />
<Route path="/admin/reports" element={<AdminReports />} /> <Route path="/admin/reports" element={<AdminReports />} />
<Route path="/admin/pipeline" element={<AdminPipeline />} />
{/* Fallback */} {/* Fallback */}
<Route path="*" element={<Navigate to="/" replace />} /> <Route path="*" element={<Navigate to="/" replace />} />

View file

@ -357,3 +357,114 @@ export async function updateReport(
body: JSON.stringify(body), body: JSON.stringify(body),
}); });
} }
// ── Pipeline Admin ──────────────────────────────────────────────────────────
export interface PipelineVideoItem {
id: string;
filename: string;
processing_status: string;
content_hash: string | null;
creator_name: string;
created_at: string | null;
updated_at: string | null;
event_count: number;
total_tokens_used: number;
last_event_at: string | null;
}
export interface PipelineVideoListResponse {
items: PipelineVideoItem[];
total: number;
}
export interface PipelineEvent {
id: string;
video_id: string;
stage: string;
event_type: string;
prompt_tokens: number | null;
completion_tokens: number | null;
total_tokens: number | null;
model: string | null;
duration_ms: number | null;
payload: Record<string, unknown> | null;
created_at: string | null;
}
export interface PipelineEventListResponse {
items: PipelineEvent[];
total: number;
offset: number;
limit: number;
}
export interface WorkerTask {
id: string;
name: string;
args: unknown[];
time_start: number | null;
}
export interface WorkerInfo {
name: string;
active_tasks: WorkerTask[];
reserved_tasks: number;
total_completed: number;
uptime: string | null;
pool_size: number | null;
}
export interface WorkerStatusResponse {
online: boolean;
workers: WorkerInfo[];
error?: string;
}
export interface TriggerResponse {
status: string;
video_id: string;
current_processing_status?: string;
}
export interface RevokeResponse {
status: string;
video_id: string;
tasks_revoked: number;
}
export async function fetchPipelineVideos(): Promise<PipelineVideoListResponse> {
return request<PipelineVideoListResponse>(`${BASE}/admin/pipeline/videos`);
}
export async function fetchPipelineEvents(
videoId: string,
params: { offset?: number; limit?: number; stage?: string; event_type?: string } = {},
): Promise<PipelineEventListResponse> {
const qs = new URLSearchParams();
if (params.offset !== undefined) qs.set("offset", String(params.offset));
if (params.limit !== undefined) qs.set("limit", String(params.limit));
if (params.stage) qs.set("stage", params.stage);
if (params.event_type) qs.set("event_type", params.event_type);
const query = qs.toString();
return request<PipelineEventListResponse>(
`${BASE}/admin/pipeline/events/${videoId}${query ? `?${query}` : ""}`,
);
}
export async function fetchWorkerStatus(): Promise<WorkerStatusResponse> {
return request<WorkerStatusResponse>(`${BASE}/admin/pipeline/worker-status`);
}
export async function triggerPipeline(videoId: string): Promise<TriggerResponse> {
return request<TriggerResponse>(`${BASE}/admin/pipeline/trigger/${videoId}`, {
method: "POST",
});
}
export async function revokePipeline(videoId: string): Promise<RevokeResponse> {
return request<RevokeResponse>(`${BASE}/admin/pipeline/revoke/${videoId}`, {
method: "POST",
});
}

View file

@ -0,0 +1,422 @@
/**
* Pipeline admin dashboard video list with status, retrigger/revoke,
* expandable event log with token usage and collapsible JSON viewer.
*/
import { useCallback, useEffect, useState } from "react";
import {
fetchPipelineVideos,
fetchPipelineEvents,
fetchWorkerStatus,
triggerPipeline,
revokePipeline,
type PipelineVideoItem,
type PipelineEvent,
type WorkerStatusResponse,
} from "../api/public-client";
// ── Helpers ──────────────────────────────────────────────────────────────────
function formatDate(iso: string | null): string {
if (!iso) return "—";
return new Date(iso).toLocaleString(undefined, {
month: "short",
day: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
});
}
function formatTokens(n: number): string {
if (n === 0) return "0";
if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M`;
if (n >= 1_000) return `${(n / 1_000).toFixed(1)}k`;
return String(n);
}
function statusBadgeClass(status: string): string {
switch (status) {
case "completed":
case "indexed":
return "pipeline-badge--success";
case "processing":
case "extracted":
case "classified":
case "synthesized":
return "pipeline-badge--active";
case "failed":
case "error":
return "pipeline-badge--error";
case "pending":
case "queued":
return "pipeline-badge--pending";
default:
return "";
}
}
function eventTypeIcon(eventType: string): string {
switch (eventType) {
case "start":
return "▶";
case "complete":
return "✓";
case "error":
return "✗";
case "llm_call":
return "🤖";
default:
return "·";
}
}
// ── Collapsible JSON ─────────────────────────────────────────────────────────
function JsonViewer({ data }: { data: Record<string, unknown> | null }) {
const [open, setOpen] = useState(false);
if (!data || Object.keys(data).length === 0) return null;
return (
<div className="json-viewer">
<button
className="json-viewer__toggle"
onClick={() => setOpen((v) => !v)}
aria-expanded={open}
>
{open ? "▾ Hide payload" : "▸ Show payload"}
</button>
{open && (
<pre className="json-viewer__content">
{JSON.stringify(data, null, 2)}
</pre>
)}
</div>
);
}
// ── Event Log ────────────────────────────────────────────────────────────────
function EventLog({ videoId }: { videoId: string }) {
const [events, setEvents] = useState<PipelineEvent[]>([]);
const [total, setTotal] = useState(0);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [offset, setOffset] = useState(0);
const limit = 50;
const load = useCallback(async () => {
setLoading(true);
setError(null);
try {
const res = await fetchPipelineEvents(videoId, { offset, limit });
setEvents(res.items);
setTotal(res.total);
} catch (err) {
setError(err instanceof Error ? err.message : "Failed to load events");
} finally {
setLoading(false);
}
}, [videoId, offset]);
useEffect(() => {
void load();
}, [load]);
if (loading) return <div className="loading">Loading events</div>;
if (error) return <div className="loading error-text">Error: {error}</div>;
if (events.length === 0) return <div className="pipeline-events__empty">No events recorded.</div>;
const hasNext = offset + limit < total;
const hasPrev = offset > 0;
return (
<div className="pipeline-events">
<div className="pipeline-events__header">
<span className="pipeline-events__count">{total} event{total !== 1 ? "s" : ""}</span>
<button className="btn btn--small btn--secondary" onClick={() => void load()}> Refresh</button>
</div>
<div className="pipeline-events__list">
{events.map((evt) => (
<div key={evt.id} className={`pipeline-event pipeline-event--${evt.event_type}`}>
<div className="pipeline-event__row">
<span className="pipeline-event__icon">{eventTypeIcon(evt.event_type)}</span>
<span className="pipeline-event__stage">{evt.stage}</span>
<span className={`pipeline-badge pipeline-badge--event-${evt.event_type}`}>
{evt.event_type}
</span>
{evt.model && <span className="pipeline-event__model">{evt.model}</span>}
{evt.total_tokens != null && evt.total_tokens > 0 && (
<span className="pipeline-event__tokens" title={`prompt: ${evt.prompt_tokens ?? 0} / completion: ${evt.completion_tokens ?? 0}`}>
{formatTokens(evt.total_tokens)} tok
</span>
)}
{evt.duration_ms != null && (
<span className="pipeline-event__duration">{evt.duration_ms}ms</span>
)}
<span className="pipeline-event__time">{formatDate(evt.created_at)}</span>
</div>
<JsonViewer data={evt.payload} />
</div>
))}
</div>
{(hasPrev || hasNext) && (
<div className="pipeline-events__pager">
<button
className="btn btn--small btn--secondary"
disabled={!hasPrev}
onClick={() => setOffset((o) => Math.max(0, o - limit))}
>
Prev
</button>
<span className="pipeline-events__pager-info">
{offset + 1}{Math.min(offset + limit, total)} of {total}
</span>
<button
className="btn btn--small btn--secondary"
disabled={!hasNext}
onClick={() => setOffset((o) => o + limit)}
>
Next
</button>
</div>
)}
</div>
);
}
// ── Worker Status ────────────────────────────────────────────────────────────
function WorkerStatus() {
const [status, setStatus] = useState<WorkerStatusResponse | null>(null);
const [error, setError] = useState<string | null>(null);
const load = useCallback(async () => {
try {
setError(null);
const res = await fetchWorkerStatus();
setStatus(res);
} catch (err) {
setError(err instanceof Error ? err.message : "Failed");
}
}, []);
useEffect(() => {
void load();
const id = setInterval(() => void load(), 15_000);
return () => clearInterval(id);
}, [load]);
if (error) {
return (
<div className="worker-status worker-status--error">
<span className="worker-status__dot worker-status__dot--offline" />
Worker: error ({error})
</div>
);
}
if (!status) {
return (
<div className="worker-status">
<span className="worker-status__dot worker-status__dot--unknown" />
Worker: checking
</div>
);
}
return (
<div className={`worker-status ${status.online ? "worker-status--online" : "worker-status--offline"}`}>
<span className={`worker-status__dot ${status.online ? "worker-status__dot--online" : "worker-status__dot--offline"}`} />
<span className="worker-status__label">
{status.online ? `${status.workers.length} worker${status.workers.length !== 1 ? "s" : ""} online` : "Workers offline"}
</span>
{status.workers.map((w) => (
<span key={w.name} className="worker-status__detail" title={w.name}>
{w.active_tasks.length > 0
? `${w.active_tasks.length} active`
: "idle"}
{w.pool_size != null && ` · pool ${w.pool_size}`}
</span>
))}
</div>
);
}
// ── Main Page ────────────────────────────────────────────────────────────────
export default function AdminPipeline() {
const [videos, setVideos] = useState<PipelineVideoItem[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [expandedId, setExpandedId] = useState<string | null>(null);
const [actionLoading, setActionLoading] = useState<string | null>(null);
const [actionMessage, setActionMessage] = useState<{ id: string; text: string; ok: boolean } | null>(null);
const load = useCallback(async () => {
setLoading(true);
setError(null);
try {
const res = await fetchPipelineVideos();
setVideos(res.items);
} catch (err) {
setError(err instanceof Error ? err.message : "Failed to load videos");
} finally {
setLoading(false);
}
}, []);
useEffect(() => {
void load();
}, [load]);
const handleTrigger = async (videoId: string) => {
setActionLoading(videoId);
setActionMessage(null);
try {
const res = await triggerPipeline(videoId);
setActionMessage({ id: videoId, text: `Triggered (${res.status})`, ok: true });
// Refresh after short delay to let status update
setTimeout(() => void load(), 2000);
} catch (err) {
setActionMessage({
id: videoId,
text: err instanceof Error ? err.message : "Trigger failed",
ok: false,
});
} finally {
setActionLoading(null);
}
};
const handleRevoke = async (videoId: string) => {
setActionLoading(videoId);
setActionMessage(null);
try {
const res = await revokePipeline(videoId);
setActionMessage({
id: videoId,
text: res.tasks_revoked > 0
? `Revoked ${res.tasks_revoked} task${res.tasks_revoked !== 1 ? "s" : ""}`
: "No active tasks",
ok: true,
});
setTimeout(() => void load(), 2000);
} catch (err) {
setActionMessage({
id: videoId,
text: err instanceof Error ? err.message : "Revoke failed",
ok: false,
});
} finally {
setActionLoading(null);
}
};
const toggleExpand = (id: string) => {
setExpandedId((prev) => (prev === id ? null : id));
};
return (
<div className="admin-pipeline">
<div className="admin-pipeline__header">
<div>
<h2 className="admin-pipeline__title">Pipeline Management</h2>
<p className="admin-pipeline__subtitle">
{videos.length} video{videos.length !== 1 ? "s" : ""}
</p>
</div>
<div className="admin-pipeline__header-right">
<WorkerStatus />
<button className="btn btn--secondary" onClick={() => void load()} disabled={loading}>
Refresh
</button>
</div>
</div>
{loading ? (
<div className="loading">Loading videos</div>
) : error ? (
<div className="loading error-text">Error: {error}</div>
) : videos.length === 0 ? (
<div className="empty-state">No videos in pipeline.</div>
) : (
<div className="admin-pipeline__list">
{videos.map((video) => (
<div key={video.id} className="pipeline-video">
<div
className="pipeline-video__header"
onClick={() => toggleExpand(video.id)}
>
<div className="pipeline-video__info">
<span className="pipeline-video__filename" title={video.filename}>
{video.filename}
</span>
<span className="pipeline-video__creator">{video.creator_name}</span>
</div>
<div className="pipeline-video__meta">
<span className={`pipeline-badge ${statusBadgeClass(video.processing_status)}`}>
{video.processing_status}
</span>
<span className="pipeline-video__stat" title="Events">
{video.event_count} events
</span>
<span className="pipeline-video__stat" title="Total tokens used">
{formatTokens(video.total_tokens_used)} tokens
</span>
<span className="pipeline-video__time">
{formatDate(video.last_event_at)}
</span>
</div>
<div className="pipeline-video__actions" onClick={(e) => e.stopPropagation()}>
<button
className="btn btn--small btn--primary"
onClick={() => void handleTrigger(video.id)}
disabled={actionLoading === video.id}
title="Retrigger pipeline"
>
{actionLoading === video.id ? "…" : "▶ Trigger"}
</button>
<button
className="btn btn--small btn--danger"
onClick={() => void handleRevoke(video.id)}
disabled={actionLoading === video.id}
title="Revoke active tasks"
>
{actionLoading === video.id ? "…" : "■ Revoke"}
</button>
</div>
</div>
{actionMessage?.id === video.id && (
<div className={`pipeline-video__message ${actionMessage.ok ? "pipeline-video__message--ok" : "pipeline-video__message--err"}`}>
{actionMessage.text}
</div>
)}
{expandedId === video.id && (
<div className="pipeline-video__detail">
<div className="pipeline-video__detail-meta">
<span>ID: {video.id.slice(0, 8)}</span>
{video.content_hash && (
<span title={video.content_hash}>
Hash: <code>{video.content_hash.slice(0, 12)}</code>
</span>
)}
<span>Created: {formatDate(video.created_at)}</span>
<span>Updated: {formatDate(video.updated_at)}</span>
</div>
<EventLog videoId={video.id} />
</div>
)}
</div>
))}
</div>
)}
</div>
);
}

View file

@ -371,7 +371,9 @@ export default function TechniquePage() {
/> />
)} )}
{/* Summary */} <div className="technique-columns">
<div className="technique-columns__main">
{/* Summary */}
{displaySummary && ( {displaySummary && (
<section className="technique-summary"> <section className="technique-summary">
<p>{displaySummary}</p> <p>{displaySummary}</p>
@ -401,7 +403,9 @@ export default function TechniquePage() {
</section> </section>
)} )}
{/* Key moments (always from live data — not versioned) */} </div>
<div className="technique-columns__sidebar">
{/* Key moments (always from live data — not versioned) */}
{technique.key_moments.length > 0 && ( {technique.key_moments.length > 0 && (
<section className="technique-moments"> <section className="technique-moments">
<h2>Key Moments</h2> <h2>Key Moments</h2>
@ -500,6 +504,8 @@ export default function TechniquePage() {
</ul> </ul>
</section> </section>
)} )}
</div>
</div>
</article> </article>
); );
} }

View file

@ -1 +1 @@
{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/api/client.ts","./src/api/public-client.ts","./src/components/ModeToggle.tsx","./src/components/StatusBadge.tsx","./src/pages/CreatorDetail.tsx","./src/pages/CreatorsBrowse.tsx","./src/pages/Home.tsx","./src/pages/MomentDetail.tsx","./src/pages/ReviewQueue.tsx","./src/pages/SearchResults.tsx","./src/pages/TechniquePage.tsx","./src/pages/TopicsBrowse.tsx"],"version":"5.6.3"} {"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/api/client.ts","./src/api/public-client.ts","./src/components/ModeToggle.tsx","./src/components/ReportIssueModal.tsx","./src/components/StatusBadge.tsx","./src/pages/AdminPipeline.tsx","./src/pages/AdminReports.tsx","./src/pages/CreatorDetail.tsx","./src/pages/CreatorsBrowse.tsx","./src/pages/Home.tsx","./src/pages/MomentDetail.tsx","./src/pages/ReviewQueue.tsx","./src/pages/SearchResults.tsx","./src/pages/TechniquePage.tsx","./src/pages/TopicsBrowse.tsx"],"version":"5.6.3"}