diff --git a/backend/main.py b/backend/main.py
index 84c22b3..53abc33 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -12,7 +12,7 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
 from config import get_settings
-from routers import creators, health, videos
+from routers import creators, health, ingest, videos
 
 
 def _setup_logging() -> None:
@@ -79,6 +79,7 @@ app.include_router(health.router)
 
 # Versioned API
 app.include_router(creators.router, prefix="/api/v1")
+app.include_router(ingest.router, prefix="/api/v1")
 app.include_router(videos.router, prefix="/api/v1")
 
 
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 95a393b..5f1315a 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -8,4 +8,5 @@ pydantic-settings>=2.0,<3.0
 celery[redis]>=5.4.0,<6.0
 redis>=5.0,<6.0
 python-dotenv>=1.0,<2.0
+python-multipart>=0.0.9,<1.0
 httpx>=0.27.0,<1.0
diff --git a/backend/routers/ingest.py b/backend/routers/ingest.py
new file mode 100644
index 0000000..a08ce7a
--- /dev/null
+++ b/backend/routers/ingest.py
@@ -0,0 +1,281 @@
+"""Transcript ingestion endpoint for the Chrysopedia API.
+
+Accepts a Whisper-format transcript JSON via multipart file upload, finds or
+creates a Creator, upserts a SourceVideo, bulk-inserts TranscriptSegments,
+persists the raw JSON to disk, and returns a structured response.
+"""
+
+import json
+import logging
+import os
+import re
+import uuid
+
+from fastapi import APIRouter, Depends, HTTPException, UploadFile
+from sqlalchemy import delete, select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from config import get_settings
+from database import get_session
+from models import ContentType, Creator, ProcessingStatus, SourceVideo, TranscriptSegment
+from schemas import TranscriptIngestResponse
+
+logger = logging.getLogger("chrysopedia.ingest")
+
+router = APIRouter(prefix="/ingest", tags=["ingest"])
+
+REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}
+
+
+def slugify(value: str) -> str:
+    """Lowercase, replace non-alphanumeric chars with hyphens, collapse/strip."""
+    value = value.lower()
+    value = re.sub(r"[^a-z0-9]+", "-", value)
+    value = value.strip("-")
+    value = re.sub(r"-{2,}", "-", value)
+    return value
+
+
+def _require_text(data: dict, key: str) -> str:
+    """Return ``data[key]`` if it is a non-blank string, else raise HTTP 422.
+
+    The value is stored in the database and used to build a filesystem path,
+    so blank strings, non-strings, and NUL bytes are rejected up front.
+    """
+    value = data[key]
+    if not isinstance(value, str) or not value.strip() or "\x00" in value:
+        raise HTTPException(
+            status_code=422, detail=f"'{key}' must be a non-empty string"
+        )
+    return value
+
+
+def _parse_segments(segments_data: list) -> list[tuple[float, float, str]]:
+    """Convert raw segment objects into ``(start, end, text)`` tuples.
+
+    Raises HTTP 422 naming the first segment that is not an object or that
+    lacks usable ``start``/``end``/``text`` values.
+    """
+    parsed: list[tuple[float, float, str]] = []
+    for idx, seg in enumerate(segments_data):
+        try:
+            parsed.append(
+                (float(seg["start"]), float(seg["end"]), str(seg["text"]))
+            )
+        except (KeyError, TypeError, ValueError) as exc:
+            raise HTTPException(
+                status_code=422,
+                detail=(
+                    f"Invalid segment at index {idx}: expected an object with "
+                    "numeric 'start'/'end' and a 'text' field"
+                ),
+            ) from exc
+    return parsed
+
+
+def _resolve_transcript_path(
+    storage_root: str, creator_folder: str, source_file: str
+) -> str:
+    """Return the transcript file path, refusing targets outside the storage root.
+
+    ``creator_folder`` and ``source_file`` come from the uploaded JSON, so an
+    absolute path or ``..`` component could otherwise redirect the write to an
+    arbitrary location. Raises HTTP 422 when the resolved path escapes
+    ``storage_root``.
+    """
+    transcript_path = os.path.join(
+        storage_root, creator_folder, f"{source_file}.json"
+    )
+    root_real = os.path.realpath(storage_root)
+    try:
+        common = os.path.commonpath(
+            [root_real, os.path.realpath(transcript_path)]
+        )
+    except ValueError:
+        # commonpath() rejects path sets with no shared anchor (e.g. drives).
+        common = ""
+    if common != root_real:
+        raise HTTPException(
+            status_code=422,
+            detail=(
+                "'creator_folder' and 'source_file' must resolve to a path "
+                "inside the transcript storage directory"
+            ),
+        )
+    return transcript_path
+
+
+@router.post("", response_model=TranscriptIngestResponse)
+async def ingest_transcript(
+    file: UploadFile,
+    db: AsyncSession = Depends(get_session),
+) -> TranscriptIngestResponse:
+    """Ingest a Whisper transcript JSON file.
+
+    Workflow:
+    1. Parse and validate the uploaded JSON.
+    2. Find-or-create a Creator by folder_name.
+    3. Upsert a SourceVideo by (creator_id, filename).
+    4. Bulk-insert TranscriptSegment rows.
+    5. Save raw JSON to transcript_storage_path.
+    6. Return structured response.
+
+    All payload validation, including the on-disk target path, happens in
+    step 1 so that malformed uploads are answered with a 4xx response before
+    any database row is touched.
+
+    Raises:
+        HTTPException: 400 if the upload cannot be read or decoded as UTF-8;
+            422 if the JSON is malformed or fails validation; 500 if the
+            transcript cannot be written to disk or the commit fails.
+    """
+    settings = get_settings()
+
+    # ── 1. Read & parse JSON ─────────────────────────────────────────────
+    try:
+        raw_bytes = await file.read()
+        raw_text = raw_bytes.decode("utf-8")
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=f"Invalid file: {exc}") from exc
+
+    try:
+        data = json.loads(raw_text)
+    except json.JSONDecodeError as exc:
+        raise HTTPException(
+            status_code=422, detail=f"JSON parse error: {exc}"
+        ) from exc
+
+    if not isinstance(data, dict):
+        raise HTTPException(status_code=422, detail="Expected a JSON object at the top level")
+
+    missing = REQUIRED_KEYS - data.keys()
+    if missing:
+        raise HTTPException(
+            status_code=422,
+            detail=f"Missing required keys: {', '.join(sorted(missing))}",
+        )
+
+    source_file = _require_text(data, "source_file")
+    creator_folder = _require_text(data, "creator_folder")
+
+    duration_seconds: int | float | None = data.get("duration_seconds")
+    if duration_seconds is not None and (
+        isinstance(duration_seconds, bool)
+        or not isinstance(duration_seconds, (int, float))
+    ):
+        raise HTTPException(
+            status_code=422, detail="'duration_seconds' must be a number or null"
+        )
+
+    segments_data = data["segments"]
+    if not isinstance(segments_data, list):
+        raise HTTPException(status_code=422, detail="'segments' must be an array")
+    segments = _parse_segments(segments_data)
+
+    # Resolve the target path now so a traversal attempt never reaches the DB.
+    transcript_path = _resolve_transcript_path(
+        settings.transcript_storage_path, creator_folder, source_file
+    )
+
+    # ── 2. Find-or-create Creator ────────────────────────────────────────
+    stmt = select(Creator).where(Creator.folder_name == creator_folder)
+    result = await db.execute(stmt)
+    creator = result.scalar_one_or_none()
+
+    if creator is None:
+        creator = Creator(
+            name=creator_folder,
+            slug=slugify(creator_folder),
+            folder_name=creator_folder,
+        )
+        db.add(creator)
+        await db.flush()  # assign id
+
+    # ── 3. Upsert SourceVideo ────────────────────────────────────────────
+    stmt = select(SourceVideo).where(
+        SourceVideo.creator_id == creator.id,
+        SourceVideo.filename == source_file,
+    )
+    result = await db.execute(stmt)
+    existing_video = result.scalar_one_or_none()
+
+    is_reupload = existing_video is not None
+
+    if is_reupload:
+        video = existing_video
+        # Delete old segments for idempotent re-upload
+        await db.execute(
+            delete(TranscriptSegment).where(
+                TranscriptSegment.source_video_id == video.id
+            )
+        )
+        video.duration_seconds = duration_seconds
+        video.processing_status = ProcessingStatus.transcribed
+    else:
+        video = SourceVideo(
+            creator_id=creator.id,
+            filename=source_file,
+            file_path=f"{creator_folder}/{source_file}",
+            duration_seconds=duration_seconds,
+            content_type=ContentType.tutorial,
+            processing_status=ProcessingStatus.transcribed,
+        )
+        db.add(video)
+        await db.flush()  # assign id
+
+    # ── 4. Bulk-insert TranscriptSegments ────────────────────────────────
+    segment_objs = [
+        TranscriptSegment(
+            source_video_id=video.id,
+            start_time=start,
+            end_time=end,
+            text=text,
+            segment_index=idx,
+        )
+        for idx, (start, end, text) in enumerate(segments)
+    ]
+    db.add_all(segment_objs)
+
+    # ── 5. Save raw JSON to disk ─────────────────────────────────────────
+    try:
+        os.makedirs(os.path.dirname(transcript_path), exist_ok=True)
+        with open(transcript_path, "w", encoding="utf-8") as f:
+            f.write(raw_text)
+    except OSError as exc:
+        await db.rollback()  # discard the rows staged above
+        raise HTTPException(
+            status_code=500, detail=f"Failed to save transcript: {exc}"
+        ) from exc
+
+    video.transcript_path = transcript_path
+
+    # ── 6. Commit & respond ──────────────────────────────────────────────
+    try:
+        await db.commit()
+    except Exception as exc:
+        await db.rollback()
+        logger.exception("Database commit failed during ingest")
+        raise HTTPException(
+            status_code=500, detail="Database error during ingest"
+        ) from exc
+
+    await db.refresh(video)
+    await db.refresh(creator)
+
+    logger.info(
+        "Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s",
+        creator.name,
+        source_file,
+        len(segment_objs),
+        is_reupload,
+    )
+
+    return TranscriptIngestResponse(
+        video_id=video.id,
+        creator_id=creator.id,
+        creator_name=creator.name,
+        filename=source_file,
+        segments_stored=len(segment_objs),
+        processing_status=video.processing_status.value,
+        is_reupload=is_reupload,
+    )
diff --git a/backend/schemas.py b/backend/schemas.py
index 2422d70..ad8092f 100644
--- a/backend/schemas.py
+++ b/backend/schemas.py
@@ -173,6 +173,19 @@ class TagRead(TagBase):
     id: uuid.UUID
 
 
+# ── Transcript Ingestion ─────────────────────────────────────────────────────
+
+class TranscriptIngestResponse(BaseModel):
+    """Response returned after successfully ingesting a transcript."""
+    video_id: uuid.UUID
+    creator_id: uuid.UUID
+    creator_name: str
+    filename: str
+    segments_stored: int
+    processing_status: str
+    is_reupload: bool
+
+
 # ── Pagination wrapper ───────────────────────────────────────────────────────
 
 class PaginatedResponse(BaseModel):