From b0ad4c2dfc192abda040bd0adccc242ee8415cfe Mon Sep 17 00:00:00 2001 From: jlightner Date: Tue, 31 Mar 2026 16:12:57 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20real-time=20pipeline=20visibility?= =?UTF-8?q?=20=E2=80=94=20auto-refresh,=20stage=20timeline,=20activity=20f?= =?UTF-8?q?eed,=20bulk=20log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Backend: Video list now includes active_stage, active_stage_status, and stage_started_at fields via DISTINCT ON subquery - Backend: New GET /admin/pipeline/recent-activity endpoint returns latest stage completions/errors with video context - Frontend: 15-second auto-refresh with change detection — video rows flash when status changes - Frontend: Stage timeline dots on processing/complete/error videos showing progress through stages 2-5, active stage pulses - Frontend: Collapsible Recent Activity feed at top showing last 8 stage completions/errors with duration and creator - Frontend: Bulk operation scrollable log showing per-video results as they complete - Frontend: Auto-refresh checkbox toggle in header --- backend/routers/pipeline.py | 68 ++++++++ frontend/src/App.css | 237 +++++++++++++++++++++++++++ frontend/src/api/public-client.ts | 23 +++ frontend/src/pages/AdminPipeline.tsx | 205 ++++++++++++++++++++++- 4 files changed, 526 insertions(+), 7 deletions(-) diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index d4b9336..393c8c5 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -88,6 +88,20 @@ async def list_pipeline_videos( .subquery() ) + # Subquery for the most recent stage start event per video (active stage indicator) + latest_stage = ( + select( + PipelineEvent.video_id, + PipelineEvent.stage.label("active_stage"), + PipelineEvent.event_type.label("active_stage_status"), + PipelineEvent.created_at.label("stage_started_at"), + ) + .where(PipelineEvent.event_type.in_(["start", "complete", "error"])) + .order_by(PipelineEvent.video_id, PipelineEvent.created_at.desc()) + .distinct(PipelineEvent.video_id) + .subquery() + ) + stmt = ( select( SourceVideo.id, @@ -100,9 +114,13 @@ async def list_pipeline_videos( event_counts.c.event_count, event_counts.c.total_tokens_used, event_counts.c.last_event_at, + latest_stage.c.active_stage, + latest_stage.c.active_stage_status, + latest_stage.c.stage_started_at, ) .join(Creator, SourceVideo.creator_id == Creator.id) .outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id) + .outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id) .order_by(SourceVideo.updated_at.desc()) ) @@ -122,6 +140,9 @@ async def list_pipeline_videos( "event_count": r.event_count or 0, "total_tokens_used": r.total_tokens_used or 0, "last_event_at": r.last_event_at.isoformat() if r.last_event_at else None, + "active_stage": r.active_stage, + "active_stage_status": r.active_stage_status, + "stage_started_at": r.stage_started_at.isoformat() if r.stage_started_at else None, } for r in rows ], @@ -250,6 +271,53 @@ async def revoke_pipeline(video_id: str): ) from exc +# ── Admin: Recent activity feed ────────────────────────────────────────────── + +@router.get("/admin/pipeline/recent-activity") +async def recent_pipeline_activity( + limit: Annotated[int, Query(ge=1, le=20)] = 10, + db: AsyncSession = Depends(get_session), +): + """Get the most recent pipeline stage completions and errors with video context.""" + stmt = ( + select( + PipelineEvent.id, + PipelineEvent.video_id, + PipelineEvent.stage, + PipelineEvent.event_type, + PipelineEvent.total_tokens, + PipelineEvent.duration_ms, + PipelineEvent.created_at, + SourceVideo.filename, + Creator.name.label("creator_name"), + ) + .join(SourceVideo, PipelineEvent.video_id == SourceVideo.id) + .join(Creator, SourceVideo.creator_id == Creator.id) + .where(PipelineEvent.event_type.in_(["complete", "error"])) + .order_by(PipelineEvent.created_at.desc()) + .limit(limit) + ) + result = await db.execute(stmt) + rows = result.all() + + return { + "items": [ + { + "id": str(r.id), + "video_id": str(r.video_id), + "filename": r.filename, + "creator_name": r.creator_name, + "stage": r.stage, + "event_type": r.event_type, + "total_tokens": r.total_tokens, + "duration_ms": r.duration_ms, + "created_at": r.created_at.isoformat() if r.created_at else None, + } + for r in rows + ], + } + + # ── Admin: Event log ───────────────────────────────────────────────────────── @router.get("/admin/pipeline/events/{video_id}") diff --git a/frontend/src/App.css b/frontend/src/App.css index 0623eea..6898093 100644 --- a/frontend/src/App.css +++ b/frontend/src/App.css @@ -3503,6 +3503,243 @@ a.app-footer__repo:hover { border: 1px solid rgba(255, 152, 0, 0.3); } +/* ── Auto-refresh toggle ──────────────────────────────────────────────────── */ + +.auto-refresh-toggle { + display: flex; + align-items: center; + gap: 0.4rem; + font-size: 0.8rem; + color: var(--color-text-muted); + cursor: pointer; + white-space: nowrap; +} + +.auto-refresh-toggle input { + accent-color: var(--color-accent); +} + +/* ── Video row change highlight ───────────────────────────────────────────── */ + +@keyframes statusChange { + 0% { box-shadow: 0 0 0 2px var(--color-accent); } + 100% { box-shadow: 0 0 0 0 transparent; } +} + +.pipeline-video--changed { + animation: statusChange 2s ease-out; +} + +/* ── Stage Timeline ───────────────────────────────────────────────────────── */ + +.stage-timeline { + display: flex; + align-items: center; + gap: 2px; +} + +.stage-timeline__step { + display: flex; + align-items: center; + gap: 3px; +} + +.stage-timeline__dot { + width: 8px; + height: 8px; + border-radius: 50%; + background: var(--color-border); + flex-shrink: 0; + transition: background 300ms, box-shadow 300ms; +} + +.stage-timeline__label { + font-size: 0.65rem; + color: var(--color-text-muted); + display: none; +} + +.stage-timeline__step--done .stage-timeline__dot { + background: #00c853; +} + +.stage-timeline__step--active .stage-timeline__dot { + background: var(--color-accent); + box-shadow: 0 0 6px var(--color-accent); + animation: stagePulse 1.5s ease-in-out infinite; +} + +.stage-timeline__step--active .stage-timeline__label { + display: inline; + color: var(--color-accent); + font-weight: 600; +} + +.stage-timeline__step--error .stage-timeline__dot { + background: #f44336; +} + +.stage-timeline__step--error .stage-timeline__label { + display: inline; + color: #f44336; +} + +@keyframes stagePulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.4; } +} + +/* Connector line between dots */ +.stage-timeline__step + .stage-timeline__step::before { + content: ""; + display: block; + width: 8px; + height: 1px; + background: var(--color-border); + margin-right: 2px; +} + +.stage-timeline__step--done + .stage-timeline__step::before { + background: #00c853; +} + +/* ── Recent Activity Feed ─────────────────────────────────────────────────── */ + +.recent-activity { + margin-bottom: 1rem; + border: 1px solid var(--color-border); + border-radius: 8px; + overflow: hidden; +} + +.recent-activity__toggle { + display: flex; + align-items: center; + justify-content: space-between; + width: 100%; + padding: 0.6rem 1rem; + background: var(--color-surface); + border: none; + color: var(--color-text); + cursor: pointer; + font-size: 0.85rem; +} + +.recent-activity__toggle:hover { + background: var(--color-bg-input); +} + +.recent-activity__title { + font-weight: 600; +} + +.recent-activity__list { + max-height: 200px; + overflow-y: auto; +} + +.recent-activity__item { + display: flex; + align-items: center; + gap: 0.5rem; + padding: 0.35rem 1rem; + font-size: 0.8rem; + border-top: 1px solid var(--color-border); +} + +.recent-activity__item--complete { + color: #00c853; +} + +.recent-activity__item--error { + color: #f44336; +} + +.recent-activity__icon { + flex-shrink: 0; + width: 1rem; + text-align: center; +} + +.recent-activity__stage { + flex-shrink: 0; + width: 6rem; + font-family: monospace; + font-size: 0.75rem; +} + +.recent-activity__file { + flex: 1; + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + color: var(--color-text); +} + +.recent-activity__creator { + flex-shrink: 0; + color: var(--color-text-muted); +} + +.recent-activity__duration { + flex-shrink: 0; + color: var(--color-text-muted); + font-family: monospace; + font-size: 0.75rem; +} + +.recent-activity__time { + flex-shrink: 0; + color: var(--color-text-muted); + font-size: 0.75rem; +} + +/* ── Bulk Log ─────────────────────────────────────────────────────────────── */ + +.bulk-log { + margin-bottom: 0.75rem; + border: 1px solid var(--color-border); + border-radius: 8px; + overflow: hidden; +} + +.bulk-log__list { + max-height: 150px; + overflow-y: auto; + padding: 0.25rem 0; +} + +.bulk-log__entry { + display: flex; + align-items: center; + gap: 0.5rem; + padding: 0.2rem 1rem; + font-size: 0.8rem; + font-family: monospace; +} + +.bulk-log__entry--ok .bulk-log__icon { + color: #00c853; +} + +.bulk-log__entry--err .bulk-log__icon { + color: #f44336; +} + +.bulk-log__file { + flex: 1; + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.bulk-log__msg { + color: var(--color-text-muted); + flex-shrink: 0; +} + /* ── Worker Status Indicator ────────────────────────────────────────────── */ .worker-status { diff --git a/frontend/src/api/public-client.ts b/frontend/src/api/public-client.ts index d3f41bf..9df7597 100644 --- a/frontend/src/api/public-client.ts +++ b/frontend/src/api/public-client.ts @@ -410,6 +410,9 @@ export interface PipelineVideoItem { event_count: number; total_tokens_used: number; last_event_at: string | null; + active_stage: string | null; + active_stage_status: string | null; + stage_started_at: string | null; } export interface PipelineVideoListResponse { @@ -479,6 +482,26 @@ export async function fetchPipelineVideos(): Promise return request(`${BASE}/admin/pipeline/videos`); } +export interface RecentActivityItem { + id: string; + video_id: string; + filename: string; + creator_name: string; + stage: string; + event_type: string; + total_tokens: number | null; + duration_ms: number | null; + created_at: string | null; +} + +export interface RecentActivityResponse { + items: RecentActivityItem[]; +} + +export async function fetchRecentActivity(limit = 10): Promise { + return request(`${BASE}/admin/pipeline/recent-activity?limit=${limit}`); +} + export async function fetchPipelineEvents( videoId: string, params: { offset?: number; limit?: number; stage?: string; event_type?: string; order?: "asc" | "desc" } = {}, diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx index 95f7638..dada332 100644 --- a/frontend/src/pages/AdminPipeline.tsx +++ b/frontend/src/pages/AdminPipeline.tsx @@ -16,9 +16,11 @@ import { revokePipeline, cleanRetriggerPipeline, fetchCreators, + fetchRecentActivity, type PipelineVideoItem, type PipelineEvent, type WorkerStatusResponse, + type RecentActivityItem, } from "../api/public-client"; // ── Helpers ────────────────────────────────────────────────────────────────── @@ -466,6 +468,122 @@ function StatusFilter({ ); } +// ── Stage Timeline ─────────────────────────────────────────────────────────── + +const PIPELINE_STAGES = [ + { key: "stage2_segmentation", label: "Segment" }, + { key: "stage3_extraction", label: "Extract" }, + { key: "stage4_classification", label: "Classify" }, + { key: "stage5_synthesis", label: "Synthesize" }, + { key: "stage6_embed", label: "Embed" }, +]; + +function StageTimeline({ video }: { video: PipelineVideoItem }) { + if (video.processing_status !== "processing" && video.processing_status !== "complete" && video.processing_status !== "error") { + return null; + } + + const activeStage = video.active_stage; + const activeStatus = video.active_stage_status; // "start" = running, "complete" = done, "error" = failed + + // Determine each stage's state + const activeIdx = PIPELINE_STAGES.findIndex((s) => s.key === activeStage); + + return ( +
+ {PIPELINE_STAGES.map((stage, i) => { + let stateClass = "stage-timeline__step--future"; + if (activeIdx >= 0) { + if (i < activeIdx) { + stateClass = "stage-timeline__step--done"; + } else if (i === activeIdx) { + if (activeStatus === "start") stateClass = "stage-timeline__step--active"; + else if (activeStatus === "complete") stateClass = "stage-timeline__step--done"; + else if (activeStatus === "error") stateClass = "stage-timeline__step--error"; + } + } + // If video is complete, all stages are done + if (video.processing_status === "complete") { + stateClass = "stage-timeline__step--done"; + } + + return ( +
+
+ {stage.label} +
+ ); + })} +
+ ); +} + +// ── Recent Activity Feed ───────────────────────────────────────────────────── + +function RecentActivityFeed() { + const [items, setItems] = useState([]); + const [collapsed, setCollapsed] = useState(false); + const [loading, setLoading] = useState(true); + + const load = useCallback(async () => { + try { + const res = await fetchRecentActivity(8); + setItems(res.items); + } catch { + // silently fail + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + void load(); + const id = setInterval(() => void load(), 15_000); + return () => clearInterval(id); + }, [load]); + + if (loading || items.length === 0) return null; + + return ( +
+ + {!collapsed && ( +
+ {items.map((item) => ( +
+ + {item.event_type === "complete" ? "✓" : "✗"} + + {item.stage.replace("stage", "S").replace("_", " ")} + + {item.filename} + + {item.creator_name} + {item.duration_ms != null && ( + + {item.duration_ms > 60000 + ? `${(item.duration_ms / 60000).toFixed(1)}m` + : item.duration_ms > 1000 + ? `${(item.duration_ms / 1000).toFixed(1)}s` + : `${item.duration_ms}ms`} + + )} + {formatDate(item.created_at)} +
+ ))} +
+ )} +
+ ); +} + // ── Main Page ──────────────────────────────────────────────────────────────── interface BulkProgress { @@ -476,6 +594,12 @@ interface BulkProgress { active: boolean; } +interface BulkLogEntry { + filename: string; + ok: boolean; + message: string; +} + export default function AdminPipeline() { useDocumentTitle("Pipeline Management — Chrysopedia"); const [searchParams] = useSearchParams(); @@ -491,7 +615,11 @@ export default function AdminPipeline() { const [creatorFilter, setCreatorFilter] = useState(null); const [selectedIds, setSelectedIds] = useState>(new Set()); const [bulkProgress, setBulkProgress] = useState(null); + const [bulkLog, setBulkLog] = useState([]); + const [changedIds, setChangedIds] = useState>(new Set()); + const [autoRefresh, setAutoRefresh] = useState(true); const bulkCancelRef = useRef(false); + const prevStatusRef = useRef>(new Map()); const videoRefs = useRef>(new Map()); const deepLinked = useRef(false); @@ -502,16 +630,39 @@ export default function AdminPipeline() { return true; }); - const load = useCallback(async () => { - setLoading(true); + const load = useCallback(async (isAutoRefresh = false) => { + if (!isAutoRefresh) setLoading(true); setError(null); try { const res = await fetchPipelineVideos(); + // Detect status changes for highlight animation + if (prevStatusRef.current.size > 0) { + const changed = new Set(); + for (const v of res.items) { + const prev = prevStatusRef.current.get(v.id); + if (prev && prev !== v.processing_status) { + changed.add(v.id); + } + } + if (changed.size > 0) { + setChangedIds(changed); + // Clear highlights after animation + setTimeout(() => setChangedIds(new Set()), 2000); + } + } + // Store current statuses for next comparison + const statusMap = new Map(); + for (const v of res.items) { + statusMap.set(v.id, v.processing_status); + } + prevStatusRef.current = statusMap; setVideos(res.items); } catch (err) { - setError(err instanceof Error ? err.message : "Failed to load videos"); + if (!isAutoRefresh) { + setError(err instanceof Error ? err.message : "Failed to load videos"); + } } finally { - setLoading(false); + if (!isAutoRefresh) setLoading(false); } }, []); @@ -519,6 +670,13 @@ export default function AdminPipeline() { void load(); }, [load]); + // Auto-refresh every 15 seconds + useEffect(() => { + if (!autoRefresh) return; + const id = setInterval(() => void load(true), 15_000); + return () => clearInterval(id); + }, [autoRefresh, load]); + // Load creators for filter dropdown useEffect(() => { fetchCreators({ limit: 200 }) @@ -641,6 +799,7 @@ export default function AdminPipeline() { if (ids.length === 0) return; bulkCancelRef.current = false; + setBulkLog([]); setBulkProgress({ total: ids.length, completed: 0, failed: 0, current: null, active: true }); let completed = 0; @@ -650,7 +809,8 @@ export default function AdminPipeline() { if (bulkCancelRef.current) break; const video = videos.find((v) => v.id === id); - setBulkProgress((p) => p ? { ...p, current: video?.filename ?? id } : null); + const filename = video?.filename ?? id; + setBulkProgress((p) => p ? { ...p, current: filename } : null); try { if (clean) { @@ -659,8 +819,14 @@ export default function AdminPipeline() { await triggerPipeline(id); } completed++; - } catch { + setBulkLog((prev) => [...prev, { filename, ok: true, message: clean ? "clean retriggered" : "triggered" }]); + } catch (err) { failed++; + setBulkLog((prev) => [...prev, { + filename, + ok: false, + message: err instanceof Error ? err.message : "failed", + }]); } setBulkProgress((p) => p ? { ...p, completed: completed, failed } : null); @@ -693,6 +859,14 @@ export default function AdminPipeline() {
+ @@ -707,6 +881,7 @@ export default function AdminPipeline() {
No videos in pipeline.
) : ( <> +
)} + {/* Bulk operation log */} + {bulkLog.length > 0 && (bulkProgress?.active || (bulkProgress && !bulkProgress.active)) && ( +
+
+ {bulkLog.map((entry, i) => ( +
+ {entry.ok ? "✓" : "✗"} + {entry.filename} + {entry.message} +
+ ))} +
+
+ )} +
{filteredVideos.map((video) => ( -
{ if (el) videoRefs.current.set(video.id, el); }}> +
{ if (el) videoRefs.current.set(video.id, el); }}>
toggleExpand(video.id)} @@ -822,6 +1012,7 @@ export default function AdminPipeline() { {STATUS_LABELS[video.processing_status] ?? video.processing_status} + {video.event_count} events