From 04ae6d07037faf73c230f89a8dc1a2107d88c705 Mon Sep 17 00:00:00 2001 From: jlightner Date: Tue, 31 Mar 2026 15:24:59 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20bulk=20pipeline=20reprocessing=20?= =?UTF-8?q?=E2=80=94=20creator=20filter,=20multi-select,=20clean=20retrigg?= =?UTF-8?q?er?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Backend: POST /admin/pipeline/clean-retrigger/{video_id} endpoint that deletes pipeline_events, key_moments, transcript_segments, and Qdrant vectors before retriggering the pipeline - Backend: QdrantManager.delete_by_video_id() for vector cleanup - Frontend: Creator filter dropdown on pipeline admin page - Frontend: Checkbox selection column with select-all - Frontend: Bulk toolbar with Retrigger Selected and Clean Reprocess actions, sequential dispatch with progress bar, cancel support - Bulk dispatch uses 500ms delay between requests to avoid slamming API --- backend/pipeline/qdrant_client.py | 39 +++++ backend/routers/pipeline.py | 74 ++++++++- frontend/src/App.css | 156 +++++++++++++++++- frontend/src/api/public-client.ts | 12 ++ frontend/src/pages/AdminPipeline.tsx | 238 +++++++++++++++++++++++++-- 5 files changed, 504 insertions(+), 15 deletions(-) diff --git a/backend/pipeline/qdrant_client.py b/backend/pipeline/qdrant_client.py index 75c66a1..85cc4ee 100644 --- a/backend/pipeline/qdrant_client.py +++ b/backend/pipeline/qdrant_client.py @@ -63,6 +63,45 @@ class QdrantManager: exc, ) + # ── Deletion ─────────────────────────────────────────────────────────── + + def delete_by_video_id(self, video_id: str) -> int: + """Delete all points (key moments + technique pages) associated with a video. + + Key moments have source_video_id in payload. + Technique pages don't have direct video linkage, so only moments are deleted. + + Returns the count of deleted points (best-effort — Qdrant may not report exact counts). + """ + from qdrant_client.models import Filter, FieldCondition, MatchValue + + try: + result = self._client.delete( + collection_name=self._collection, + points_selector=Filter( + must=[ + FieldCondition( + key="source_video_id", + match=MatchValue(value=video_id), + ), + ], + ), + ) + logger.info( + "Deleted Qdrant points for video_id=%s from collection '%s'.", + video_id, + self._collection, + ) + return 0 # Qdrant delete doesn't return count + except Exception as exc: + logger.warning( + "Qdrant delete for video_id=%s failed (%s: %s). Skipping.", + video_id, + type(exc).__name__, + exc, + ) + return 0 + # ── Low-level upsert ───────────────────────────────────────────────── def upsert_points(self, points: list[PointStruct]) -> None: diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index 102230b..d4b9336 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from config import get_settings from database import get_session -from models import PipelineEvent, SourceVideo, Creator +from models import PipelineEvent, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus from redis_client import get_redis from schemas import DebugModeResponse, DebugModeUpdate, TokenStageSummary, TokenSummaryResponse @@ -140,6 +140,78 @@ async def admin_trigger_pipeline( return await trigger_pipeline(video_id, db) +# ── Admin: Clean Retrigger ─────────────────────────────────────────────────── + +@router.post("/admin/pipeline/clean-retrigger/{video_id}") +async def clean_retrigger_pipeline( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """Wipe prior pipeline output for a video, then retrigger. + + Deletes: pipeline_events, key_moments, transcript_segments, + and associated Qdrant vectors. Resets processing_status to 'not_started'. + Does NOT delete technique_pages — the pipeline re-synthesizes via upsert. + """ + stmt = select(SourceVideo).where(SourceVideo.id == video_id) + result = await db.execute(stmt) + video = result.scalar_one_or_none() + + if video is None: + raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") + + # Delete pipeline events + await db.execute( + PipelineEvent.__table__.delete().where(PipelineEvent.video_id == video_id) + ) + # Delete key moments + await db.execute( + KeyMoment.__table__.delete().where(KeyMoment.source_video_id == video_id) + ) + # Delete transcript segments + await db.execute( + TranscriptSegment.__table__.delete().where(TranscriptSegment.source_video_id == video_id) + ) + # Reset status + video.processing_status = ProcessingStatus.not_started + await db.commit() + + deleted_counts = { + "pipeline_events": "cleared", + "key_moments": "cleared", + "transcript_segments": "cleared", + } + + # Best-effort Qdrant cleanup (non-blocking) + try: + settings = get_settings() + from pipeline.qdrant_client import QdrantManager + qdrant = QdrantManager(settings) + qdrant.delete_by_video_id(str(video_id)) + deleted_counts["qdrant_vectors"] = "cleared" + except Exception as exc: + logger.warning("Qdrant cleanup failed for video_id=%s: %s", video_id, exc) + deleted_counts["qdrant_vectors"] = f"skipped: {exc}" + + # Now trigger the pipeline + from pipeline.stages import run_pipeline + try: + run_pipeline.delay(str(video.id)) + logger.info("Clean retrigger dispatched for video_id=%s", video_id) + except Exception as exc: + logger.warning("Failed to dispatch pipeline after cleanup for video_id=%s: %s", video_id, exc) + raise HTTPException( + status_code=503, + detail="Cleanup succeeded but pipeline dispatch failed — Celery/Redis may be unavailable", + ) from exc + + return { + "status": "clean_retriggered", + "video_id": str(video.id), + "cleaned": deleted_counts, + } + + # ── Admin: Revoke ──────────────────────────────────────────────────────────── @router.post("/admin/pipeline/revoke/{video_id}") diff --git a/frontend/src/App.css b/frontend/src/App.css index 5376611..0623eea 100644 --- a/frontend/src/App.css +++ b/frontend/src/App.css @@ -3349,6 +3349,160 @@ a.app-footer__repo:hover { gap: 0.75rem; } +.admin-pipeline__filters { + display: flex; + align-items: center; + gap: 1rem; + flex-wrap: wrap; + margin-bottom: 0.5rem; +} + +.admin-pipeline__select-all { + padding: 0.5rem 0; + font-size: 0.85rem; + color: var(--color-text-muted); +} + +.admin-pipeline__select-all label { + display: flex; + align-items: center; + gap: 0.5rem; + cursor: pointer; +} + +/* ── Creator Filter ───────────────────────────────────────────────────────── */ + +.creator-filter__select { + padding: 0.4rem 0.75rem; + border-radius: 6px; + border: 1px solid var(--color-border); + background: var(--color-surface); + color: var(--color-text); + font-size: 0.85rem; + cursor: pointer; +} + +.creator-filter__select:focus { + outline: 2px solid var(--color-accent); + outline-offset: 1px; +} + +/* ── Checkbox in video rows ───────────────────────────────────────────────── */ + +.pipeline-video__checkbox { + display: flex; + align-items: center; + padding-right: 0.5rem; + flex-shrink: 0; +} + +.pipeline-video__checkbox input[type="checkbox"] { + width: 16px; + height: 16px; + cursor: pointer; + accent-color: var(--color-accent); +} + +/* ── Bulk Toolbar ─────────────────────────────────────────────────────────── */ + +.bulk-toolbar { + display: flex; + align-items: center; + gap: 0.75rem; + padding: 0.75rem 1rem; + background: var(--color-surface); + border: 1px solid var(--color-accent); + border-radius: 8px; + margin-bottom: 0.75rem; + flex-wrap: wrap; +} + +.bulk-toolbar__count { + font-weight: 600; + color: var(--color-accent); + white-space: nowrap; +} + +.bulk-toolbar__progress { + display: flex; + align-items: center; + gap: 0.5rem; + flex: 1; + min-width: 200px; +} + +.bulk-toolbar__progress-bar { + flex: 1; + height: 6px; + background: var(--color-bg); + border-radius: 3px; + overflow: hidden; +} + +.bulk-toolbar__progress-fill { + height: 100%; + background: var(--color-accent); + border-radius: 3px; + transition: width 300ms ease; +} + +.bulk-toolbar__progress-text { + font-size: 0.85rem; + white-space: nowrap; +} + +.bulk-toolbar__current { + font-size: 0.8rem; + color: var(--color-text-muted); + max-width: 200px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.bulk-toolbar__done { + display: flex; + align-items: center; + gap: 0.75rem; + padding: 0.5rem 1rem; + border-radius: 8px; + margin-bottom: 0.75rem; + font-size: 0.9rem; +} + +.bulk-toolbar__done--ok { + background: rgba(0, 200, 83, 0.1); + border: 1px solid rgba(0, 200, 83, 0.3); + color: #00c853; +} + +.bulk-toolbar__done--warn { + background: rgba(255, 152, 0, 0.1); + border: 1px solid rgba(255, 152, 0, 0.3); + color: #ff9800; +} + +.bulk-toolbar__dismiss { + margin-left: auto; + background: none; + border: none; + color: inherit; + cursor: pointer; + font-size: 1rem; + padding: 0.2rem; + opacity: 0.7; +} + +.bulk-toolbar__dismiss:hover { + opacity: 1; +} + +.btn--warning { + background: rgba(255, 152, 0, 0.15); + color: #ff9800; + border: 1px solid rgba(255, 152, 0, 0.3); +} + /* ── Worker Status Indicator ────────────────────────────────────────────── */ .worker-status { @@ -3410,7 +3564,7 @@ a.app-footer__repo:hover { .pipeline-video__header { display: grid; - grid-template-columns: 1fr auto auto; + grid-template-columns: auto 1fr auto auto; gap: 0.75rem; align-items: center; padding: 0.75rem 1rem; diff --git a/frontend/src/api/public-client.ts b/frontend/src/api/public-client.ts index 6f571b6..d3f41bf 100644 --- a/frontend/src/api/public-client.ts +++ b/frontend/src/api/public-client.ts @@ -511,6 +511,18 @@ export async function revokePipeline(videoId: string): Promise { }); } +export interface CleanRetriggerResponse { + status: string; + video_id: string; + cleaned: Record; +} + +export async function cleanRetriggerPipeline(videoId: string): Promise { + return request(`${BASE}/admin/pipeline/clean-retrigger/${videoId}`, { + method: "POST", + }); +} + // ── Debug Mode ────────────────────────────────────────────────────────────── export interface DebugModeResponse { diff --git a/frontend/src/pages/AdminPipeline.tsx b/frontend/src/pages/AdminPipeline.tsx index 36a88b0..95f7638 100644 --- a/frontend/src/pages/AdminPipeline.tsx +++ b/frontend/src/pages/AdminPipeline.tsx @@ -14,6 +14,8 @@ import { setDebugMode, triggerPipeline, revokePipeline, + cleanRetriggerPipeline, + fetchCreators, type PipelineVideoItem, type PipelineEvent, type WorkerStatusResponse, @@ -466,6 +468,14 @@ function StatusFilter({ // ── Main Page ──────────────────────────────────────────────────────────────── +interface BulkProgress { + total: number; + completed: number; + failed: number; + current: string | null; + active: boolean; +} + export default function AdminPipeline() { useDocumentTitle("Pipeline Management — Chrysopedia"); const [searchParams] = useSearchParams(); @@ -477,9 +487,21 @@ export default function AdminPipeline() { const [actionMessage, setActionMessage] = useState<{ id: string; text: string; ok: boolean } | null>(null); const [activeFilter, setActiveFilter] = useState(null); const [debugMode, setDebugModeState] = useState(null); + const [creators, setCreators] = useState<{ name: string; slug: string }[]>([]); + const [creatorFilter, setCreatorFilter] = useState(null); + const [selectedIds, setSelectedIds] = useState>(new Set()); + const [bulkProgress, setBulkProgress] = useState(null); + const bulkCancelRef = useRef(false); const videoRefs = useRef>(new Map()); const deepLinked = useRef(false); + // Compute filtered list (status + creator) + const filteredVideos = videos.filter((v) => { + if (activeFilter !== null && v.processing_status !== activeFilter) return false; + if (creatorFilter !== null && v.creator_name !== creatorFilter) return false; + return true; + }); + const load = useCallback(async () => { setLoading(true); setError(null); @@ -497,6 +519,22 @@ export default function AdminPipeline() { void load(); }, [load]); + // Load creators for filter dropdown + useEffect(() => { + fetchCreators({ limit: 200 }) + .then((res) => { + const list = res.items.map((c: { name: string; slug: string }) => ({ + name: c.name, + slug: c.slug, + })); + list.sort((a: { name: string }, b: { name: string }) => a.name.localeCompare(b.name)); + setCreators(list); + }) + .catch(() => { + // silently fail — filter stays hidden + }); + }, []); + // Deep-link: auto-expand and scroll to ?video= on first load useEffect(() => { if (deepLinked.current || loading || videos.length === 0) return; @@ -506,7 +544,6 @@ export default function AdminPipeline() { if (!match) return; deepLinked.current = true; setExpandedId(targetVideoId); - // Scroll after the expanded detail renders requestAnimationFrame(() => { const el = videoRefs.current.get(targetVideoId); if (el) el.scrollIntoView({ behavior: "smooth", block: "start" }); @@ -519,19 +556,21 @@ export default function AdminPipeline() { .then((res) => { if (!cancelled) setDebugModeState(res.debug_mode); }) - .catch(() => { - // silently fail — toggle stays hidden - }); + .catch(() => {}); return () => { cancelled = true; }; }, []); + // Clear selection when filters change + useEffect(() => { + setSelectedIds(new Set()); + }, [activeFilter, creatorFilter]); + const handleTrigger = async (videoId: string) => { setActionLoading(videoId); setActionMessage(null); try { const res = await triggerPipeline(videoId); setActionMessage({ id: videoId, text: `Triggered (${res.status})`, ok: true }); - // Refresh after short delay to let status update setTimeout(() => void load(), 2000); } catch (err) { setActionMessage({ @@ -572,6 +611,76 @@ export default function AdminPipeline() { setExpandedId((prev) => (prev === id ? null : id)); }; + // ── Selection ─────────────────────────────────────────────────────────── + + const toggleSelect = (id: string) => { + setSelectedIds((prev) => { + const next = new Set(prev); + if (next.has(id)) next.delete(id); + else next.add(id); + return next; + }); + }; + + const toggleSelectAll = () => { + const visibleIds = filteredVideos.map((v) => v.id); + const allSelected = visibleIds.every((id) => selectedIds.has(id)); + if (allSelected) { + setSelectedIds(new Set()); + } else { + setSelectedIds(new Set(visibleIds)); + } + }; + + const allVisibleSelected = filteredVideos.length > 0 && filteredVideos.every((v) => selectedIds.has(v.id)); + + // ── Bulk Actions ──────────────────────────────────────────────────────── + + const runBulk = async (clean: boolean) => { + const ids = Array.from(selectedIds); + if (ids.length === 0) return; + + bulkCancelRef.current = false; + setBulkProgress({ total: ids.length, completed: 0, failed: 0, current: null, active: true }); + + let completed = 0; + let failed = 0; + + for (const id of ids) { + if (bulkCancelRef.current) break; + + const video = videos.find((v) => v.id === id); + setBulkProgress((p) => p ? { ...p, current: video?.filename ?? id } : null); + + try { + if (clean) { + await cleanRetriggerPipeline(id); + } else { + await triggerPipeline(id); + } + completed++; + } catch { + failed++; + } + + setBulkProgress((p) => p ? { ...p, completed: completed, failed } : null); + + // Brief pause between dispatches to avoid slamming the API + if (!bulkCancelRef.current && ids.indexOf(id) < ids.length - 1) { + await new Promise((r) => setTimeout(r, 500)); + } + } + + setBulkProgress((p) => p ? { ...p, active: false, current: null } : null); + setSelectedIds(new Set()); + // Refresh video list after bulk completes + setTimeout(() => void load(), 2000); + }; + + const cancelBulk = () => { + bulkCancelRef.current = true; + }; + return (
@@ -598,20 +707,110 @@ export default function AdminPipeline() {
No videos in pipeline.
) : ( <> - +
+ + {creators.length > 1 && ( +
+ +
+ )} +
+ + {/* Bulk toolbar */} + {selectedIds.size > 0 && ( +
+ + {selectedIds.size} selected + + {bulkProgress?.active ? ( + <> +
+
+
+
+ + {bulkProgress.completed + bulkProgress.failed}/{bulkProgress.total} + {bulkProgress.failed > 0 && ({bulkProgress.failed} failed)} + + {bulkProgress.current && ( + + {bulkProgress.current} + + )} +
+ + + ) : ( + <> + + + + + )} +
+ )} + + {/* Bulk completion message */} + {bulkProgress && !bulkProgress.active && ( +
0 ? "bulk-toolbar__done--warn" : "bulk-toolbar__done--ok"}`}> + Bulk operation complete: {bulkProgress.completed} succeeded + {bulkProgress.failed > 0 && `, ${bulkProgress.failed} failed`} + {bulkCancelRef.current && " (cancelled)"} + +
+ )} +
- {videos - .filter((v) => activeFilter === null || v.processing_status === activeFilter) - .map((video) => ( + {filteredVideos.map((video) => (
{ if (el) videoRefs.current.set(video.id, el); }}>
toggleExpand(video.id)} > +
e.stopPropagation()}> + toggleSelect(video.id)} + disabled={bulkProgress?.active ?? false} + aria-label={`Select ${video.filename}`} + /> +
{video.filename} @@ -671,6 +870,19 @@ export default function AdminPipeline() { )}
))} + {filteredVideos.length > 0 && ( +
+ +
+ )}
)}