feat: Add real-time pipeline visibility — auto-refresh, stage timeline, activity feed, bulk log

- Backend: Video list now includes active_stage, active_stage_status, and
  stage_started_at fields via DISTINCT ON subquery
- Backend: New GET /admin/pipeline/recent-activity endpoint returns
  latest stage completions/errors with video context
- Frontend: 15-second auto-refresh with change detection — video rows
  flash when status changes
- Frontend: Stage timeline dots on processing/complete/error videos
  showing progress through stages 2-5, active stage pulses
- Frontend: Collapsible Recent Activity feed at top showing last 8
  stage completions/errors with duration and creator
- Frontend: Bulk operation scrollable log showing per-video results
  as they complete
- Frontend: Auto-refresh checkbox toggle in header
This commit is contained in:
jlightner 2026-03-31 16:12:57 +00:00
parent e17132bd60
commit b0ad4c2dfc
4 changed files with 526 additions and 7 deletions

View file

@ -88,6 +88,20 @@ async def list_pipeline_videos(
.subquery()
)
# Subquery for the most recent stage start event per video (active stage indicator)
latest_stage = (
select(
PipelineEvent.video_id,
PipelineEvent.stage.label("active_stage"),
PipelineEvent.event_type.label("active_stage_status"),
PipelineEvent.created_at.label("stage_started_at"),
)
.where(PipelineEvent.event_type.in_(["start", "complete", "error"]))
.order_by(PipelineEvent.video_id, PipelineEvent.created_at.desc())
.distinct(PipelineEvent.video_id)
.subquery()
)
stmt = (
select(
SourceVideo.id,
@ -100,9 +114,13 @@ async def list_pipeline_videos(
event_counts.c.event_count,
event_counts.c.total_tokens_used,
event_counts.c.last_event_at,
latest_stage.c.active_stage,
latest_stage.c.active_stage_status,
latest_stage.c.stage_started_at,
)
.join(Creator, SourceVideo.creator_id == Creator.id)
.outerjoin(event_counts, SourceVideo.id == event_counts.c.video_id)
.outerjoin(latest_stage, SourceVideo.id == latest_stage.c.video_id)
.order_by(SourceVideo.updated_at.desc())
)
@ -122,6 +140,9 @@ async def list_pipeline_videos(
"event_count": r.event_count or 0,
"total_tokens_used": r.total_tokens_used or 0,
"last_event_at": r.last_event_at.isoformat() if r.last_event_at else None,
"active_stage": r.active_stage,
"active_stage_status": r.active_stage_status,
"stage_started_at": r.stage_started_at.isoformat() if r.stage_started_at else None,
}
for r in rows
],
@ -250,6 +271,53 @@ async def revoke_pipeline(video_id: str):
) from exc
# ── Admin: Recent activity feed ──────────────────────────────────────────────
@router.get("/admin/pipeline/recent-activity")
async def recent_pipeline_activity(
limit: Annotated[int, Query(ge=1, le=20)] = 10,
db: AsyncSession = Depends(get_session),
):
"""Get the most recent pipeline stage completions and errors with video context."""
stmt = (
select(
PipelineEvent.id,
PipelineEvent.video_id,
PipelineEvent.stage,
PipelineEvent.event_type,
PipelineEvent.total_tokens,
PipelineEvent.duration_ms,
PipelineEvent.created_at,
SourceVideo.filename,
Creator.name.label("creator_name"),
)
.join(SourceVideo, PipelineEvent.video_id == SourceVideo.id)
.join(Creator, SourceVideo.creator_id == Creator.id)
.where(PipelineEvent.event_type.in_(["complete", "error"]))
.order_by(PipelineEvent.created_at.desc())
.limit(limit)
)
result = await db.execute(stmt)
rows = result.all()
return {
"items": [
{
"id": str(r.id),
"video_id": str(r.video_id),
"filename": r.filename,
"creator_name": r.creator_name,
"stage": r.stage,
"event_type": r.event_type,
"total_tokens": r.total_tokens,
"duration_ms": r.duration_ms,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
],
}
# ── Admin: Event log ─────────────────────────────────────────────────────────
@router.get("/admin/pipeline/events/{video_id}")

View file

@ -3503,6 +3503,243 @@ a.app-footer__repo:hover {
border: 1px solid rgba(255, 152, 0, 0.3);
}
/* ── Auto-refresh toggle ──────────────────────────────────────────────────── */
.auto-refresh-toggle {
display: flex;
align-items: center;
gap: 0.4rem;
font-size: 0.8rem;
color: var(--color-text-muted);
cursor: pointer;
white-space: nowrap;
}
.auto-refresh-toggle input {
accent-color: var(--color-accent);
}
/* ── Video row change highlight ───────────────────────────────────────────── */
@keyframes statusChange {
0% { box-shadow: 0 0 0 2px var(--color-accent); }
100% { box-shadow: 0 0 0 0 transparent; }
}
.pipeline-video--changed {
animation: statusChange 2s ease-out;
}
/* ── Stage Timeline ───────────────────────────────────────────────────────── */
.stage-timeline {
display: flex;
align-items: center;
gap: 2px;
}
.stage-timeline__step {
display: flex;
align-items: center;
gap: 3px;
}
.stage-timeline__dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: var(--color-border);
flex-shrink: 0;
transition: background 300ms, box-shadow 300ms;
}
.stage-timeline__label {
font-size: 0.65rem;
color: var(--color-text-muted);
display: none;
}
.stage-timeline__step--done .stage-timeline__dot {
background: #00c853;
}
.stage-timeline__step--active .stage-timeline__dot {
background: var(--color-accent);
box-shadow: 0 0 6px var(--color-accent);
animation: stagePulse 1.5s ease-in-out infinite;
}
.stage-timeline__step--active .stage-timeline__label {
display: inline;
color: var(--color-accent);
font-weight: 600;
}
.stage-timeline__step--error .stage-timeline__dot {
background: #f44336;
}
.stage-timeline__step--error .stage-timeline__label {
display: inline;
color: #f44336;
}
@keyframes stagePulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.4; }
}
/* Connector line between dots */
.stage-timeline__step + .stage-timeline__step::before {
content: "";
display: block;
width: 8px;
height: 1px;
background: var(--color-border);
margin-right: 2px;
}
.stage-timeline__step--done + .stage-timeline__step::before {
background: #00c853;
}
/* ── Recent Activity Feed ─────────────────────────────────────────────────── */
.recent-activity {
margin-bottom: 1rem;
border: 1px solid var(--color-border);
border-radius: 8px;
overflow: hidden;
}
.recent-activity__toggle {
display: flex;
align-items: center;
justify-content: space-between;
width: 100%;
padding: 0.6rem 1rem;
background: var(--color-surface);
border: none;
color: var(--color-text);
cursor: pointer;
font-size: 0.85rem;
}
.recent-activity__toggle:hover {
background: var(--color-bg-input);
}
.recent-activity__title {
font-weight: 600;
}
.recent-activity__list {
max-height: 200px;
overflow-y: auto;
}
.recent-activity__item {
display: flex;
align-items: center;
gap: 0.5rem;
padding: 0.35rem 1rem;
font-size: 0.8rem;
border-top: 1px solid var(--color-border);
}
.recent-activity__item--complete {
color: #00c853;
}
.recent-activity__item--error {
color: #f44336;
}
.recent-activity__icon {
flex-shrink: 0;
width: 1rem;
text-align: center;
}
.recent-activity__stage {
flex-shrink: 0;
width: 6rem;
font-family: monospace;
font-size: 0.75rem;
}
.recent-activity__file {
flex: 1;
min-width: 0;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
color: var(--color-text);
}
.recent-activity__creator {
flex-shrink: 0;
color: var(--color-text-muted);
}
.recent-activity__duration {
flex-shrink: 0;
color: var(--color-text-muted);
font-family: monospace;
font-size: 0.75rem;
}
.recent-activity__time {
flex-shrink: 0;
color: var(--color-text-muted);
font-size: 0.75rem;
}
/* ── Bulk Log ─────────────────────────────────────────────────────────────── */
.bulk-log {
margin-bottom: 0.75rem;
border: 1px solid var(--color-border);
border-radius: 8px;
overflow: hidden;
}
.bulk-log__list {
max-height: 150px;
overflow-y: auto;
padding: 0.25rem 0;
}
.bulk-log__entry {
display: flex;
align-items: center;
gap: 0.5rem;
padding: 0.2rem 1rem;
font-size: 0.8rem;
font-family: monospace;
}
.bulk-log__entry--ok .bulk-log__icon {
color: #00c853;
}
.bulk-log__entry--err .bulk-log__icon {
color: #f44336;
}
.bulk-log__file {
flex: 1;
min-width: 0;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.bulk-log__msg {
color: var(--color-text-muted);
flex-shrink: 0;
}
/* ── Worker Status Indicator ────────────────────────────────────────────── */
.worker-status {

View file

@ -410,6 +410,9 @@ export interface PipelineVideoItem {
event_count: number;
total_tokens_used: number;
last_event_at: string | null;
active_stage: string | null;
active_stage_status: string | null;
stage_started_at: string | null;
}
export interface PipelineVideoListResponse {
@ -479,6 +482,26 @@ export async function fetchPipelineVideos(): Promise<PipelineVideoListResponse>
return request<PipelineVideoListResponse>(`${BASE}/admin/pipeline/videos`);
}
export interface RecentActivityItem {
id: string;
video_id: string;
filename: string;
creator_name: string;
stage: string;
event_type: string;
total_tokens: number | null;
duration_ms: number | null;
created_at: string | null;
}
export interface RecentActivityResponse {
items: RecentActivityItem[];
}
export async function fetchRecentActivity(limit = 10): Promise<RecentActivityResponse> {
return request<RecentActivityResponse>(`${BASE}/admin/pipeline/recent-activity?limit=${limit}`);
}
export async function fetchPipelineEvents(
videoId: string,
params: { offset?: number; limit?: number; stage?: string; event_type?: string; order?: "asc" | "desc" } = {},

View file

@ -16,9 +16,11 @@ import {
revokePipeline,
cleanRetriggerPipeline,
fetchCreators,
fetchRecentActivity,
type PipelineVideoItem,
type PipelineEvent,
type WorkerStatusResponse,
type RecentActivityItem,
} from "../api/public-client";
// ── Helpers ──────────────────────────────────────────────────────────────────
@ -466,6 +468,122 @@ function StatusFilter({
);
}
// ── Stage Timeline ───────────────────────────────────────────────────────────
const PIPELINE_STAGES = [
{ key: "stage2_segmentation", label: "Segment" },
{ key: "stage3_extraction", label: "Extract" },
{ key: "stage4_classification", label: "Classify" },
{ key: "stage5_synthesis", label: "Synthesize" },
{ key: "stage6_embed", label: "Embed" },
];
function StageTimeline({ video }: { video: PipelineVideoItem }) {
if (video.processing_status !== "processing" && video.processing_status !== "complete" && video.processing_status !== "error") {
return null;
}
const activeStage = video.active_stage;
const activeStatus = video.active_stage_status; // "start" = running, "complete" = done, "error" = failed
// Determine each stage's state
const activeIdx = PIPELINE_STAGES.findIndex((s) => s.key === activeStage);
return (
<div className="stage-timeline">
{PIPELINE_STAGES.map((stage, i) => {
let stateClass = "stage-timeline__step--future";
if (activeIdx >= 0) {
if (i < activeIdx) {
stateClass = "stage-timeline__step--done";
} else if (i === activeIdx) {
if (activeStatus === "start") stateClass = "stage-timeline__step--active";
else if (activeStatus === "complete") stateClass = "stage-timeline__step--done";
else if (activeStatus === "error") stateClass = "stage-timeline__step--error";
}
}
// If video is complete, all stages are done
if (video.processing_status === "complete") {
stateClass = "stage-timeline__step--done";
}
return (
<div key={stage.key} className={`stage-timeline__step ${stateClass}`}>
<div className="stage-timeline__dot" />
<span className="stage-timeline__label">{stage.label}</span>
</div>
);
})}
</div>
);
}
// ── Recent Activity Feed ─────────────────────────────────────────────────────
function RecentActivityFeed() {
const [items, setItems] = useState<RecentActivityItem[]>([]);
const [collapsed, setCollapsed] = useState(false);
const [loading, setLoading] = useState(true);
const load = useCallback(async () => {
try {
const res = await fetchRecentActivity(8);
setItems(res.items);
} catch {
// silently fail
} finally {
setLoading(false);
}
}, []);
useEffect(() => {
void load();
const id = setInterval(() => void load(), 15_000);
return () => clearInterval(id);
}, [load]);
if (loading || items.length === 0) return null;
return (
<div className="recent-activity">
<button
className="recent-activity__toggle"
onClick={() => setCollapsed((v) => !v)}
aria-expanded={!collapsed}
>
<span className="recent-activity__title">Recent Activity</span>
<span className="recent-activity__arrow">{collapsed ? "▸" : "▾"}</span>
</button>
{!collapsed && (
<div className="recent-activity__list">
{items.map((item) => (
<div key={item.id} className={`recent-activity__item recent-activity__item--${item.event_type}`}>
<span className="recent-activity__icon">
{item.event_type === "complete" ? "✓" : "✗"}
</span>
<span className="recent-activity__stage">{item.stage.replace("stage", "S").replace("_", " ")}</span>
<span className="recent-activity__file" title={item.filename}>
{item.filename}
</span>
<span className="recent-activity__creator">{item.creator_name}</span>
{item.duration_ms != null && (
<span className="recent-activity__duration">
{item.duration_ms > 60000
? `${(item.duration_ms / 60000).toFixed(1)}m`
: item.duration_ms > 1000
? `${(item.duration_ms / 1000).toFixed(1)}s`
: `${item.duration_ms}ms`}
</span>
)}
<span className="recent-activity__time">{formatDate(item.created_at)}</span>
</div>
))}
</div>
)}
</div>
);
}
// ── Main Page ────────────────────────────────────────────────────────────────
interface BulkProgress {
@ -476,6 +594,12 @@ interface BulkProgress {
active: boolean;
}
interface BulkLogEntry {
filename: string;
ok: boolean;
message: string;
}
export default function AdminPipeline() {
useDocumentTitle("Pipeline Management — Chrysopedia");
const [searchParams] = useSearchParams();
@ -491,7 +615,11 @@ export default function AdminPipeline() {
const [creatorFilter, setCreatorFilter] = useState<string | null>(null);
const [selectedIds, setSelectedIds] = useState<Set<string>>(new Set());
const [bulkProgress, setBulkProgress] = useState<BulkProgress | null>(null);
const [bulkLog, setBulkLog] = useState<BulkLogEntry[]>([]);
const [changedIds, setChangedIds] = useState<Set<string>>(new Set());
const [autoRefresh, setAutoRefresh] = useState(true);
const bulkCancelRef = useRef(false);
const prevStatusRef = useRef<Map<string, string>>(new Map());
const videoRefs = useRef<Map<string, HTMLDivElement>>(new Map());
const deepLinked = useRef(false);
@ -502,16 +630,39 @@ export default function AdminPipeline() {
return true;
});
const load = useCallback(async () => {
setLoading(true);
const load = useCallback(async (isAutoRefresh = false) => {
if (!isAutoRefresh) setLoading(true);
setError(null);
try {
const res = await fetchPipelineVideos();
// Detect status changes for highlight animation
if (prevStatusRef.current.size > 0) {
const changed = new Set<string>();
for (const v of res.items) {
const prev = prevStatusRef.current.get(v.id);
if (prev && prev !== v.processing_status) {
changed.add(v.id);
}
}
if (changed.size > 0) {
setChangedIds(changed);
// Clear highlights after animation
setTimeout(() => setChangedIds(new Set()), 2000);
}
}
// Store current statuses for next comparison
const statusMap = new Map<string, string>();
for (const v of res.items) {
statusMap.set(v.id, v.processing_status);
}
prevStatusRef.current = statusMap;
setVideos(res.items);
} catch (err) {
setError(err instanceof Error ? err.message : "Failed to load videos");
if (!isAutoRefresh) {
setError(err instanceof Error ? err.message : "Failed to load videos");
}
} finally {
setLoading(false);
if (!isAutoRefresh) setLoading(false);
}
}, []);
@ -519,6 +670,13 @@ export default function AdminPipeline() {
void load();
}, [load]);
// Auto-refresh every 15 seconds
useEffect(() => {
if (!autoRefresh) return;
const id = setInterval(() => void load(true), 15_000);
return () => clearInterval(id);
}, [autoRefresh, load]);
// Load creators for filter dropdown
useEffect(() => {
fetchCreators({ limit: 200 })
@ -641,6 +799,7 @@ export default function AdminPipeline() {
if (ids.length === 0) return;
bulkCancelRef.current = false;
setBulkLog([]);
setBulkProgress({ total: ids.length, completed: 0, failed: 0, current: null, active: true });
let completed = 0;
@ -650,7 +809,8 @@ export default function AdminPipeline() {
if (bulkCancelRef.current) break;
const video = videos.find((v) => v.id === id);
setBulkProgress((p) => p ? { ...p, current: video?.filename ?? id } : null);
const filename = video?.filename ?? id;
setBulkProgress((p) => p ? { ...p, current: filename } : null);
try {
if (clean) {
@ -659,8 +819,14 @@ export default function AdminPipeline() {
await triggerPipeline(id);
}
completed++;
} catch {
setBulkLog((prev) => [...prev, { filename, ok: true, message: clean ? "clean retriggered" : "triggered" }]);
} catch (err) {
failed++;
setBulkLog((prev) => [...prev, {
filename,
ok: false,
message: err instanceof Error ? err.message : "failed",
}]);
}
setBulkProgress((p) => p ? { ...p, completed: completed, failed } : null);
@ -693,6 +859,14 @@ export default function AdminPipeline() {
<div className="admin-pipeline__header-right">
<DebugModeToggle debugMode={debugMode} onDebugModeChange={setDebugModeState} />
<WorkerStatus />
<label className="auto-refresh-toggle" title="Auto-refresh video list every 15 seconds">
<input
type="checkbox"
checked={autoRefresh}
onChange={(e) => setAutoRefresh(e.target.checked)}
/>
Auto-refresh
</label>
<button className="btn btn--secondary" onClick={() => void load()} disabled={loading}>
Refresh
</button>
@ -707,6 +881,7 @@ export default function AdminPipeline() {
<div className="empty-state">No videos in pipeline.</div>
) : (
<>
<RecentActivityFeed />
<div className="admin-pipeline__filters">
<StatusFilter
videos={videos}
@ -795,9 +970,24 @@ export default function AdminPipeline() {
</div>
)}
{/* Bulk operation log */}
{bulkLog.length > 0 && (bulkProgress?.active || (bulkProgress && !bulkProgress.active)) && (
<div className="bulk-log">
<div className="bulk-log__list">
{bulkLog.map((entry, i) => (
<div key={i} className={`bulk-log__entry ${entry.ok ? "bulk-log__entry--ok" : "bulk-log__entry--err"}`}>
<span className="bulk-log__icon">{entry.ok ? "✓" : "✗"}</span>
<span className="bulk-log__file">{entry.filename}</span>
<span className="bulk-log__msg">{entry.message}</span>
</div>
))}
</div>
</div>
)}
<div className="admin-pipeline__list">
{filteredVideos.map((video) => (
<div key={video.id} className="pipeline-video" ref={(el) => { if (el) videoRefs.current.set(video.id, el); }}>
<div key={video.id} className={`pipeline-video${changedIds.has(video.id) ? " pipeline-video--changed" : ""}`} ref={(el) => { if (el) videoRefs.current.set(video.id, el); }}>
<div
className="pipeline-video__header"
onClick={() => toggleExpand(video.id)}
@ -822,6 +1012,7 @@ export default function AdminPipeline() {
<span className={`pipeline-badge ${statusBadgeClass(video.processing_status)}`}>
{STATUS_LABELS[video.processing_status] ?? video.processing_status}
</span>
<StageTimeline video={video} />
<span className="pipeline-video__stat" title="Events">
{video.event_count} events
</span>