feat: Add bulk pipeline reprocessing — creator filter, multi-select, clean retrigger

- Backend: POST /admin/pipeline/clean-retrigger/{video_id} endpoint that
  deletes pipeline_events, key_moments, transcript_segments, and Qdrant
  vectors before retriggering the pipeline
- Backend: QdrantManager.delete_by_video_id() for vector cleanup
- Frontend: Creator filter dropdown on pipeline admin page
- Frontend: Checkbox selection column with select-all
- Frontend: Bulk toolbar with Retrigger Selected and Clean Reprocess
  actions, sequential dispatch with progress bar, cancel support
- Bulk dispatch uses 500ms delay between requests to avoid slamming API
This commit is contained in:
jlightner 2026-03-31 15:24:59 +00:00
parent f3e6a9c885
commit 04ae6d0703
5 changed files with 504 additions and 15 deletions

View file

@ -63,6 +63,45 @@ class QdrantManager:
exc, exc,
) )
# ── Deletion ───────────────────────────────────────────────────────────
def delete_by_video_id(self, video_id: str) -> int:
"""Delete all points (key moments + technique pages) associated with a video.
Key moments have source_video_id in payload.
Technique pages don't have direct video linkage, so only moments are deleted.
Returns the count of deleted points (best-effort Qdrant may not report exact counts).
"""
from qdrant_client.models import Filter, FieldCondition, MatchValue
try:
result = self._client.delete(
collection_name=self._collection,
points_selector=Filter(
must=[
FieldCondition(
key="source_video_id",
match=MatchValue(value=video_id),
),
],
),
)
logger.info(
"Deleted Qdrant points for video_id=%s from collection '%s'.",
video_id,
self._collection,
)
return 0 # Qdrant delete doesn't return count
except Exception as exc:
logger.warning(
"Qdrant delete for video_id=%s failed (%s: %s). Skipping.",
video_id,
type(exc).__name__,
exc,
)
return 0
# ── Low-level upsert ───────────────────────────────────────────────── # ── Low-level upsert ─────────────────────────────────────────────────
def upsert_points(self, points: list[PointStruct]) -> None: def upsert_points(self, points: list[PointStruct]) -> None:

View file

