chrysopedia/backend/routers/ingest.py
jlightner 910e945d9c feat: Wired automatic run_pipeline.delay() dispatch after ingest commit…
- "backend/routers/pipeline.py"
- "backend/routers/ingest.py"
- "backend/main.py"

GSD-Task: S03/T04
2026-03-29 22:41:02 +00:00

206 lines
7.2 KiB
Python

"""Transcript ingestion endpoint for the Chrysopedia API.
Accepts a Whisper-format transcript JSON via multipart file upload, finds or
creates a Creator, upserts a SourceVideo, bulk-inserts TranscriptSegments,
persists the raw JSON to disk, and returns a structured response.
"""
import json
import logging
import os
import re
import uuid
from fastapi import APIRouter, Depends, HTTPException, UploadFile
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings
from database import get_session
from models import ContentType, Creator, ProcessingStatus, SourceVideo, TranscriptSegment
from schemas import TranscriptIngestResponse
logger = logging.getLogger("chrysopedia.ingest")
router = APIRouter(prefix="/ingest", tags=["ingest"])
REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}
def slugify(value: str) -> str:
    """Return a URL-safe slug for *value*.

    Lowercases the input, converts every run of non-alphanumeric characters
    to a single hyphen, trims leading/trailing hyphens, and collapses any
    remaining hyphen runs.
    """
    hyphenated = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
    return re.sub(r"-{2,}", "-", hyphenated)
def _require_safe_component(value: object, field: str) -> str:
    """Validate that *value* is a plain, single path component.

    The ingest payload is untrusted: ``creator_folder`` and ``source_file``
    are later joined into filesystem paths, so a value containing path
    separators or ``..`` would allow writing outside the transcript storage
    root (path traversal). Raises HTTPException 422 on any unsafe value.
    """
    if not isinstance(value, str) or not value:
        raise HTTPException(
            status_code=422, detail=f"'{field}' must be a non-empty string"
        )
    if "/" in value or "\\" in value or value in (".", ".."):
        raise HTTPException(
            status_code=422,
            detail=f"'{field}' must be a plain name without path separators",
        )
    return value


