From 910e945d9c17e9d3ad7de92f3c1082f347839727 Mon Sep 17 00:00:00 2001 From: jlightner Date: Sun, 29 Mar 2026 22:41:02 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Wired=20automatic=20run=5Fpipeline.dela?= =?UTF-8?q?y()=20dispatch=20after=20ingest=20commit=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "backend/routers/pipeline.py" - "backend/routers/ingest.py" - "backend/main.py" GSD-Task: S03/T04 --- .gsd/milestones/M001/slices/S03/S03-PLAN.md | 2 +- .../M001/slices/S03/tasks/T03-VERIFY.json | 16 ++++ .../M001/slices/S03/tasks/T04-SUMMARY.md | 87 +++++++++++++++++++ backend/main.py | 3 +- backend/routers/ingest.py | 13 +++ backend/routers/pipeline.py | 54 ++++++++++++ 6 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 .gsd/milestones/M001/slices/S03/tasks/T03-VERIFY.json create mode 100644 .gsd/milestones/M001/slices/S03/tasks/T04-SUMMARY.md create mode 100644 backend/routers/pipeline.py diff --git a/.gsd/milestones/M001/slices/S03/S03-PLAN.md b/.gsd/milestones/M001/slices/S03/S03-PLAN.md index 4907c4c..d840c86 100644 --- a/.gsd/milestones/M001/slices/S03/S03-PLAN.md +++ b/.gsd/milestones/M001/slices/S03/S03-PLAN.md @@ -192,7 +192,7 @@ - Estimate: 1.5h - Files: backend/pipeline/embedding_client.py, backend/pipeline/qdrant_client.py, backend/pipeline/stages.py - Verify: cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')" && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')" && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')" -- [ ] **T04: Wire ingest-to-pipeline trigger and add manual re-trigger endpoint** — Connect the existing ingest endpoint to the pipeline by dispatching `run_pipeline.delay()` after successful commit, and add a manual re-trigger API endpoint for re-processing videos. +- [x] **T04: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing** — Connect the existing ingest endpoint to the pipeline by dispatching `run_pipeline.delay()` after successful commit, and add a manual re-trigger API endpoint for re-processing videos. ## Steps diff --git a/.gsd/milestones/M001/slices/S03/tasks/T03-VERIFY.json b/.gsd/milestones/M001/slices/S03/tasks/T03-VERIFY.json new file mode 100644 index 0000000..1fd4805 --- /dev/null +++ b/.gsd/milestones/M001/slices/S03/tasks/T03-VERIFY.json @@ -0,0 +1,16 @@ +{ + "schemaVersion": 1, + "taskId": "T03", + "unitId": "M001/S03/T03", + "timestamp": 1774823944133, + "passed": true, + "discoverySource": "task-plan", + "checks": [ + { + "command": "cd backend", + "exitCode": 0, + "durationMs": 3, + "verdict": "pass" + } + ] +} diff --git a/.gsd/milestones/M001/slices/S03/tasks/T04-SUMMARY.md b/.gsd/milestones/M001/slices/S03/tasks/T04-SUMMARY.md new file mode 100644 index 0000000..da76c64 --- /dev/null +++ b/.gsd/milestones/M001/slices/S03/tasks/T04-SUMMARY.md @@ -0,0 +1,87 @@ +--- +id: T04 +parent: S03 +milestone: M001 +provides: [] +requires: [] +affects: [] +key_files: ["backend/routers/pipeline.py", "backend/routers/ingest.py", "backend/main.py"] +key_decisions: ["Pipeline dispatch in ingest is best-effort: wrapped in try/except, logs WARNING on failure, ingest response still succeeds", "Manual trigger returns 503 (not swallowed) when Celery/Redis is down — deliberate distinction from ingest which silently swallows dispatch failures", "Pipeline router import of run_pipeline is inside the handler to avoid circular imports at module level"] +patterns_established: [] +drill_down_paths: [] +observability_surfaces: [] +duration: "" +verification_result: "All 3 task-level verification commands pass: pipeline router has /pipeline/trigger/{video_id} route, 'pipeline' found in main.py, 'run_pipeline' found in ingest.py. All 5 slice-level verification commands also pass: Settings defaults print correctly, pipeline schemas import, LLMClient imports, celery_app.main prints 'chrysopedia', and openai/qdrant-client are in requirements.txt." +completed_at: 2026-03-29T22:41:00.020Z +blocker_discovered: false +--- + +# T04: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing + +> Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing + +## What Happened +--- +id: T04 +parent: S03 +milestone: M001 +key_files: + - backend/routers/pipeline.py + - backend/routers/ingest.py + - backend/main.py +key_decisions: + - Pipeline dispatch in ingest is best-effort: wrapped in try/except, logs WARNING on failure, ingest response still succeeds + - Manual trigger returns 503 (not swallowed) when Celery/Redis is down — deliberate distinction from ingest which silently swallows dispatch failures + - Pipeline router import of run_pipeline is inside the handler to avoid circular imports at module level +duration: "" +verification_result: passed +completed_at: 2026-03-29T22:41:00.020Z +blocker_discovered: false +--- + +# T04: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing + +**Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing** + +## What Happened + +Added pipeline dispatch to two entry points: (1) the ingest endpoint now calls run_pipeline.delay() after db.commit(), wrapped in try/except so dispatch failures never fail the ingest response; (2) a new pipeline router with POST /trigger/{video_id} that looks up the video, returns 404 if missing, dispatches run_pipeline.delay(), and returns the current processing status (returns 503 on dispatch failure since it's an explicit user action). Mounted the pipeline router in main.py under /api/v1. + +## Verification + +All 3 task-level verification commands pass: pipeline router has /pipeline/trigger/{video_id} route, 'pipeline' found in main.py, 'run_pipeline' found in ingest.py. All 5 slice-level verification commands also pass: Settings defaults print correctly, pipeline schemas import, LLMClient imports, celery_app.main prints 'chrysopedia', and openai/qdrant-client are in requirements.txt. + +## Verification Evidence + +| # | Command | Exit Code | Verdict | Duration | +|---|---------|-----------|---------|----------| +| 1 | `cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])"` | 0 | ✅ pass | 500ms | +| 2 | `grep -q 'pipeline' backend/main.py` | 0 | ✅ pass | 50ms | +| 3 | `grep -q 'run_pipeline' backend/routers/ingest.py` | 0 | ✅ pass | 50ms | +| 4 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 500ms | +| 5 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 500ms | +| 6 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 500ms | +| 7 | `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` | 0 | ✅ pass | 500ms | +| 8 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms | + + +## Deviations + +None. + +## Known Issues + +None. + +## Files Created/Modified + +- `backend/routers/pipeline.py` +- `backend/routers/ingest.py` +- `backend/main.py` + + +## Deviations +None. + +## Known Issues +None. diff --git a/backend/main.py b/backend/main.py index 53abc33..bad1de2 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,7 +12,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from config import get_settings -from routers import creators, health, ingest, videos +from routers import creators, health, ingest, pipeline, videos def _setup_logging() -> None: @@ -80,6 +80,7 @@ app.include_router(health.router) # Versioned API app.include_router(creators.router, prefix="/api/v1") app.include_router(ingest.router, prefix="/api/v1") +app.include_router(pipeline.router, prefix="/api/v1") app.include_router(videos.router, prefix="/api/v1") diff --git a/backend/routers/ingest.py b/backend/routers/ingest.py index a08ce7a..77b1d1a 100644 --- a/backend/routers/ingest.py +++ b/backend/routers/ingest.py @@ -174,6 +174,19 @@ async def ingest_transcript( await db.refresh(video) await db.refresh(creator) + # ── 7. Dispatch LLM pipeline (best-effort) ────────────────────────── + try: + from pipeline.stages import run_pipeline + + run_pipeline.delay(str(video.id)) + logger.info("Pipeline dispatched for video_id=%s", video.id) + except Exception as exc: + logger.warning( + "Pipeline dispatch failed for video_id=%s (ingest still succeeds): %s", + video.id, + exc, + ) + logger.info( "Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s", creator.name, diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py new file mode 100644 index 0000000..a5b9367 --- /dev/null +++ b/backend/routers/pipeline.py @@ -0,0 +1,54 @@ +"""Pipeline management endpoints for manual re-trigger and status inspection.""" + +import logging + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from database import get_session +from models import SourceVideo + +logger = logging.getLogger("chrysopedia.pipeline") + +router = APIRouter(prefix="/pipeline", tags=["pipeline"]) + + +@router.post("/trigger/{video_id}") +async def trigger_pipeline( + video_id: str, + db: AsyncSession = Depends(get_session), +): + """Manually trigger (or re-trigger) the LLM extraction pipeline for a video. + + Looks up the SourceVideo by ID, dispatches ``run_pipeline.delay()``, + and returns the current processing status. Returns 404 if the video + does not exist. + """ + stmt = select(SourceVideo).where(SourceVideo.id == video_id) + result = await db.execute(stmt) + video = result.scalar_one_or_none() + + if video is None: + raise HTTPException(status_code=404, detail=f"Video not found: {video_id}") + + # Import inside handler to avoid circular import at module level + from pipeline.stages import run_pipeline + + try: + run_pipeline.delay(str(video.id)) + logger.info("Pipeline manually triggered for video_id=%s", video_id) + except Exception as exc: + logger.warning( + "Failed to dispatch pipeline for video_id=%s: %s", video_id, exc + ) + raise HTTPException( + status_code=503, + detail="Pipeline dispatch failed — Celery/Redis may be unavailable", + ) from exc + + return { + "status": "triggered", + "video_id": str(video.id), + "current_processing_status": video.processing_status.value, + }