/** * Pipeline admin dashboard — video list with status, retrigger/revoke, * expandable event log with token usage and collapsible JSON viewer. */ import React, { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useSearchParams } from "react-router-dom"; import { useDocumentTitle } from "../hooks/useDocumentTitle"; import { fetchPipelineVideos, fetchPipelineEvents, fetchPipelineRuns, fetchWorkerStatus, fetchDebugMode, setDebugMode, triggerPipeline, revokePipeline, cleanRetriggerPipeline, rerunStage, fetchChunkingData, fetchStalePages, bulkResynthesize, wipeAllOutput, fetchCreators, fetchRecentActivity, type PipelineVideoItem, type PipelineEvent, type PipelineRunItem, type WorkerStatusResponse, type RecentActivityItem, } from "../api/public-client"; // ── Helpers ────────────────────────────────────────────────────────────────── function formatDate(iso: string | null): string { if (!iso) return "—"; return new Date(iso).toLocaleString(undefined, { month: "short", day: "numeric", hour: "2-digit", minute: "2-digit", second: "2-digit", }); } function formatTokens(n: number): string { if (n === 0) return "0"; if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M`; if (n >= 1_000) return `${(n / 1_000).toFixed(1)}k`; return String(n); } function formatElapsed(isoStart: string | null): string { if (!isoStart) return ""; const ms = Date.now() - new Date(isoStart).getTime(); if (ms < 0) return ""; const secs = Math.floor(ms / 1000); if (secs < 60) return `${secs}s`; const mins = Math.floor(secs / 60); const remainSecs = secs % 60; if (mins < 60) return `${mins}m ${remainSecs}s`; const hrs = Math.floor(mins / 60); return `${hrs}h ${mins % 60}m`; } const STATUS_LABELS: Record = { not_started: "Not Started", queued: "Queued", processing: "In Progress", error: "Errored", complete: "Complete", }; function statusBadgeClass(status: string): string { switch (status) { case "complete": return "pipeline-badge--success"; case "processing": return "pipeline-badge--active"; case "error": return "pipeline-badge--error"; case "not_started": case "queued": return "pipeline-badge--pending"; default: return ""; } } function eventTypeIcon(eventType: string): string { switch (eventType) { case "start": return "▶"; case "complete": return "✓"; case "error": return "✗"; case "llm_call": return "⚙"; default: return "·"; } } // ── Collapsible JSON ───────────────────────────────────────────────────────── function JsonViewer({ data }: { data: Record | null }) { const [open, setOpen] = useState(false); if (!data || Object.keys(data).length === 0) return null; return (
{open && (
          {JSON.stringify(data, null, 2)}
        
)}
); } // ── Debug Payload Viewer ───────────────────────────────────────────────────── interface DebugSection { label: string; content: string; } function DebugPayloadViewer({ event }: { event: PipelineEvent }) { const sections: DebugSection[] = []; if (event.payload?.request_params) sections.push({ label: "Request Params", content: JSON.stringify(event.payload.request_params, null, 2) }); if (event.system_prompt_text) sections.push({ label: "System Prompt", content: event.system_prompt_text }); if (event.user_prompt_text) sections.push({ label: "User Prompt", content: event.user_prompt_text }); if (event.response_text) sections.push({ label: "Response", content: event.response_text }); const [openSections, setOpenSections] = useState>({}); const [copiedKey, setCopiedKey] = useState(null); const SECTION_TOOLTIPS: Record = { "System Prompt": "Instructions sent to the LLM defining its role and output format for this pipeline stage", "User Prompt": "The actual transcript content and extraction request sent to the LLM", "Response": "Raw LLM output before parsing — the extracted data that feeds the next stage", }; if (sections.length === 0) return null; const toggleSection = (label: string) => { setOpenSections((prev) => ({ ...prev, [label]: !prev[label] })); }; const copyToClipboard = async (label: string, text: string) => { try { await navigator.clipboard.writeText(text); setCopiedKey(label); setTimeout(() => setCopiedKey(null), 1500); } catch { // Fallback for non-HTTPS contexts const ta = document.createElement("textarea"); ta.value = text; ta.style.position = "fixed"; ta.style.opacity = "0"; document.body.appendChild(ta); ta.select(); document.execCommand("copy"); document.body.removeChild(ta); setCopiedKey(label); setTimeout(() => setCopiedKey(null), 1500); } }; const exportAsJson = () => { // Dump everything — full rawdog debug payload with request params const data: Record = { event_id: event.id, stage: event.stage, event_type: event.event_type, model: event.model, prompt_tokens: event.prompt_tokens, completion_tokens: event.completion_tokens, total_tokens: event.total_tokens, duration_ms: event.duration_ms, created_at: event.created_at, payload: event.payload, request_params: event.payload?.request_params ?? null, system_prompt_text: event.system_prompt_text, user_prompt_text: event.user_prompt_text, response_text: event.response_text, }; const blob = new Blob([JSON.stringify(data, null, 2)], { type: "application/json" }); const url = URL.createObjectURL(blob); const a = document.createElement("a"); a.href = url; a.download = `debug-${event.stage}-${event.id.slice(0, 8)}.json`; a.click(); URL.revokeObjectURL(url); }; return (
LLM Debug
{sections.map((sec) => { const isOpen = !!openSections[sec.label]; return (
{isOpen && (
{sec.content}
)}
); })}
); } // ── Event Log ──────────────────────────────────────────────────────────────── function EventLog({ videoId, status, runId }: { videoId: string; status: string; runId?: string }) { const [events, setEvents] = useState([]); const [total, setTotal] = useState(0); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const [offset, setOffset] = useState(0); const [viewMode, setViewMode] = useState<"head" | "tail">("tail"); const limit = 50; const initialLoaded = useRef(false); const load = useCallback(async (silent = false) => { if (!silent) setLoading(true); setError(null); try { const res = await fetchPipelineEvents(videoId, { offset, limit, order: viewMode === "head" ? "asc" : "desc", run_id: runId, }); setEvents(res.items); setTotal(res.total); initialLoaded.current = true; } catch (err) { if (!silent) setError(err instanceof Error ? err.message : "Failed to load events"); } finally { if (!silent) setLoading(false); } }, [videoId, offset, viewMode]); useEffect(() => { void load(); }, [load]); // Auto-refresh when video is actively processing useEffect(() => { if (status !== "processing" && status !== "queued") return; const id = setInterval(() => void load(true), 10_000); return () => clearInterval(id); }, [status, load]); if (loading) return
Loading events…
; if (error) return
Error: {error}
; if (events.length === 0) return
No events recorded.
; const hasNext = offset + limit < total; const hasPrev = offset > 0; return (
{total} event{total !== 1 ? "s" : ""}
{events.map((evt) => (
{eventTypeIcon(evt.event_type)} {evt.stage} {evt.event_type} {typeof evt.payload?.context === "string" && ( {evt.payload.context} )} {evt.model && {evt.model}} {evt.total_tokens != null && evt.total_tokens > 0 && ( {formatTokens(evt.total_tokens)} tok )} {evt.duration_ms != null && ( {evt.duration_ms}ms )} {formatDate(evt.created_at)}
))}
{(hasPrev || hasNext) && (
{offset + 1}–{Math.min(offset + limit, total)} of {total}
)}
); } // ── Worker Status ──────────────────────────────────────────────────────────── const WorkerStatus = React.memo(function WorkerStatus() { const [status, setStatus] = useState(null); const [error, setError] = useState(null); const load = useCallback(async () => { try { setError(null); const res = await fetchWorkerStatus(); setStatus(res); } catch (err) { setError(err instanceof Error ? err.message : "Failed"); } }, []); useEffect(() => { void load(); const id = setInterval(() => void load(), 15_000); return () => clearInterval(id); }, [load]); if (error) { return (
Worker: error ({error})
); } if (!status) { return (
Worker: checking…
); } return (
{status.online ? `${status.workers.length} worker${status.workers.length !== 1 ? "s" : ""} online` : "Workers offline"} {status.workers.map((w) => ( {w.active_tasks.length > 0 ? `${w.active_tasks.length} active` : "idle"} {w.pool_size != null && ` · pool ${w.pool_size}`} ))}
); }); // ── Debug Mode Toggle ──────────────────────────────────────────────────────── function DebugModeToggle({ debugMode, onDebugModeChange, }: { debugMode: boolean | null; onDebugModeChange: (mode: boolean) => void; }) { const [debugLoading, setDebugLoading] = useState(false); async function handleToggle() { if (debugMode === null || debugLoading) return; setDebugLoading(true); try { const res = await setDebugMode(!debugMode); onDebugModeChange(res.debug_mode); } catch { // swallow — leave previous state } finally { setDebugLoading(false); } } if (debugMode === null) return null; return (
Debug {debugMode ? "On" : "Off"}
); } // ── Status Filter ──────────────────────────────────────────────────────────── const StatusFilter = React.memo(function StatusFilter({ videos, activeFilter, onFilterChange, }: { videos: PipelineVideoItem[]; activeFilter: string | null; onFilterChange: (filter: string | null) => void; }) { // Fixed display order for pipeline lifecycle const STATUS_ORDER = ["not_started", "queued", "processing", "error", "complete"]; const present = new Set(videos.map((v) => v.processing_status)); const ordered = STATUS_ORDER.filter((s) => present.has(s)); if (ordered.length <= 1) return null; return (
{ordered.map((status) => { const count = videos.filter((v) => v.processing_status === status).length; return ( ); })}
); }); // ── Stage Tab View ─────────────────────────────────────────────────────────── const STAGE_TAB_ORDER = [ { key: "stage2_segmentation", label: "Segment", short: "S2" }, { key: "stage3_extraction", label: "Extract", short: "S3" }, { key: "stage4_classification", label: "Classify", short: "S4" }, { key: "stage5_synthesis", label: "Synthesize", short: "S5" }, { key: "stage6_embed_and_index", label: "Embed", short: "S6" }, ]; function stageTabStatus(events: PipelineEvent[]): "idle" | "running" | "done" | "error" { if (events.length === 0) return "idle"; const hasError = events.some((e) => e.event_type === "error"); if (hasError) return "error"; const hasComplete = events.some((e) => e.event_type === "complete"); if (hasComplete) return "done"; const hasStart = events.some((e) => e.event_type === "start"); if (hasStart) return "running"; return "running"; // llm_call without start — treat as running } function StageTabView({ videoId, runId, status }: { videoId: string; runId: string; status: string }) { const [events, setEvents] = useState([]); const [loading, setLoading] = useState(true); const [activeTab, setActiveTab] = useState(null); const initialSelect = useRef(false); const load = useCallback(async (silent = false) => { if (!silent) setLoading(true); try { const res = await fetchPipelineEvents(videoId, { offset: 0, limit: 200, order: "asc", run_id: runId, }); setEvents(res.items); // Auto-select tab on first load only if (!initialSelect.current && res.items.length > 0) { initialSelect.current = true; const stages = new Set(res.items.map((e) => e.stage)); const errorStage = res.items.find((e) => e.event_type === "error")?.stage; if (errorStage) { setActiveTab(errorStage); } else { const lastStage = [...STAGE_TAB_ORDER].reverse().find((s) => stages.has(s.key)); if (lastStage) setActiveTab(lastStage.key); } } } catch { // silent } finally { if (!silent) setLoading(false); } }, [videoId, runId]); useEffect(() => { void load(); }, [load]); // Auto-refresh while processing useEffect(() => { if (status !== "running") return; const id = setInterval(() => void load(true), 8_000); return () => clearInterval(id); }, [status, load]); if (loading) return
Loading stages…
; // Group events by stage const byStage = new Map(); for (const evt of events) { const list = byStage.get(evt.stage) ?? []; list.push(evt); byStage.set(evt.stage, list); } const tabEvents = activeTab ? (byStage.get(activeTab) ?? []) : []; // Compute summary stats for active tab const tabTokens = tabEvents.reduce((sum, e) => sum + (e.total_tokens ?? 0), 0); const tabLlmCalls = tabEvents.filter((e) => e.event_type === "llm_call").length; const tabDuration = (() => { const starts = tabEvents.filter((e) => e.event_type === "start"); const ends = tabEvents.filter((e) => e.event_type === "complete" || e.event_type === "error"); if (starts.length > 0 && ends.length > 0) { const endEvt = ends[ends.length - 1]; const startEvt = starts[0]; if (endEvt && startEvt && endEvt.created_at && startEvt.created_at) { const ms = new Date(endEvt.created_at).getTime() - new Date(startEvt.created_at).getTime(); if (ms > 0) return formatElapsed(startEvt.created_at); } } return null; })(); return (
{STAGE_TAB_ORDER.map((stage) => { const stageEvents = byStage.get(stage.key) ?? []; const tabStatus = stageTabStatus(stageEvents); const isActive = activeTab === stage.key; return ( ); })}
{activeTab && tabEvents.length > 0 && (
{tabLlmCalls > 0 && ( {tabLlmCalls} LLM call{tabLlmCalls !== 1 ? "s" : ""} )} {tabTokens > 0 && ( {formatTokens(tabTokens)} tokens )} {tabDuration && ( {tabDuration} )}
{tabEvents.map((evt) => (
{eventTypeIcon(evt.event_type)} {evt.event_type} {typeof evt.payload?.context === "string" && ( {evt.payload.context} )} {evt.model && {evt.model}} {evt.total_tokens != null && evt.total_tokens > 0 && ( {formatTokens(evt.total_tokens)} tok )} {evt.duration_ms != null && ( {evt.duration_ms}ms )} {formatDate(evt.created_at)}
{evt.event_type === "error" && evt.payload?.error != null && (
{`${evt.payload.error}`.slice(0, 500)}
)}
))}
)} {activeTab && tabEvents.length === 0 && (
Stage not started yet.
)}
); } // ── Run List ───────────────────────────────────────────────────────────────── const TRIGGER_LABELS: Record = { manual: "Manual", clean_reprocess: "Clean Reprocess", auto_ingest: "Auto Ingest", bulk: "Bulk", }; const RUN_STATUS_CLASS: Record = { running: "pipeline-badge--active", complete: "pipeline-badge--success", error: "pipeline-badge--error", cancelled: "pipeline-badge--pending", }; function RunList({ videoId, videoStatus }: { videoId: string; videoStatus: string }) { const [runs, setRuns] = useState([]); const [legacyCount, setLegacyCount] = useState(0); const [loading, setLoading] = useState(true); const [expandedRunId, setExpandedRunId] = useState(null); const [showLegacy, setShowLegacy] = useState(false); const load = useCallback(async (silent = false) => { if (!silent) setLoading(true); try { const res = await fetchPipelineRuns(videoId); setRuns(res.items); setLegacyCount(res.legacy_event_count); // Auto-expand the latest run on first load if (!silent && res.items.length > 0 && expandedRunId === null) { const firstRun = res.items[0]; if (firstRun) setExpandedRunId(firstRun.id); } } catch { // silently fail } finally { if (!silent) setLoading(false); } }, [videoId, expandedRunId]); useEffect(() => { void load(); }, [load]); // Auto-refresh when video is processing useEffect(() => { if (videoStatus !== "processing" && videoStatus !== "queued") return; const id = setInterval(() => void load(true), 10_000); return () => clearInterval(id); }, [videoStatus, load]); if (loading) return
Loading runs…
; if (runs.length === 0 && legacyCount === 0) { return
No pipeline runs recorded.
; } return (
{runs.map((run) => { const isExpanded = expandedRunId === run.id; return (
{isExpanded && (
)}
); })} {legacyCount > 0 && (
{showLegacy && (
)}
)}
); } // ── Stage Timeline ─────────────────────────────────────────────────────────── const PIPELINE_STAGES = [ { key: "stage2_segmentation", label: "Segment" }, { key: "stage3_extraction", label: "Extract" }, { key: "stage4_classification", label: "Classify" }, { key: "stage5_synthesis", label: "Synthesize" }, { key: "stage6_embed", label: "Embed" }, ]; const StageTimeline = React.memo(function StageTimeline({ video }: { video: PipelineVideoItem }) { const [, setTick] = useState(0); const isProcessing = video.processing_status === "processing"; // Tick every second while processing to update elapsed time useEffect(() => { if (!isProcessing) return; const id = setInterval(() => setTick((t) => t + 1), 1000); return () => clearInterval(id); }, [isProcessing]); if (!isProcessing && video.processing_status !== "complete" && video.processing_status !== "error") { return null; } const activeStage = video.active_stage; const activeStatus = video.active_stage_status; const activeIdx = PIPELINE_STAGES.findIndex((s) => s.key === activeStage); return (
{PIPELINE_STAGES.map((stage, i) => { let stateClass = "stage-timeline__step--future"; if (activeIdx >= 0) { if (i < activeIdx) { stateClass = "stage-timeline__step--done"; } else if (i === activeIdx) { if (activeStatus === "start") stateClass = "stage-timeline__step--active"; else if (activeStatus === "complete") stateClass = "stage-timeline__step--done"; else if (activeStatus === "error") stateClass = "stage-timeline__step--error"; } } if (video.processing_status === "complete") { stateClass = "stage-timeline__step--done"; } return (
{stage.label}
); })} {isProcessing && activeStatus === "start" && ( {formatElapsed(video.stage_started_at)} )} {isProcessing && video.latest_run && video.latest_run.total_tokens > 0 && ( {formatTokens(video.latest_run.total_tokens)} tok )}
); }) // ── Recent Activity Feed ───────────────────────────────────────────────────── const RecentActivityFeed = React.memo(function RecentActivityFeed() { const [items, setItems] = useState([]); const [collapsed, setCollapsed] = useState(false); const [loading, setLoading] = useState(true); const load = useCallback(async () => { try { const res = await fetchRecentActivity(8); setItems(res.items); } catch { // silently fail } finally { setLoading(false); } }, []); useEffect(() => { void load(); const id = setInterval(() => void load(), 15_000); return () => clearInterval(id); }, [load]); if (loading || items.length === 0) return null; return (
{!collapsed && (
{items.map((item) => (
{item.event_type === "complete" ? "✓" : "✗"} {item.stage.replace("stage", "S").replace("_", " ")} {item.filename} {item.creator_name} {item.duration_ms != null && ( {item.duration_ms > 60000 ? `${(item.duration_ms / 60000).toFixed(1)}m` : item.duration_ms > 1000 ? `${(item.duration_ms / 1000).toFixed(1)}s` : `${item.duration_ms}ms`} )} {formatDate(item.created_at)}
))}
)}
); }); // ── Main Page ──────────────────────────────────────────────────────────────── interface BulkProgress { total: number; completed: number; failed: number; current: string | null; active: boolean; } interface BulkLogEntry { filename: string; ok: boolean; message: string; } // ── Chunking Inspector ───────────────────────────────────────────────────── function ChunkingInspector({ videoId }: { videoId: string }) { const [data, setData] = useState> | null>(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const [expanded, setExpanded] = useState(false); useEffect(() => { if (!expanded) return; let cancelled = false; setLoading(true); fetchChunkingData(videoId) .then((res) => { if (!cancelled) { setData(res); setError(null); } }) .catch((err) => { if (!cancelled) setError(err instanceof Error ? err.message : "Failed"); }) .finally(() => { if (!cancelled) setLoading(false); }); return () => { cancelled = true; }; }, [videoId, expanded]); return (
{expanded && loading &&
Loading chunking data...
} {expanded && error &&
{error}
} {expanded && data && (
{data.total_segments} segments {data.total_moments} moments Classification: {data.classification_source} Chunk size: {data.synthesis_chunk_size}
{/* Topic Boundaries */}

Topic Boundaries (Stage 2)

{data.topic_boundaries.map((tb, i) => (
{tb.topic_label} {tb.segment_count} segs · {tb.start_time.toFixed(0)}s-{tb.end_time.toFixed(0)}s
))}
{/* Synthesis Groups */}

Synthesis Groups (Stage 5 Input)

{data.synthesis_groups.map((g) => (
{g.category} {g.moment_count} moments {g.exceeds_chunk_threshold && ( Will split into {g.chunks_needed} chunks )}
))}
)}
); } export default function AdminPipeline() { useDocumentTitle("Pipeline Management — Chrysopedia"); const [searchParams] = useSearchParams(); const [videos, setVideos] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const [expandedId, setExpandedId] = useState(null); const [actionLoading, setActionLoading] = useState(null); const [actionMessage, setActionMessage] = useState<{ id: string; text: string; ok: boolean } | null>(null); const [activeFilter, setActiveFilter] = useState(null); const [debugMode, setDebugModeState] = useState(null); const [creators, setCreators] = useState<{ name: string; slug: string }[]>([]); const [creatorFilter, setCreatorFilter] = useState(null); const [searchQuery, setSearchQuery] = useState(""); const [selectedIds, setSelectedIds] = useState>(new Set()); // Re-run stage modal state const [rerunModalVideo, setRerunModalVideo] = useState(null); const [rerunStageSelect, setRerunStageSelect] = useState("stage5_synthesis"); const [rerunPromptOverride, setRerunPromptOverride] = useState(""); // Stale pages state const [stalePagesCount, setStalePagesCount] = useState(null); const [bulkProgress, setBulkProgress] = useState(null); const [bulkLog, setBulkLog] = useState([]); const [changedIds, setChangedIds] = useState>(new Set()); const [autoRefresh, setAutoRefresh] = useState(true); const bulkCancelRef = useRef(false); const prevStatusRef = useRef>(new Map()); const videoRefs = useRef>(new Map()); const deepLinked = useRef(false); // Compute filtered list (status + creator) — memoized to avoid recomputing on unrelated state changes const filteredVideos = useMemo(() => videos.filter((v) => { if (activeFilter !== null && v.processing_status !== activeFilter) return false; if (creatorFilter !== null && v.creator_name !== creatorFilter) return false; if (searchQuery) { const q = searchQuery.toLowerCase(); if (!v.filename.toLowerCase().includes(q) && !v.creator_name.toLowerCase().includes(q)) return false; } return true; }), [videos, activeFilter, creatorFilter, searchQuery]); const load = useCallback(async (isAutoRefresh = false) => { if (!isAutoRefresh) setLoading(true); setError(null); try { const res = await fetchPipelineVideos(); // Detect status changes for highlight animation if (prevStatusRef.current.size > 0) { const changed = new Set(); for (const v of res.items) { const prev = prevStatusRef.current.get(v.id); if (prev && prev !== v.processing_status) { changed.add(v.id); } } if (changed.size > 0) { setChangedIds(changed); // Clear highlights after animation setTimeout(() => setChangedIds(new Set()), 2000); } } // Store current statuses for next comparison const statusMap = new Map(); for (const v of res.items) { statusMap.set(v.id, v.processing_status); } prevStatusRef.current = statusMap; setVideos(res.items); } catch (err) { if (!isAutoRefresh) { setError(err instanceof Error ? err.message : "Failed to load videos"); } } finally { if (!isAutoRefresh) setLoading(false); } }, []); useEffect(() => { void load(); }, [load]); // Auto-refresh every 15 seconds useEffect(() => { if (!autoRefresh) return; const id = setInterval(() => void load(true), 15_000); return () => clearInterval(id); }, [autoRefresh, load]); // Load creators for filter dropdown useEffect(() => { fetchCreators({ limit: 200 }) .then((res) => { const list = res.items.map((c: { name: string; slug: string }) => ({ name: c.name, slug: c.slug, })); list.sort((a: { name: string }, b: { name: string }) => a.name.localeCompare(b.name)); setCreators(list); }) .catch(() => { // silently fail — filter stays hidden }); }, []); // Load stale page count useEffect(() => { fetchStalePages() .then((res) => setStalePagesCount(res.stale_pages)) .catch(() => { /* silently fail */ }); }, []); const handleBulkResynth = async () => { if (!confirm("Re-run stage 5 synthesis on all completed videos with the current prompt?")) return; try { const res = await bulkResynthesize(); setActionMessage({ id: "__bulk__", text: `Bulk re-synthesize dispatched: ${res.dispatched}/${res.total} videos`, ok: true, }); setStalePagesCount(null); // will refresh } catch (err) { setActionMessage({ id: "__bulk__", text: err instanceof Error ? err.message : "Bulk re-synth failed", ok: false, }); } }; const handleWipeAll = async () => { if (!confirm("WIPE ALL pipeline output? This deletes all technique pages, key moments, pipeline events, runs, and Qdrant vectors. Creators, videos, and transcripts are preserved. This cannot be undone.")) return; if (!confirm("Are you sure? This is irreversible.")) return; try { const res = await wipeAllOutput(); setActionMessage({ id: "__wipe__", text: `Wiped: ${JSON.stringify(res.deleted)}`, ok: true, }); setStalePagesCount(null); void load(); } catch (err) { setActionMessage({ id: "__wipe__", text: err instanceof Error ? err.message : "Wipe failed", ok: false, }); } }; // Deep-link: auto-expand and scroll to ?video= on first load useEffect(() => { if (deepLinked.current || loading || videos.length === 0) return; const targetVideoId = searchParams.get("video"); if (!targetVideoId) return; const match = videos.find((v) => v.id === targetVideoId); if (!match) return; deepLinked.current = true; setExpandedId(targetVideoId); requestAnimationFrame(() => { const el = videoRefs.current.get(targetVideoId); if (el) el.scrollIntoView({ behavior: "smooth", block: "start" }); }); }, [loading, videos, searchParams]); useEffect(() => { let cancelled = false; fetchDebugMode() .then((res) => { if (!cancelled) setDebugModeState(res.debug_mode); }) .catch(() => {}); return () => { cancelled = true; }; }, []); // Clear selection when filters change useEffect(() => { setSelectedIds(new Set()); }, [activeFilter, creatorFilter, searchQuery]); const handleTrigger = async (videoId: string) => { setActionLoading(videoId); setActionMessage(null); try { const res = await cleanRetriggerPipeline(videoId); setActionMessage({ id: videoId, text: `Retriggered (${res.status})`, ok: true }); setTimeout(() => void load(), 2000); } catch (err) { setActionMessage({ id: videoId, text: err instanceof Error ? err.message : "Retrigger failed", ok: false, }); } finally { setActionLoading(null); } }; const handleRevoke = async (videoId: string) => { setActionLoading(videoId); setActionMessage(null); try { const res = await revokePipeline(videoId); setActionMessage({ id: videoId, text: res.tasks_revoked > 0 ? `Revoked ${res.tasks_revoked} task${res.tasks_revoked !== 1 ? "s" : ""}` : "No active tasks", ok: true, }); setTimeout(() => void load(), 2000); } catch (err) { setActionMessage({ id: videoId, text: err instanceof Error ? err.message : "Revoke failed", ok: false, }); } finally { setActionLoading(null); } }; const handleRerunStage = async (videoId: string) => { setActionLoading(videoId); setActionMessage(null); try { const res = await rerunStage( videoId, rerunStageSelect, rerunPromptOverride.trim() || undefined, ); setActionMessage({ id: videoId, text: `Stage re-run dispatched: ${res.stage}${res.prompt_override ? " (with prompt override)" : ""}`, ok: true, }); setRerunModalVideo(null); setRerunPromptOverride(""); setTimeout(() => void load(), 2000); } catch (err) { setActionMessage({ id: videoId, text: err instanceof Error ? err.message : "Stage re-run failed", ok: false, }); } finally { setActionLoading(null); } }; const toggleExpand = (id: string) => { setExpandedId((prev) => (prev === id ? null : id)); }; // ── Selection ─────────────────────────────────────────────────────────── const toggleSelect = (id: string) => { setSelectedIds((prev) => { const next = new Set(prev); if (next.has(id)) next.delete(id); else next.add(id); return next; }); }; const toggleSelectAll = () => { const visibleIds = filteredVideos.map((v) => v.id); const allSelected = visibleIds.every((id) => selectedIds.has(id)); if (allSelected) { setSelectedIds(new Set()); } else { setSelectedIds(new Set(visibleIds)); } }; const allVisibleSelected = filteredVideos.length > 0 && filteredVideos.every((v) => selectedIds.has(v.id)); // ── Bulk Actions ──────────────────────────────────────────────────────── const runBulk = async (clean: boolean) => { const ids = Array.from(selectedIds); if (ids.length === 0) return; bulkCancelRef.current = false; setBulkLog([]); setBulkProgress({ total: ids.length, completed: 0, failed: 0, current: null, active: true }); let completed = 0; let failed = 0; for (const id of ids) { if (bulkCancelRef.current) break; const video = videos.find((v) => v.id === id); const filename = video?.filename ?? id; setBulkProgress((p) => p ? { ...p, current: filename } : null); try { if (clean) { await cleanRetriggerPipeline(id); } else { await triggerPipeline(id); } completed++; setBulkLog((prev) => [...prev, { filename, ok: true, message: clean ? "clean retriggered" : "triggered" }]); } catch (err) { failed++; setBulkLog((prev) => [...prev, { filename, ok: false, message: err instanceof Error ? err.message : "failed", }]); } setBulkProgress((p) => p ? { ...p, completed: completed, failed } : null); // Brief pause between dispatches to avoid slamming the API if (!bulkCancelRef.current && ids.indexOf(id) < ids.length - 1) { await new Promise((r) => setTimeout(r, 500)); } } setBulkProgress((p) => p ? { ...p, active: false, current: null } : null); setSelectedIds(new Set()); // Refresh video list after bulk completes setTimeout(() => void load(), 2000); }; const cancelBulk = () => { bulkCancelRef.current = true; }; return (

Pipeline Management

{videos.length} video{videos.length !== 1 ? "s" : ""}

{stalePagesCount !== null && stalePagesCount > 0 && ( )}
{loading ? (
Loading videos…
) : error ? (
Error: {error}
) : videos.length === 0 ? (
No videos in pipeline.
) : ( <>
{creators.length >= 1 && (
)}
setSearchQuery(e.target.value)} /> {searchQuery && ( )}
{/* Bulk toolbar */} {selectedIds.size > 0 && (
{selectedIds.size} selected {bulkProgress?.active ? ( <>
{bulkProgress.completed + bulkProgress.failed}/{bulkProgress.total} {bulkProgress.failed > 0 && ({bulkProgress.failed} failed)} {bulkProgress.current && ( {bulkProgress.current} )}
) : ( <> )}
)} {/* Bulk completion message */} {bulkProgress && !bulkProgress.active && (
0 ? "bulk-toolbar__done--warn" : "bulk-toolbar__done--ok"}`}> Bulk operation complete: {bulkProgress.completed} succeeded {bulkProgress.failed > 0 && `, ${bulkProgress.failed} failed`} {bulkCancelRef.current && " (cancelled)"}
)} {/* Bulk operation log */} {bulkLog.length > 0 && (bulkProgress?.active || (bulkProgress && !bulkProgress.active)) && (
{bulkLog.map((entry, i) => (
{entry.ok ? "✓" : "✗"} {entry.filename} {entry.message}
))}
)}
{filteredVideos.map((video) => (
{ if (el) videoRefs.current.set(video.id, el); }}>
toggleExpand(video.id)} >
e.stopPropagation()}> toggleSelect(video.id)} disabled={bulkProgress?.active ?? false} aria-label={`Select ${video.filename}`} />
{video.filename} {video.creator_name}
{STATUS_LABELS[video.processing_status] ?? video.processing_status} {video.event_count} events {formatTokens(video.total_tokens_used)} tokens {formatDate(video.last_event_at)}
e.stopPropagation()}>
{actionMessage?.id === video.id && (
{actionMessage.text}
)} {expandedId === video.id && (
Created: {formatDate(video.created_at)} Updated: {formatDate(video.updated_at)}
)}
))} {filteredVideos.length > 0 && (
)}
)} {/* Re-run Stage Modal */} {rerunModalVideo && (
setRerunModalVideo(null)}>
e.stopPropagation()}>

Re-run Single Stage

Re-run a specific pipeline stage without re-running predecessors.