@router.post("", response_model=TranscriptIngestResponse)
async def ingest_transcript(
    file: UploadFile,
    db: AsyncSession = Depends(get_session),
) -> TranscriptIngestResponse:
    """Ingest a Whisper transcript JSON file.

    Workflow:
    1. Parse and validate the uploaded JSON (422 on any malformed input,
       including unsafe path components and bad segment entries).
    2. Find-or-create a Creator by folder_name.
    3. Upsert a SourceVideo by (creator_id, filename); on re-upload, old
       segments are deleted so the operation is idempotent.
    4. Bulk-insert TranscriptSegment rows.
    5. Save raw JSON to transcript_storage_path.
    6. Commit, then best-effort dispatch the LLM pipeline.

    Raises:
        HTTPException 400: unreadable upload (decode/read failure).
        HTTPException 422: invalid JSON, missing keys, unsafe names,
            or malformed segment entries.
        HTTPException 500: transcript file write or DB commit failure.
    """
    settings = get_settings()
    # ── 1. Read & parse JSON ─────────────────────────────────────────────
    try:
        raw_bytes = await file.read()
        raw_text = raw_bytes.decode("utf-8")
    except Exception as exc:
        # Broad on purpose: any read/decode failure of the upload is a
        # client error, not a server fault.
        raise HTTPException(status_code=400, detail=f"Invalid file: {exc}") from exc
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise HTTPException(
            status_code=422, detail=f"JSON parse error: {exc}"
        ) from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=422, detail="Expected a JSON object at the top level")
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"Missing required keys: {', '.join(sorted(missing))}",
        )
    # Reject path-traversal attempts before these values reach the DB or
    # the filesystem join below.
    source_file = _require_safe_component(data["source_file"], "source_file")
    creator_folder = _require_safe_component(data["creator_folder"], "creator_folder")
    duration_seconds: int | None = data.get("duration_seconds")
    segments_data = data["segments"]
    if not isinstance(segments_data, list):
        raise HTTPException(status_code=422, detail="'segments' must be an array")
    # Validate segment shape up front so malformed entries yield a 422
    # *before* any DB mutation, instead of a KeyError-driven 500 mid-write.
    parsed_segments: list[tuple[float, float, str]] = []
    for idx, seg in enumerate(segments_data):
        try:
            parsed_segments.append(
                (float(seg["start"]), float(seg["end"]), str(seg["text"]))
            )
        except (KeyError, TypeError, ValueError) as exc:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid segment at index {idx}: {exc}",
            ) from exc
    # ── 2. Find-or-create Creator ────────────────────────────────────────
    stmt = select(Creator).where(Creator.folder_name == creator_folder)
    result = await db.execute(stmt)
    creator = result.scalar_one_or_none()
    if creator is None:
        creator = Creator(
            name=creator_folder,
            slug=slugify(creator_folder),
            folder_name=creator_folder,
        )
        db.add(creator)
        await db.flush()  # assign id
    # ── 3. Upsert SourceVideo ────────────────────────────────────────────
    stmt = select(SourceVideo).where(
        SourceVideo.creator_id == creator.id,
        SourceVideo.filename == source_file,
    )
    result = await db.execute(stmt)
    existing_video = result.scalar_one_or_none()
    is_reupload = existing_video is not None
    if is_reupload:
        video = existing_video
        # Delete old segments for idempotent re-upload
        await db.execute(
            delete(TranscriptSegment).where(
                TranscriptSegment.source_video_id == video.id
            )
        )
        video.duration_seconds = duration_seconds
        video.processing_status = ProcessingStatus.transcribed
    else:
        video = SourceVideo(
            creator_id=creator.id,
            filename=source_file,
            file_path=f"{creator_folder}/{source_file}",
            duration_seconds=duration_seconds,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.transcribed,
        )
        db.add(video)
        await db.flush()  # assign id
    # ── 4. Bulk-insert TranscriptSegments ────────────────────────────────
    segment_objs = [
        TranscriptSegment(
            source_video_id=video.id,
            start_time=start,
            end_time=end,
            text=text,
            segment_index=idx,
        )
        for idx, (start, end, text) in enumerate(parsed_segments)
    ]
    db.add_all(segment_objs)
    # ── 5. Save raw JSON to disk ─────────────────────────────────────────
    # Safe to join: both components were validated as plain names above.
    transcript_dir = os.path.join(
        settings.transcript_storage_path, creator_folder
    )
    transcript_path = os.path.join(transcript_dir, f"{source_file}.json")
    try:
        os.makedirs(transcript_dir, exist_ok=True)
        with open(transcript_path, "w", encoding="utf-8") as f:
            f.write(raw_text)
    except OSError as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to save transcript: {exc}"
        ) from exc
    video.transcript_path = transcript_path
    # ── 6. Commit & respond ──────────────────────────────────────────────
    try:
        await db.commit()
    except Exception as exc:
        await db.rollback()
        logger.error("Database commit failed during ingest: %s", exc)
        raise HTTPException(
            status_code=500, detail="Database error during ingest"
        ) from exc
    await db.refresh(video)
    await db.refresh(creator)
    # ── 7. Dispatch LLM pipeline (best-effort) ──────────────────────────
    # Deliberately swallow dispatch errors: ingest already committed, so a
    # broker outage must not turn a successful upload into a 500.
    try:
        from pipeline.stages import run_pipeline
        run_pipeline.delay(str(video.id))
        logger.info("Pipeline dispatched for video_id=%s", video.id)
    except Exception as exc:
        logger.warning(
            "Pipeline dispatch failed for video_id=%s (ingest still succeeds): %s",
            video.id,
            exc,
        )
    logger.info(
        "Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s",
        creator.name,
        source_file,
        len(segment_objs),
        is_reupload,
    )
    return TranscriptIngestResponse(
        video_id=video.id,
        creator_id=creator.id,
        creator_name=creator.name,
        filename=source_file,
        segments_stored=len(segment_objs),
        processing_status=video.processing_status.value,
        is_reupload=is_reupload,
    )