feat: Created POST /api/v1/ingest endpoint that accepts Whisper transcr…
- "backend/routers/ingest.py" - "backend/schemas.py" - "backend/requirements.txt" - "backend/main.py" GSD-Task: S02/T01
This commit is contained in:
parent
9bddaed5d1
commit
88170a41f6
4 changed files with 209 additions and 1 deletions
|
|
@ -12,7 +12,7 @@ from fastapi import FastAPI
|
|||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from config import get_settings
|
||||
from routers import creators, health, videos
|
||||
from routers import creators, health, ingest, videos
|
||||
|
||||
|
||||
def _setup_logging() -> None:
|
||||
|
|
@ -79,6 +79,7 @@ app.include_router(health.router)
|
|||
|
||||
# Versioned API
|
||||
app.include_router(creators.router, prefix="/api/v1")
|
||||
app.include_router(ingest.router, prefix="/api/v1")
|
||||
app.include_router(videos.router, prefix="/api/v1")
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -8,4 +8,5 @@ pydantic-settings>=2.0,<3.0
|
|||
celery[redis]>=5.4.0,<6.0
|
||||
redis>=5.0,<6.0
|
||||
python-dotenv>=1.0,<2.0
|
||||
python-multipart>=0.0.9,<1.0
|
||||
httpx>=0.27.0,<1.0
|
||||
|
|
|
|||
193
backend/routers/ingest.py
Normal file
193
backend/routers/ingest.py
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
"""Transcript ingestion endpoint for the Chrysopedia API.
|
||||
|
||||
Accepts a Whisper-format transcript JSON via multipart file upload, finds or
|
||||
creates a Creator, upserts a SourceVideo, bulk-inserts TranscriptSegments,
|
||||
persists the raw JSON to disk, and returns a structured response.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, UploadFile
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from config import get_settings
|
||||
from database import get_session
|
||||
from models import ContentType, Creator, ProcessingStatus, SourceVideo, TranscriptSegment
|
||||
from schemas import TranscriptIngestResponse
|
||||
|
||||
logger = logging.getLogger("chrysopedia.ingest")
|
||||
|
||||
router = APIRouter(prefix="/ingest", tags=["ingest"])
|
||||
|
||||
REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}
|
||||
|
||||
|
||||
def slugify(value: str) -> str:
|
||||
"""Lowercase, replace non-alphanumeric chars with hyphens, collapse/strip."""
|
||||
value = value.lower()
|
||||
value = re.sub(r"[^a-z0-9]+", "-", value)
|
||||
value = value.strip("-")
|
||||
value = re.sub(r"-{2,}", "-", value)
|
||||
return value
|
||||
|
||||
|
||||
@router.post("", response_model=TranscriptIngestResponse)
|
||||
async def ingest_transcript(
|
||||
file: UploadFile,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
) -> TranscriptIngestResponse:
|
||||
"""Ingest a Whisper transcript JSON file.
|
||||
|
||||
Workflow:
|
||||
1. Parse and validate the uploaded JSON.
|
||||
2. Find-or-create a Creator by folder_name.
|
||||
3. Upsert a SourceVideo by (creator_id, filename).
|
||||
4. Bulk-insert TranscriptSegment rows.
|
||||
5. Save raw JSON to transcript_storage_path.
|
||||
6. Return structured response.
|
||||
"""
|
||||
settings = get_settings()
|
||||
|
||||
# ── 1. Read & parse JSON ─────────────────────────────────────────────
|
||||
try:
|
||||
raw_bytes = await file.read()
|
||||
raw_text = raw_bytes.decode("utf-8")
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid file: {exc}") from exc
|
||||
|
||||
try:
|
||||
data = json.loads(raw_text)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise HTTPException(
|
||||
status_code=422, detail=f"JSON parse error: {exc}"
|
||||
) from exc
|
||||
|
||||
if not isinstance(data, dict):
|
||||
raise HTTPException(status_code=422, detail="Expected a JSON object at the top level")
|
||||
|
||||
missing = REQUIRED_KEYS - data.keys()
|
||||
if missing:
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail=f"Missing required keys: {', '.join(sorted(missing))}",
|
||||
)
|
||||
|
||||
source_file: str = data["source_file"]
|
||||
creator_folder: str = data["creator_folder"]
|
||||
duration_seconds: int | None = data.get("duration_seconds")
|
||||
segments_data: list = data["segments"]
|
||||
|
||||
if not isinstance(segments_data, list):
|
||||
raise HTTPException(status_code=422, detail="'segments' must be an array")
|
||||
|
||||
# ── 2. Find-or-create Creator ────────────────────────────────────────
|
||||
stmt = select(Creator).where(Creator.folder_name == creator_folder)
|
||||
result = await db.execute(stmt)
|
||||
creator = result.scalar_one_or_none()
|
||||
|
||||
if creator is None:
|
||||
creator = Creator(
|
||||
name=creator_folder,
|
||||
slug=slugify(creator_folder),
|
||||
folder_name=creator_folder,
|
||||
)
|
||||
db.add(creator)
|
||||
await db.flush() # assign id
|
||||
|
||||
# ── 3. Upsert SourceVideo ────────────────────────────────────────────
|
||||
stmt = select(SourceVideo).where(
|
||||
SourceVideo.creator_id == creator.id,
|
||||
SourceVideo.filename == source_file,
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
existing_video = result.scalar_one_or_none()
|
||||
|
||||
is_reupload = existing_video is not None
|
||||
|
||||
if is_reupload:
|
||||
video = existing_video
|
||||
# Delete old segments for idempotent re-upload
|
||||
await db.execute(
|
||||
delete(TranscriptSegment).where(
|
||||
TranscriptSegment.source_video_id == video.id
|
||||
)
|
||||
)
|
||||
video.duration_seconds = duration_seconds
|
||||
video.processing_status = ProcessingStatus.transcribed
|
||||
else:
|
||||
video = SourceVideo(
|
||||
creator_id=creator.id,
|
||||
filename=source_file,
|
||||
file_path=f"{creator_folder}/{source_file}",
|
||||
duration_seconds=duration_seconds,
|
||||
content_type=ContentType.tutorial,
|
||||
processing_status=ProcessingStatus.transcribed,
|
||||
)
|
||||
db.add(video)
|
||||
await db.flush() # assign id
|
||||
|
||||
# ── 4. Bulk-insert TranscriptSegments ────────────────────────────────
|
||||
segment_objs = [
|
||||
TranscriptSegment(
|
||||
source_video_id=video.id,
|
||||
start_time=float(seg["start"]),
|
||||
end_time=float(seg["end"]),
|
||||
text=str(seg["text"]),
|
||||
segment_index=idx,
|
||||
)
|
||||
for idx, seg in enumerate(segments_data)
|
||||
]
|
||||
db.add_all(segment_objs)
|
||||
|
||||
# ── 5. Save raw JSON to disk ─────────────────────────────────────────
|
||||
transcript_dir = os.path.join(
|
||||
settings.transcript_storage_path, creator_folder
|
||||
)
|
||||
transcript_path = os.path.join(transcript_dir, f"{source_file}.json")
|
||||
|
||||
try:
|
||||
os.makedirs(transcript_dir, exist_ok=True)
|
||||
with open(transcript_path, "w", encoding="utf-8") as f:
|
||||
f.write(raw_text)
|
||||
except OSError as exc:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to save transcript: {exc}"
|
||||
) from exc
|
||||
|
||||
video.transcript_path = transcript_path
|
||||
|
||||
# ── 6. Commit & respond ──────────────────────────────────────────────
|
||||
try:
|
||||
await db.commit()
|
||||
except Exception as exc:
|
||||
await db.rollback()
|
||||
logger.error("Database commit failed during ingest: %s", exc)
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Database error during ingest"
|
||||
) from exc
|
||||
|
||||
await db.refresh(video)
|
||||
await db.refresh(creator)
|
||||
|
||||
logger.info(
|
||||
"Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s",
|
||||
creator.name,
|
||||
source_file,
|
||||
len(segment_objs),
|
||||
is_reupload,
|
||||
)
|
||||
|
||||
return TranscriptIngestResponse(
|
||||
video_id=video.id,
|
||||
creator_id=creator.id,
|
||||
creator_name=creator.name,
|
||||
filename=source_file,
|
||||
segments_stored=len(segment_objs),
|
||||
processing_status=video.processing_status.value,
|
||||
is_reupload=is_reupload,
|
||||
)
|
||||
|
|
@ -173,6 +173,19 @@ class TagRead(TagBase):
|
|||
id: uuid.UUID
|
||||
|
||||
|
||||
# ── Transcript Ingestion ─────────────────────────────────────────────────────
|
||||
|
||||
class TranscriptIngestResponse(BaseModel):
|
||||
"""Response returned after successfully ingesting a transcript."""
|
||||
video_id: uuid.UUID
|
||||
creator_id: uuid.UUID
|
||||
creator_name: str
|
||||
filename: str
|
||||
segments_stored: int
|
||||
processing_status: str
|
||||
is_reupload: bool
|
||||
|
||||
|
||||
# ── Pagination wrapper ───────────────────────────────────────────────────────
|
||||
|
||||
class PaginatedResponse(BaseModel):
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue