feat: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing

Files: backend/routers/pipeline.py, backend/routers/ingest.py, backend/main.py
GSD-Task: S03/T04

parent 5c46d1e922
commit 910e945d9c

6 changed files with 173 additions and 2 deletions
@@ -192,7 +192,7 @@
 - Estimate: 1.5h
 - Files: backend/pipeline/embedding_client.py, backend/pipeline/qdrant_client.py, backend/pipeline/stages.py
 - Verify: cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')" && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')" && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"
-- [ ] **T04: Wire ingest-to-pipeline trigger and add manual re-trigger endpoint** — Connect the existing ingest endpoint to the pipeline by dispatching `run_pipeline.delay()` after successful commit, and add a manual re-trigger API endpoint for re-processing videos.
+- [x] **T04: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing** — Connect the existing ingest endpoint to the pipeline by dispatching `run_pipeline.delay()` after successful commit, and add a manual re-trigger API endpoint for re-processing videos.
 
 ## Steps
 
.gsd/milestones/M001/slices/S03/tasks/T03-VERIFY.json (new file, 16 lines)

@@ -0,0 +1,16 @@
{
  "schemaVersion": 1,
  "taskId": "T03",
  "unitId": "M001/S03/T03",
  "timestamp": 1774823944133,
  "passed": true,
  "discoverySource": "task-plan",
  "checks": [
    {
      "command": "cd backend",
      "exitCode": 0,
      "durationMs": 3,
      "verdict": "pass"
    }
  ]
}
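A verify record in this shape can be sanity-checked with a few lines of stdlib Python. This is a hypothetical checker for illustration, not part of the GSD tooling; the `raw` payload is trimmed to the fields the check reads:

```python
import json

# Minimal *-VERIFY.json record (trimmed to the fields checked below).
raw = """
{
  "schemaVersion": 1,
  "taskId": "T03",
  "passed": true,
  "checks": [
    {"command": "cd backend", "exitCode": 0, "durationMs": 3, "verdict": "pass"}
  ]
}
"""

record = json.loads(raw)

# The overall "passed" flag should agree with the per-check verdicts.
checks_ok = all(
    c["exitCode"] == 0 and c["verdict"] == "pass" for c in record["checks"]
)
print(record["passed"] == checks_ok)  # prints: True
```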
.gsd/milestones/M001/slices/S03/tasks/T04-SUMMARY.md (new file, 87 lines)

@@ -0,0 +1,87 @@
---
id: T04
parent: S03
milestone: M001
provides: []
requires: []
affects: []
key_files: ["backend/routers/pipeline.py", "backend/routers/ingest.py", "backend/main.py"]
key_decisions: ["Pipeline dispatch in ingest is best-effort: wrapped in try/except, logs WARNING on failure, ingest response still succeeds", "Manual trigger returns 503 (not swallowed) when Celery/Redis is down — deliberate distinction from ingest which silently swallows dispatch failures", "Pipeline router import of run_pipeline is inside the handler to avoid circular imports at module level"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All 3 task-level verification commands pass: pipeline router has /pipeline/trigger/{video_id} route, 'pipeline' found in main.py, 'run_pipeline' found in ingest.py. All 5 slice-level verification commands also pass: Settings defaults print correctly, pipeline schemas import, LLMClient imports, celery_app.main prints 'chrysopedia', and openai/qdrant-client are in requirements.txt."
completed_at: 2026-03-29T22:41:00.020Z
blocker_discovered: false
---

# T04: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing

> Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing

## What Happened

---
id: T04
parent: S03
milestone: M001
key_files:
  - backend/routers/pipeline.py
  - backend/routers/ingest.py
  - backend/main.py
key_decisions:
  - "Pipeline dispatch in ingest is best-effort: wrapped in try/except, logs WARNING on failure, ingest response still succeeds"
  - Manual trigger returns 503 (not swallowed) when Celery/Redis is down — deliberate distinction from ingest which silently swallows dispatch failures
  - Pipeline router import of run_pipeline is inside the handler to avoid circular imports at module level
duration: ""
verification_result: passed
completed_at: 2026-03-29T22:41:00.020Z
blocker_discovered: false
---

# T04: Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing

**Wired automatic run_pipeline.delay() dispatch after ingest commit and added POST /api/v1/pipeline/trigger/{video_id} for manual re-processing**

## What Happened

Added pipeline dispatch to two entry points: (1) the ingest endpoint now calls run_pipeline.delay() after db.commit(), wrapped in try/except so dispatch failures never fail the ingest response; (2) a new pipeline router with POST /trigger/{video_id} that looks up the video, returns 404 if missing, dispatches run_pipeline.delay(), and returns the current processing status (returns 503 on dispatch failure since it's an explicit user action). Mounted the pipeline router in main.py under /api/v1.

## Verification

All 3 task-level verification commands pass: pipeline router has /pipeline/trigger/{video_id} route, 'pipeline' found in main.py, 'run_pipeline' found in ingest.py. All 5 slice-level verification commands also pass: Settings defaults print correctly, pipeline schemas import, LLMClient imports, celery_app.main prints 'chrysopedia', and openai/qdrant-client are in requirements.txt.

## Verification Evidence

| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])"` | 0 | ✅ pass | 500ms |
| 2 | `grep -q 'pipeline' backend/main.py` | 0 | ✅ pass | 50ms |
| 3 | `grep -q 'run_pipeline' backend/routers/ingest.py` | 0 | ✅ pass | 50ms |
| 4 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 500ms |
| 5 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 500ms |
| 6 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 500ms |
| 7 | `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` | 0 | ✅ pass | 500ms |
| 8 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms |
## Deviations

None.

## Known Issues

None.

## Files Created/Modified

- `backend/routers/pipeline.py`
- `backend/routers/ingest.py`
- `backend/main.py`

## Deviations

None.

## Known Issues

None.

backend/main.py

@@ -12,7 +12,7 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
 from config import get_settings
-from routers import creators, health, ingest, videos
+from routers import creators, health, ingest, pipeline, videos
 
 
 def _setup_logging() -> None:
@@ -80,6 +80,7 @@ app.include_router(health.router)
 # Versioned API
 app.include_router(creators.router, prefix="/api/v1")
 app.include_router(ingest.router, prefix="/api/v1")
+app.include_router(pipeline.router, prefix="/api/v1")
 app.include_router(videos.router, prefix="/api/v1")
 
backend/routers/ingest.py

@@ -174,6 +174,19 @@ async def ingest_transcript(
     await db.refresh(video)
     await db.refresh(creator)
 
+    # ── 7. Dispatch LLM pipeline (best-effort) ──────────────────────────
+    try:
+        from pipeline.stages import run_pipeline
+
+        run_pipeline.delay(str(video.id))
+        logger.info("Pipeline dispatched for video_id=%s", video.id)
+    except Exception as exc:
+        logger.warning(
+            "Pipeline dispatch failed for video_id=%s (ingest still succeeds): %s",
+            video.id,
+            exc,
+        )
+
     logger.info(
         "Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s",
         creator.name,
backend/routers/pipeline.py (new file, 54 lines)

@@ -0,0 +1,54 @@
"""Pipeline management endpoints for manual re-trigger and status inspection."""

import logging

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_session
from models import SourceVideo

logger = logging.getLogger("chrysopedia.pipeline")

router = APIRouter(prefix="/pipeline", tags=["pipeline"])


@router.post("/trigger/{video_id}")
async def trigger_pipeline(
    video_id: str,
    db: AsyncSession = Depends(get_session),
):
    """Manually trigger (or re-trigger) the LLM extraction pipeline for a video.

    Looks up the SourceVideo by ID, dispatches ``run_pipeline.delay()``,
    and returns the current processing status. Returns 404 if the video
    does not exist.
    """
    stmt = select(SourceVideo).where(SourceVideo.id == video_id)
    result = await db.execute(stmt)
    video = result.scalar_one_or_none()

    if video is None:
        raise HTTPException(status_code=404, detail=f"Video not found: {video_id}")

    # Import inside handler to avoid circular import at module level
    from pipeline.stages import run_pipeline

    try:
        run_pipeline.delay(str(video.id))
        logger.info("Pipeline manually triggered for video_id=%s", video_id)
    except Exception as exc:
        logger.warning(
            "Failed to dispatch pipeline for video_id=%s: %s", video_id, exc
        )
        raise HTTPException(
            status_code=503,
            detail="Pipeline dispatch failed — Celery/Redis may be unavailable",
        ) from exc

    return {
        "status": "triggered",
        "video_id": str(video.id),
        "current_processing_status": video.processing_status.value,
    }