@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings from config import get_settings
from database import get_session from database import get_session
from models import PipelineEvent, SourceVideo, Creator from models import PipelineEvent, SourceVideo, Creator, KeyMoment, TranscriptSegment, ProcessingStatus
from redis_client import get_redis from redis_client import get_redis
from schemas import DebugModeResponse, DebugModeUpdate, TokenStageSummary, TokenSummaryResponse from schemas import DebugModeResponse, DebugModeUpdate, TokenStageSummary, TokenSummaryResponse
@ -140,6 +140,78 @@ async def admin_trigger_pipeline(
return await trigger_pipeline(video_id, db) return await trigger_pipeline(video_id, db)
# ── Admin: Clean Retrigger ───────────────────────────────────────────────────
@router.post("/admin/pipeline/clean-retrigger/{video_id}")
async def clean_retrigger_pipeline(
video_id: str,
db: AsyncSession = Depends(get_session),
):
"""Wipe prior pipeline output for a video, then retrigger.
Deletes: pipeline_events, key_moments, transcript_segments,
and associated Qdrant vectors. Resets processing_status to 'not_started'.
Does NOT delete technique_pages the pipeline re-synthesizes via upsert.
"""
stmt = select(SourceVideo).where(SourceVideo.id == video_id)
result = await db.execute(stmt)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")
# Delete pipeline events
await db.execute(
PipelineEvent.__table__.delete().where(PipelineEvent.video_id == video_id)
)
# Delete key moments
await db.execute(
KeyMoment.__table__.delete().where(KeyMoment.source_video_id == video_id)
)
# Delete transcript segments
await db.execute(
TranscriptSegment.__table__.delete().where(TranscriptSegment.source_video_id == video_id)
)
# Reset status
video.processing_status = ProcessingStatus.not_started
await db.commit()
deleted_counts = {
"pipeline_events": "cleared",
"key_moments": "cleared",
"transcript_segments": "cleared",
}
# Best-effort Qdrant cleanup (non-blocking)
try:
settings = get_settings()
from pipeline.qdrant_client import QdrantManager
qdrant = QdrantManager(settings)
qdrant.delete_by_video_id(str(video_id))
deleted_counts["qdrant_vectors"] = "cleared"
except Exception as exc:
logger.warning("Qdrant cleanup failed for video_id=%s: %s", video_id, exc)
deleted_counts["qdrant_vectors"] = f"skipped: {exc}"
# Now trigger the pipeline
from pipeline.stages import run_pipeline
try:
run_pipeline.delay(str(video.id))
logger.info("Clean retrigger dispatched for video_id=%s", video_id)
except Exception as exc:
logger.warning("Failed to dispatch pipeline after cleanup for video_id=%s: %s", video_id, exc)
raise HTTPException(
status_code=503,
detail="Cleanup succeeded but pipeline dispatch failed — Celery/Redis may be unavailable",
) from exc
return {
"status": "clean_retriggered",
"video_id": str(video.id),
"cleaned": deleted_counts,
}
# ── Admin: Revoke ──────────────────────────────────────────────────────────── # ── Admin: Revoke ────────────────────────────────────────────────────────────
@router.post("/admin/pipeline/revoke/{video_id}") @router.post("/admin/pipeline/revoke/{video_id}")

View file

@ -3349,6 +3349,160 @@ a.app-footer__repo:hover {
gap: 0.75rem; gap: 0.75rem;
} }
.admin-pipeline__filters {
display: flex;
align-items: center;
gap: 1rem;
flex-wrap: wrap;
margin-bottom: 0.5rem;
}
.admin-pipeline__select-all {
padding: 0.5rem 0;
font-size: 0.85rem;
color: var(--color-text-muted);
}
.admin-pipeline__select-all label {
display: flex;
align-items: center;
gap: 0.5rem;
cursor: pointer;
}
/* ── Creator Filter ───────────────────────────────────────────────────────── */
.creator-filter__select {
padding: 0.4rem 0.75rem;
border-radius: 6px;
border: 1px solid var(--color-border);
background: var(--color-surface);
color: var(--color-text);
font-size: 0.85rem;
cursor: pointer;
}
.creator-filter__select:focus {
outline: 2px solid var(--color-accent);
outline-offset: 1px;
}
/* ── Checkbox in video rows ───────────────────────────────────────────────── */
.pipeline-video__checkbox {
display: flex;
align-items: center;
padding-right: 0.5rem;
flex-shrink: 0;
}
.pipeline-video__checkbox input[type="checkbox"] {
width: 16px;
height: 16px;
cursor: pointer;
accent-color: var(--color-accent);
}
/* ── Bulk Toolbar ─────────────────────────────────────────────────────────── */
.bulk-toolbar {
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.75rem 1rem;
background: var(--color-surface);
border: 1px solid var(--color-accent);
border-radius: 8px;
margin-bottom: 0.75rem;
flex-wrap: wrap;
}
.bulk-toolbar__count {
font-weight: 600;
color: var(--color-accent);
white-space: nowrap;
}
.bulk-toolbar__progress {
display: flex;
align-items: center;
gap: 0.5rem;
flex: 1;
min-width: 200px;
}
.bulk-toolbar__progress-bar {
flex: 1;
height: 6px;
background: var(--color-bg);
border-radius: 3px;
overflow: hidden;
}
.bulk-toolbar__progress-fill {
height: 100%;
background: var(--color-accent);
border-radius: 3px;
transition: width 300ms ease;
}
.bulk-toolbar__progress-text {
font-size: 0.85rem;
white-space: nowrap;
}
.bulk-toolbar__current {
font-size: 0.8rem;
color: var(--color-text-muted);
max-width: 200px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.bulk-toolbar__done {
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.5rem 1rem;
border-radius: 8px;
margin-bottom: 0.75rem;
font-size: 0.9rem;
}
.bulk-toolbar__done--ok {
background: rgba(0, 200, 83, 0.1);
border: 1px solid rgba(0, 200, 83, 0.3);
color: #00c853;
}
.bulk-toolbar__done--warn {
background: rgba(255, 152, 0, 0.1);
border: 1px solid rgba(255, 152, 0, 0.3);
color: #ff9800;
}
.bulk-toolbar__dismiss {
margin-left: auto;
background: none;
border: none;
color: inherit;
cursor: pointer;
font-size: 1rem;
padding: 0.2rem;
opacity: 0.7;
}
.bulk-toolbar__dismiss:hover {
opacity: 1;
}
.btn--warning {
background: rgba(255, 152, 0, 0.15);
color: #ff9800;
border: 1px solid rgba(255, 152, 0, 0.3);
}
/* ── Worker Status Indicator ────────────────────────────────────────────── */ /* ── Worker Status Indicator ────────────────────────────────────────────── */
.worker-status { .worker-status {
@ -3410,7 +3564,7 @@ a.app-footer__repo:hover {
.pipeline-video__header { .pipeline-video__header {
display: grid; display: grid;
grid-template-columns: 1fr auto auto; grid-template-columns: auto 1fr auto auto;
gap: 0.75rem; gap: 0.75rem;
align-items: center; align-items: center;
padding: 0.75rem 1rem; padding: 0.75rem 1rem;

View file

@ -511,6 +511,18 @@ export async function revokePipeline(videoId: string): Promise<RevokeResponse> {
}); });
} }
export interface CleanRetriggerResponse {
status: string;
video_id: string;
cleaned: Record<string, string>;
}
export async function cleanRetriggerPipeline(videoId: string): Promise<CleanRetriggerResponse> {
return request<CleanRetriggerResponse>(`${BASE}/admin/pipeline/clean-retrigger/${videoId}`, {
method: "POST",
});
}
// ── Debug Mode ────────────────────────────────────────────────────────────── // ── Debug Mode ──────────────────────────────────────────────────────────────
export interface DebugModeResponse { export interface DebugModeResponse {

View file

@ -14,6 +14,8 @@ import {
setDebugMode, setDebugMode,
triggerPipeline, triggerPipeline,
revokePipeline, revokePipeline,
cleanRetriggerPipeline,
fetchCreators,
type PipelineVideoItem, type PipelineVideoItem,
type PipelineEvent, type PipelineEvent,
type WorkerStatusResponse, type WorkerStatusResponse,
@ -466,6 +468,14 @@ function StatusFilter({
// ── Main Page ──────────────────────────────────────────────────────────────── // ── Main Page ────────────────────────────────────────────────────────────────
interface BulkProgress {
total: number;
completed: number;
failed: number;
current: string | null;
active: boolean;
}
export default function AdminPipeline() { export default function AdminPipeline() {
useDocumentTitle("Pipeline Management — Chrysopedia"); useDocumentTitle("Pipeline Management — Chrysopedia");
const [searchParams] = useSearchParams(); const [searchParams] = useSearchParams();
@ -477,9 +487,21 @@ export default function AdminPipeline() {
const [actionMessage, setActionMessage] = useState<{ id: string; text: string; ok: boolean } | null>(null); const [actionMessage, setActionMessage] = useState<{ id: string; text: string; ok: boolean } | null>(null);
const [activeFilter, setActiveFilter] = useState<string | null>(null); const [activeFilter, setActiveFilter] = useState<string | null>(null);
const [debugMode, setDebugModeState] = useState<boolean | null>(null); const [debugMode, setDebugModeState] = useState<boolean | null>(null);
const [creators, setCreators] = useState<{ name: string; slug: string }[]>([]);
const [creatorFilter, setCreatorFilter] = useState<string | null>(null);
const [selectedIds, setSelectedIds] = useState<Set<string>>(new Set());
const [bulkProgress, setBulkProgress] = useState<BulkProgress | null>(null);
const bulkCancelRef = useRef(false);
const videoRefs = useRef<Map<string, HTMLDivElement>>(new Map()); const videoRefs = useRef<Map<string, HTMLDivElement>>(new Map());
const deepLinked = useRef(false); const deepLinked = useRef(false);
// Compute filtered list (status + creator)
const filteredVideos = videos.filter((v) => {
if (activeFilter !== null && v.processing_status !== activeFilter) return false;
if (creatorFilter !== null && v.creator_name !== creatorFilter) return false;
return true;
});
const load = useCallback(async () => { const load = useCallback(async () => {
setLoading(true); setLoading(true);
setError(null); setError(null);
@ -497,6 +519,22 @@ export default function AdminPipeline() {
void load(); void load();
}, [load]); }, [load]);
// Load creators for filter dropdown
useEffect(() => {
fetchCreators({ limit: 200 })
.then((res) => {
const list = res.items.map((c: { name: string; slug: string }) => ({
name: c.name,
slug: c.slug,
}));
list.sort((a: { name: string }, b: { name: string }) => a.name.localeCompare(b.name));
setCreators(list);
})
.catch(() => {
// silently fail — filter stays hidden
});
}, []);
// Deep-link: auto-expand and scroll to ?video=<id> on first load // Deep-link: auto-expand and scroll to ?video=<id> on first load
useEffect(() => { useEffect(() => {
if (deepLinked.current || loading || videos.length === 0) return; if (deepLinked.current || loading || videos.length === 0) return;
@ -506,7 +544,6 @@ export default function AdminPipeline() {
if (!match) return; if (!match) return;
deepLinked.current = true; deepLinked.current = true;
setExpandedId(targetVideoId); setExpandedId(targetVideoId);
// Scroll after the expanded detail renders
requestAnimationFrame(() => { requestAnimationFrame(() => {
const el = videoRefs.current.get(targetVideoId); const el = videoRefs.current.get(targetVideoId);
if (el) el.scrollIntoView({ behavior: "smooth", block: "start" }); if (el) el.scrollIntoView({ behavior: "smooth", block: "start" });
@ -519,19 +556,21 @@ export default function AdminPipeline() {
.then((res) => { .then((res) => {
if (!cancelled) setDebugModeState(res.debug_mode); if (!cancelled) setDebugModeState(res.debug_mode);
}) })
.catch(() => { .catch(() => {});
// silently fail — toggle stays hidden
});
return () => { cancelled = true; }; return () => { cancelled = true; };
}, []); }, []);
// Clear selection when filters change
useEffect(() => {
setSelectedIds(new Set());
}, [activeFilter, creatorFilter]);
const handleTrigger = async (videoId: string) => { const handleTrigger = async (videoId: string) => {
setActionLoading(videoId); setActionLoading(videoId);
setActionMessage(null); setActionMessage(null);
try { try {
const res = await triggerPipeline(videoId); const res = await triggerPipeline(videoId);
setActionMessage({ id: videoId, text: `Triggered (${res.status})`, ok: true }); setActionMessage({ id: videoId, text: `Triggered (${res.status})`, ok: true });
// Refresh after short delay to let status update
setTimeout(() => void load(), 2000); setTimeout(() => void load(), 2000);
} catch (err) { } catch (err) {
setActionMessage({ setActionMessage({
@ -572,6 +611,76 @@ export default function AdminPipeline() {
setExpandedId((prev) => (prev === id ? null : id)); setExpandedId((prev) => (prev === id ? null : id));
}; };
// ── Selection ───────────────────────────────────────────────────────────
const toggleSelect = (id: string) => {
setSelectedIds((prev) => {
const next = new Set(prev);
if (next.has(id)) next.delete(id);
else next.add(id);
return next;
});
};
const toggleSelectAll = () => {
const visibleIds = filteredVideos.map((v) => v.id);
const allSelected = visibleIds.every((id) => selectedIds.has(id));
if (allSelected) {
setSelectedIds(new Set());
} else {
setSelectedIds(new Set(visibleIds));
}
};
const allVisibleSelected = filteredVideos.length > 0 && filteredVideos.every((v) => selectedIds.has(v.id));
// ── Bulk Actions ────────────────────────────────────────────────────────
const runBulk = async (clean: boolean) => {
const ids = Array.from(selectedIds);
if (ids.length === 0) return;
bulkCancelRef.current = false;
setBulkProgress({ total: ids.length, completed: 0, failed: 0, current: null, active: true });
let completed = 0;
let failed = 0;
for (const id of ids) {
if (bulkCancelRef.current) break;
const video = videos.find((v) => v.id === id);
setBulkProgress((p) => p ? { ...p, current: video?.filename ?? id } : null);
try {
if (clean) {
await cleanRetriggerPipeline(id);
} else {
await triggerPipeline(id);
}
completed++;
} catch {
failed++;
}
setBulkProgress((p) => p ? { ...p, completed: completed, failed } : null);
// Brief pause between dispatches to avoid slamming the API
if (!bulkCancelRef.current && ids.indexOf(id) < ids.length - 1) {
await new Promise((r) => setTimeout(r, 500));
}
}
setBulkProgress((p) => p ? { ...p, active: false, current: null } : null);
setSelectedIds(new Set());
// Refresh video list after bulk completes
setTimeout(() => void load(), 2000);
};
const cancelBulk = () => {
bulkCancelRef.current = true;
};
return ( return (
<div className="admin-pipeline"> <div className="admin-pipeline">
<div className="admin-pipeline__header"> <div className="admin-pipeline__header">
@ -598,20 +707,110 @@ export default function AdminPipeline() {
<div className="empty-state">No videos in pipeline.</div> <div className="empty-state">No videos in pipeline.</div>
) : ( ) : (
<> <>
<div className="admin-pipeline__filters">
<StatusFilter <StatusFilter
videos={videos} videos={videos}
activeFilter={activeFilter} activeFilter={activeFilter}
onFilterChange={setActiveFilter} onFilterChange={setActiveFilter}
/> />
{creators.length > 1 && (
<div className="creator-filter">
<select
className="creator-filter__select"
value={creatorFilter ?? ""}
onChange={(e) => setCreatorFilter(e.target.value || null)}
>
<option value="">All Creators</option>
{creators.map((c) => (
<option key={c.slug} value={c.name}>{c.name}</option>
))}
</select>
</div>
)}
</div>
{/* Bulk toolbar */}
{selectedIds.size > 0 && (
<div className="bulk-toolbar">
<span className="bulk-toolbar__count">
{selectedIds.size} selected
</span>
{bulkProgress?.active ? (
<>
<div className="bulk-toolbar__progress">
<div className="bulk-toolbar__progress-bar">
<div
className="bulk-toolbar__progress-fill"
style={{ width: `${((bulkProgress.completed + bulkProgress.failed) / bulkProgress.total) * 100}%` }}
/>
</div>
<span className="bulk-toolbar__progress-text">
{bulkProgress.completed + bulkProgress.failed}/{bulkProgress.total}
{bulkProgress.failed > 0 && <span className="error-text"> ({bulkProgress.failed} failed)</span>}
</span>
{bulkProgress.current && (
<span className="bulk-toolbar__current" title={bulkProgress.current}>
{bulkProgress.current}
</span>
)}
</div>
<button className="btn btn--small btn--danger" onClick={cancelBulk}>
Cancel
</button>
</>
) : (
<>
<button
className="btn btn--small btn--primary"
onClick={() => void runBulk(false)}
title="Retrigger pipeline for selected videos"
>
Retrigger Selected
</button>
<button
className="btn btn--small btn--warning"
onClick={() => void runBulk(true)}
title="Wipe pipeline output (events, moments, segments) then retrigger"
>
🧹 Clean Reprocess
</button>
<button
className="btn btn--small btn--secondary"
onClick={() => setSelectedIds(new Set())}
>
Clear
</button>
</>
)}
</div>
)}
{/* Bulk completion message */}
{bulkProgress && !bulkProgress.active && (
<div className={`bulk-toolbar__done ${bulkProgress.failed > 0 ? "bulk-toolbar__done--warn" : "bulk-toolbar__done--ok"}`}>
Bulk operation complete: {bulkProgress.completed} succeeded
{bulkProgress.failed > 0 && `, ${bulkProgress.failed} failed`}
{bulkCancelRef.current && " (cancelled)"}
<button className="bulk-toolbar__dismiss" onClick={() => setBulkProgress(null)}></button>
</div>
)}
<div className="admin-pipeline__list"> <div className="admin-pipeline__list">
{videos {filteredVideos.map((video) => (
.filter((v) => activeFilter === null || v.processing_status === activeFilter)
.map((video) => (
<div key={video.id} className="pipeline-video" ref={(el) => { if (el) videoRefs.current.set(video.id, el); }}> <div key={video.id} className="pipeline-video" ref={(el) => { if (el) videoRefs.current.set(video.id, el); }}>
<div <div
className="pipeline-video__header" className="pipeline-video__header"
onClick={() => toggleExpand(video.id)} onClick={() => toggleExpand(video.id)}
> >
<div className="pipeline-video__checkbox" onClick={(e) => e.stopPropagation()}>
<input
type="checkbox"
checked={selectedIds.has(video.id)}
onChange={() => toggleSelect(video.id)}
disabled={bulkProgress?.active ?? false}
aria-label={`Select ${video.filename}`}
/>
</div>
<div className="pipeline-video__info"> <div className="pipeline-video__info">
<span className="pipeline-video__filename" title={video.filename}> <span className="pipeline-video__filename" title={video.filename}>
{video.filename} {video.filename}
@ -671,6 +870,19 @@ export default function AdminPipeline() {
)} )}
</div> </div>
))} ))}
{filteredVideos.length > 0 && (
<div className="admin-pipeline__select-all">
<label>
<input
type="checkbox"
checked={allVisibleSelected}
onChange={toggleSelectAll}
disabled={bulkProgress?.active ?? false}
/>
{allVisibleSelected ? "Deselect" : "Select"} all {filteredVideos.length} visible
</label>
</div>
)}
</div> </div>
</> </>
)} )}