feat: Created POST /api/v1/ingest endpoint that accepts Whisper transcr…

- "backend/routers/ingest.py"
- "backend/schemas.py"
- "backend/requirements.txt"
- "backend/main.py"

GSD-Task: S02/T01
jlightner 2026-03-29 22:09:46 +00:00
parent b3a05b8218
commit 5bfeb50716
13 changed files with 1010 additions and 3 deletions


@@ -6,7 +6,7 @@ Stand up the complete Chrysopedia stack: Docker Compose deployment on ub01, Post
## Slice Overview
| ID | Slice | Risk | Depends | Done | After this |
|----|-------|------|---------|------|------------|
| S01 | Docker Compose + Database + Whisper Script | low | — | | docker compose up -d starts all services on ub01; Whisper script transcribes a sample video to JSON |
| S02 | Transcript Ingestion API | low | S01 | ⬜ | POST a transcript JSON file to the API; Creator and Source Video records appear in PostgreSQL |
| S03 | LLM Extraction Pipeline + Qdrant Integration | high | S02 | ⬜ | A transcript JSON triggers stages 2-5: segmentation → extraction → classification → synthesis. Technique pages with key moments appear in DB. Qdrant has searchable embeddings. |
| S04 | Review Queue Admin UI | medium | S03 | ⬜ | Admin views pending key moments, approves/edits/rejects them, toggles between review and auto mode |


@@ -0,0 +1,168 @@
---
id: S01
parent: M001
milestone: M001
provides:
- Docker Compose project definition (5 services) for deployment
- PostgreSQL schema with 7 tables via Alembic migration
- FastAPI app with health check and CRUD endpoints pattern
- Pydantic schemas for all 7 entities (reusable in S02+)
- SQLAlchemy async session infrastructure
- Sample transcript JSON fixture for S02 ingestion testing
- Canonical tags configuration (6 categories, 13 genres)
requires:
[]
affects:
- S02
- S03
key_files:
- docker-compose.yml
- .env.example
- docker/Dockerfile.api
- docker/Dockerfile.web
- backend/main.py
- backend/models.py
- backend/database.py
- backend/schemas.py
- backend/config.py
- backend/routers/health.py
- backend/routers/creators.py
- backend/routers/videos.py
- alembic.ini
- alembic/env.py
- alembic/versions/001_initial.py
- whisper/transcribe.py
- whisper/requirements.txt
- config/canonical_tags.yaml
- README.md
- tests/fixtures/sample_transcript.json
key_decisions:
- D001: XPLTD Docker conventions — xpltd_chrysopedia project, bind mounts at /vmPool/r/services/, network 172.24.0.0/24
- env_file uses required: false so docker compose config validates on fresh clones
- POSTGRES_PASSWORD uses :-changeme default instead of :? to avoid config failures
- PostgreSQL exposed on host port 5433 to avoid conflicts with other projects
- SQLAlchemy relationship import aliased to sa_relationship to avoid column name clash
- Separate PostgreSQL enum type names to avoid collisions (key_moment_content_type vs content_type)
- Health check at /health performs real DB SELECT 1; lightweight /api/v1/health also available
- Whisper import deferred so --help works without openai-whisper installed
- Sample transcript uses realistic music production content for downstream pipeline testing
patterns_established:
- Docker Compose service naming: chrysopedia-{role} (chrysopedia-db, chrysopedia-api, etc.)
- Backend router pattern: backend/routers/{domain}.py with prefix-per-router mounted under /api/v1
- SQLAlchemy async pattern: asyncpg engine + async_sessionmaker + get_session dependency
- Pydantic v2 schema pattern: Base/Create/Read variants per entity with model_config from_attributes=True
- Config via pydantic-settings BaseSettings loading from .env with sensible defaults
- Alembic async migration pattern with run_async_migrations() wrapper
- UUID primary keys with gen_random_uuid() server default for all entities
observability_surfaces:
- GET /health — returns database connectivity status (connected/error)
- Structured logging via Python logging module in FastAPI lifespan
drill_down_paths:
- .gsd/milestones/M001/slices/S01/tasks/T01-SUMMARY.md
- .gsd/milestones/M001/slices/S01/tasks/T02-SUMMARY.md
- .gsd/milestones/M001/slices/S01/tasks/T03-SUMMARY.md
- .gsd/milestones/M001/slices/S01/tasks/T04-SUMMARY.md
- .gsd/milestones/M001/slices/S01/tasks/T05-SUMMARY.md
duration: ""
verification_result: passed
completed_at: 2026-03-29T22:02:45.503Z
blocker_discovered: false
---
# S01: Docker Compose + Database + Whisper Script
**Delivered deployable Docker Compose infrastructure with PostgreSQL schema (7 tables), FastAPI skeleton API with CRUD endpoints, desktop Whisper transcription script, and sample transcript fixture.**
## What Happened
This slice established the complete foundation for the Chrysopedia stack across five tasks.
**T01 — Docker Compose scaffolding:** Created the xpltd_chrysopedia Docker Compose project following XPLTD conventions (D001): bind mounts at /vmPool/r/services/chrysopedia_*, dedicated bridge network on 172.24.0.0/24, five services (PostgreSQL 16, Redis 7, FastAPI API, Celery worker, React/nginx web). Dockerfiles for API and web, nginx.conf, .env.example with all required vars, and config/canonical_tags.yaml with 6 topic categories and 13 genres.
**T02 — Database schema:** Built SQLAlchemy async models for all 7 entities from the spec: Creator, SourceVideo, TranscriptSegment, KeyMoment, TechniquePage, RelatedTechniqueLink, Tag. UUID primary keys, CASCADE/SET NULL FK constraints, JSONB columns for body_sections/signal_chains, 7 custom PostgreSQL enum types. Alembic async migration infrastructure with initial migration 001_initial.py. Also fixed docker-compose.yml POSTGRES_PASSWORD from `:?` (hard fail) to `:-changeme` default.
**T03 — FastAPI API skeleton:** Rewrote backend/main.py with lifespan manager, CORS, structured logging. Created pydantic-settings config, Pydantic v2 schemas (Base/Create/Read) for all entities, three router modules: health (GET /health with real DB SELECT 1), creators (list with pagination + get by slug), videos (list with optional creator filter). All endpoints async with SQLAlchemy session dependency.
**T04 — Whisper transcription script:** Built whisper/transcribe.py with argparse CLI (--input, --output-dir, --model, --device, --creator), ffmpeg audio extraction to 16kHz mono WAV, Whisper transcription with word-level timestamps, spec-compliant JSON output, resumability (skip if output exists), batch mode for directories, progress logging. Deferred whisper import so --help works without the dependency installed.
**T05 — README and fixtures:** Comprehensive README.md with architecture diagram, setup instructions, env var docs, dev workflow, API endpoint reference. Sample transcript JSON (5 segments, 106 words) with realistic music production content for downstream pipeline testing.
## Verification
All slice-level verification checks passed:
1. `docker compose config` — exits 0, all 5 services validated with correct env interpolation, volumes, networks, healthchecks, and dependency ordering.
2. `python3 whisper/transcribe.py --help` — exits 0, shows full usage with all CLI args and examples.
3. `python3 -c "import json; ..."` on sample_transcript.json — valid JSON with 5 segments, all required keys (source_file, creator_folder, duration_seconds, segments with words).
4. All 7 SQLAlchemy models import successfully with correct entity definitions.
5. All Pydantic schemas and config import successfully.
6. All 3 router modules (health, creators, videos) import with correct route counts.
7. Alembic files (alembic.ini, env.py, 001_initial.py) all present.
8. config/canonical_tags.yaml loads with 6 topic categories.
9. README.md exists with all required sections.
## Requirements Advanced
- R001 — Desktop Whisper script built with all required features: ffmpeg extraction, Whisper large-v3, word-level timestamps, resumability, batch mode, spec-compliant JSON output
- R010 — Docker Compose project created with all 5 services following XPLTD conventions; docker compose config validates successfully
- R011 — Canonical tag system established via config/canonical_tags.yaml with 6 top-level categories and 13 genres; Tag model in database with aliases support
## Requirements Validated
None.
## New Requirements Surfaced
None.
## Requirements Invalidated or Re-scoped
None.
## Deviations
- env_file set to `required: false` to support fresh clones without .env present (T01).
- POSTGRES_PASSWORD changed from `:?` (hard fail when unset) to `:-changeme` default to fix docker compose config validation (T02, captured in KNOWLEDGE.md).
- Added docker/nginx.conf and frontend/package.json placeholders not in original plan but required for Dockerfile.web build context (T01).
- Added backend/routers/videos.py not in T03 expected output list but required by plan's endpoint list (T03).
- Whisper script uses subprocess directly for ffmpeg instead of ffmpeg-python library for reliability (T04).
- Added --creator CLI flag for overriding inferred creator folder name (T04).
## Known Limitations
- Docker Compose stack not tested end-to-end with `docker compose up -d` (requires deployment to ub01 with bind mount paths).
- API endpoints verified locally with test PostgreSQL container, not inside the Docker Compose network.
- Whisper script validated structurally (--help, ffmpeg check, AST parse) but not with actual video transcription (requires CUDA GPU + Whisper model).
- Host port 8000 conflicts with kerf-engine container — local testing uses port 8001 (documented in KNOWLEDGE.md).
## Follow-ups
None.
## Files Created/Modified
- `docker-compose.yml` — Docker Compose project with 5 services (PostgreSQL 16, Redis 7, FastAPI API, Celery worker, React/nginx web)
- `.env.example` — Template with all required environment variables and descriptions
- `docker/Dockerfile.api` — Multi-stage Dockerfile for FastAPI + Celery worker service
- `docker/Dockerfile.web` — Dockerfile for React app served via nginx
- `docker/nginx.conf` — Nginx config for serving React SPA with API proxy
- `backend/main.py` — FastAPI app with lifespan, CORS, structured logging, router mounting
- `backend/models.py` — SQLAlchemy async models for all 7 entities with enums, FKs, JSONB
- `backend/database.py` — Async engine, session factory, declarative base
- `backend/schemas.py` — Pydantic v2 schemas (Base/Create/Read) for all entities
- `backend/config.py` — pydantic-settings config loading from .env
- `backend/routers/health.py` — GET /health with DB connectivity check
- `backend/routers/creators.py` — GET /api/v1/creators (paginated), GET /api/v1/creators/{slug}
- `backend/routers/videos.py` — GET /api/v1/videos (paginated, optional creator filter)
- `backend/requirements.txt` — Python dependencies for FastAPI, SQLAlchemy, asyncpg, etc.
- `alembic.ini` — Alembic configuration pointing to async database URL
- `alembic/env.py` — Async Alembic migration runner
- `alembic/versions/001_initial.py` — Initial migration creating all 7 tables with constraints
- `alembic/script.py.mako` — Alembic migration template
- `whisper/transcribe.py` — Desktop Whisper transcription script with CLI, batch mode, resumability
- `whisper/requirements.txt` — Whisper script Python dependencies
- `whisper/README.md` — Whisper script usage documentation
- `config/canonical_tags.yaml` — 6 topic categories and 13 genres for tag classification
- `README.md` — Project README with architecture, setup, env vars, dev workflow
- `tests/fixtures/sample_transcript.json` — 5-segment sample transcript matching Whisper output format
- `frontend/package.json` — Placeholder React app package.json


@@ -0,0 +1,131 @@
# S01: Docker Compose + Database + Whisper Script — UAT
**Milestone:** M001
**Written:** 2026-03-29T22:02:45.503Z
## UAT: S01 — Docker Compose + Database + Whisper Script
### Preconditions
- Docker and Docker Compose v2 installed
- Python 3.10+ available
- Project cloned to local filesystem
- No .env file required (defaults used)
---
### TC-01: Docker Compose Configuration Validates
**Steps:**
1. Run `docker compose config > /dev/null 2>&1`
2. Check exit code
**Expected:** Exit code 0. All 5 services (chrysopedia-db, chrysopedia-redis, chrysopedia-api, chrysopedia-worker, chrysopedia-web) present in output.
---
### TC-02: Docker Compose Validates Without .env File
**Steps:**
1. Ensure no .env file exists in project root
2. Run `docker compose config > /dev/null 2>&1`
**Expected:** Exit code 0. env_file `required: false` allows validation without .env present. POSTGRES_PASSWORD falls back to default.
---
### TC-03: All 7 SQLAlchemy Models Load
**Steps:**
1. Run `python3 -c "import sys; sys.path.insert(0,'backend'); from models import Creator, SourceVideo, TranscriptSegment, KeyMoment, TechniquePage, RelatedTechniqueLink, Tag; print('OK')"`
**Expected:** Prints "OK" with exit code 0. All 7 entity models importable without errors.
---
### TC-04: Alembic Migration Files Present
**Steps:**
1. Verify `alembic.ini` exists
2. Verify `alembic/env.py` exists
3. Verify `alembic/versions/001_initial.py` exists
**Expected:** All three files present. Migration creates 7 tables matching the data model spec.
---
### TC-05: Pydantic Schemas Load for All Entities
**Steps:**
1. Run `python3 -c "import sys; sys.path.insert(0,'backend'); from schemas import CreatorRead, SourceVideoRead, TranscriptSegmentRead, KeyMomentRead, TechniquePageRead, TagRead, HealthResponse; print('OK')"`
**Expected:** Prints "OK" with exit code 0. All schemas importable.
---
### TC-06: FastAPI Routers Load with Correct Routes
**Steps:**
1. Run `python3 -c "import sys; sys.path.insert(0,'backend'); from routers.health import router as h; from routers.creators import router as c; from routers.videos import router as v; print(f'{len(h.routes)} {len(c.routes)} {len(v.routes)}')"`
**Expected:** Prints "1 2 1" — health has 1 route, creators has 2 (list + get-by-slug), videos has 1 (list).
---
### TC-07: Whisper Script Shows Help
**Steps:**
1. Run `python3 whisper/transcribe.py --help`
**Expected:** Exit code 0. Output shows usage with --input, --output-dir, --model, --device, --creator, -v flags. Includes examples section.
---
### TC-08: Whisper Script Validates ffmpeg Availability
**Steps:**
1. Run `python3 whisper/transcribe.py --input /tmp/nonexistent.mp4 --output-dir /tmp/out`
**Expected:** Exit code 1 with a clear error message (ffmpeg missing or input file not found); the script does not crash with an unhandled exception.
---
### TC-09: Sample Transcript JSON Is Valid
**Steps:**
1. Run `python3 -c "import json; d=json.load(open('tests/fixtures/sample_transcript.json')); assert 'source_file' in d; assert 'creator_folder' in d; assert 'duration_seconds' in d; assert len(d['segments'])==5; assert all('words' in s for s in d['segments']); print('PASS')"`
**Expected:** Prints "PASS". JSON has required top-level keys and 5 segments each containing words array with word-level timestamps.
---
### TC-10: Canonical Tags Configuration Valid
**Steps:**
1. Run `python3 -c "import yaml; d=yaml.safe_load(open('config/canonical_tags.yaml')); cats=d.get('categories',d.get('topic_categories',[])); assert len(cats)==6; print('PASS')"`
**Expected:** Prints "PASS". 6 top-level topic categories loaded from YAML.
---
### TC-11: README Covers Required Sections
**Steps:**
1. Verify README.md contains: project overview, architecture diagram, Docker Compose setup instructions, Whisper setup instructions, environment variable documentation, development workflow, API endpoint reference, project structure.
**Expected:** All 8 sections present. README provides sufficient information for a new developer to set up the project.
---
### TC-12: .env.example Documents All Variables
**Steps:**
1. Verify .env.example exists
2. Check it contains at minimum: POSTGRES_PASSWORD, DATABASE_URL, OPENAI_API_BASE, OPENAI_API_KEY, EMBEDDING_MODEL
**Expected:** All required environment variables documented with descriptions.
---
### Edge Cases
### TC-13: Models Handle Column Name Clash
**Steps:**
1. Verify RelatedTechniqueLink model has a `relationship` column (enum type) that doesn't conflict with SQLAlchemy's `relationship()` function.
**Expected:** Model imports without "MappedColumn is not callable" error. Uses `sa_relationship` alias pattern documented in KNOWLEDGE.md.
---
### TC-14: Docker Compose Network Avoids Existing Ranges
**Steps:**
1. Verify docker-compose.yml network subnet is 172.24.0.0/24 (not overlapping 172.16-172.23 or 172.29-172.30).
**Expected:** Network subnet configured to avoid conflicts with existing Docker networks on ub01.


@@ -0,0 +1,31 @@
{
"schemaVersion": 1,
"taskId": "T05",
"unitId": "M001/S01/T05",
"timestamp": 1774821641495,
"passed": false,
"discoverySource": "none",
"checks": [],
"retryAttempt": 1,
"maxRetries": 2,
"runtimeErrors": [
{
"source": "bg-shell",
"severity": "crash",
"message": "[chrysopedia-api] exitCode=1",
"blocking": true
},
{
"source": "bg-shell",
"severity": "crash",
"message": "[chrysopedia-api-2] exitCode=1",
"blocking": true
},
{
"source": "bg-shell",
"severity": "crash",
"message": "[chrysopedia-api-3] exitCode=1",
"blocking": true
}
]
}


@@ -1,6 +1,115 @@
# S02: Transcript Ingestion API
**Goal:** POST a transcript JSON file to the API; Creator and Source Video records appear in PostgreSQL
**Demo:** After this: POST a transcript JSON file to the API; Creator and Source Video records appear in PostgreSQL
## Tasks
- [x] **T01: Created POST /api/v1/ingest endpoint that accepts Whisper transcript JSON uploads, auto-detects creators from folder_name, upserts SourceVideo records, bulk-inserts TranscriptSegments, and persists raw JSON to disk** — Create the POST /api/v1/ingest endpoint that accepts a Whisper transcript JSON as multipart UploadFile, parses it, finds-or-creates a Creator record from creator_folder, creates/updates a SourceVideo, bulk-inserts TranscriptSegments, saves the raw JSON to disk, and returns a structured response. Wire the router into main.py.
## Steps
1. Add `python-multipart>=0.0.9` to `backend/requirements.txt` and install it (`pip install python-multipart`).
2. Add `TranscriptIngestResponse` Pydantic schema to `backend/schemas.py` with fields: `video_id: uuid.UUID`, `creator_id: uuid.UUID`, `creator_name: str`, `filename: str`, `segments_stored: int`, `processing_status: str`, `is_reupload: bool`.
3. Create `backend/routers/ingest.py` with an `APIRouter(prefix="/ingest", tags=["ingest"])`. Implement `POST ""` endpoint accepting `file: UploadFile`. Core logic:
- Read and parse JSON from the uploaded file. Validate required top-level keys: `source_file`, `creator_folder`, `duration_seconds`, `segments`.
- Implement a `slugify()` helper: lowercase, replace non-alphanumeric chars with hyphens, strip leading/trailing hyphens, collapse consecutive hyphens.
- Find Creator by `folder_name`. If not found, create one with `name=creator_folder`, `slug=slugify(creator_folder)`, `folder_name=creator_folder`.
- Check for existing SourceVideo by `(creator_id, filename)`. If found, delete old TranscriptSegments for that video and update the SourceVideo record (upsert). If not found, create new SourceVideo with `content_type="tutorial"`, `processing_status="transcribed"`.
- Bulk-insert TranscriptSegment rows from `segments` array. Map: `start` → `start_time`, `end` → `end_time`, `text` → `text`, array index → `segment_index`.
- Save raw JSON to `{transcript_storage_path}/{creator_folder}/{source_file}.json`. Create parent directories with `os.makedirs(..., exist_ok=True)`.
- Set SourceVideo `transcript_path` to the saved file path.
- Commit the transaction. Return `TranscriptIngestResponse`.
4. Add structured logging: log at INFO level on successful ingest with creator name, filename, segment count.
5. Import and mount the ingest router in `backend/main.py`: `from routers import ingest` and `app.include_router(ingest.router, prefix="/api/v1")`.
6. Verify: `cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])"` prints the ingest route.
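The parsing and mapping described in step 3 can be sketched as pure helpers (function names are hypothetical; the real endpoint wires these into the async route):

```python
import uuid

REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}

def validate_transcript(data: dict) -> None:
    """Raise ValueError naming any missing required top-level key."""
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")

def segment_rows(video_id: uuid.UUID, segments: list[dict]) -> list[dict]:
    """Map Whisper segments to TranscriptSegment column values for bulk insert."""
    return [
        {
            "source_video_id": video_id,
            "segment_index": i,        # array index -> segment_index
            "start_time": s["start"],  # start -> start_time
            "end_time": s["end"],      # end -> end_time
            "text": s["text"],
        }
        for i, s in enumerate(segments)
    ]
```

The row dicts can feed a SQLAlchemy bulk insert directly, keeping the route handler itself thin.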
## Must-Haves
- [ ] `python-multipart` in requirements.txt
- [ ] `TranscriptIngestResponse` schema in schemas.py
- [ ] `POST /api/v1/ingest` endpoint in ingest.py
- [ ] Creator find-or-create by folder_name with slugify
- [ ] SourceVideo upsert by (creator_id, filename)
- [ ] TranscriptSegment bulk insert with segment_index
- [ ] Raw JSON saved to transcript_storage_path
- [ ] Router mounted in main.py
- [ ] Structured logging on successful ingest
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| UploadFile read | Return 400 with 'Invalid file' message | N/A (local read) | Return 422 with JSON parse error details |
| PostgreSQL | Transaction rollback, return 500 | Return 500 with timeout message | N/A |
| Filesystem write | Return 500 with 'Failed to save transcript' | N/A | N/A |
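The UploadFile row of this table can be sketched as a pure helper that maps raw bytes to a status code (function name and exact messages are illustrative, not from the codebase):

```python
import json

def classify_upload(raw: bytes):
    """Map raw upload bytes to (HTTP status, payload or error detail),
    following the failure-mode table's UploadFile row."""
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        return 422, f"JSON parse error: {exc}"  # malformed JSON -> 422
    if not isinstance(data, dict):
        return 400, "Invalid file: expected a JSON object"  # wrong shape -> 400
    return 200, data
```

In the route itself these statuses would be raised as `HTTPException`s; keeping the classification pure makes the negative tests below cheap to write.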
## Negative Tests
- **Malformed inputs**: Non-JSON file upload, JSON missing `segments` key, JSON missing `creator_folder`, empty segments array
- **Error paths**: Invalid JSON syntax, file system permission error
- **Boundary conditions**: Re-upload same file (idempotency), very long creator_folder name, special characters in source_file name
## Verification
- `cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])"` outputs `['/ingest']`
- `cd backend && python3 -c "from schemas import TranscriptIngestResponse; print(TranscriptIngestResponse.model_fields.keys())"` outputs the expected fields
- `grep -q 'python-multipart' backend/requirements.txt` exits 0
- `grep -q 'ingest' backend/main.py` exits 0
- Estimate: 45m
- Files: backend/requirements.txt, backend/schemas.py, backend/routers/ingest.py, backend/main.py
- Verify: cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])" && python3 -c "from schemas import TranscriptIngestResponse; print(TranscriptIngestResponse.model_fields.keys())" && grep -q 'python-multipart' requirements.txt && grep -q 'ingest' main.py
- [ ] **T02: Write integration tests proving ingestion, creator auto-detection, and idempotent re-upload** — Set up pytest + pytest-asyncio test infrastructure and write integration tests for the ingest endpoint. Tests run against a real PostgreSQL database using httpx.AsyncClient on the FastAPI app.
## Steps
1. Add test dependencies to `backend/requirements.txt`: `pytest>=8.0`, `pytest-asyncio>=0.24`, `python-multipart>=0.0.9` (if not already present).
2. Install: `cd backend && pip install pytest pytest-asyncio`.
3. Create `tests/conftest.py` with:
- Import `create_async_engine`, `async_sessionmaker` from SQLAlchemy.
- Create a test database URL fixture using env var `TEST_DATABASE_URL` with default `postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test`.
- `@pytest_asyncio.fixture` for `db_engine` that creates all tables via `Base.metadata.create_all` at setup and drops them at teardown (use `run_sync`).
- `@pytest_asyncio.fixture` for `db_session` that yields an AsyncSession.
- `@pytest_asyncio.fixture` for `client` that patches `get_session` dependency override on the FastAPI app and yields an `httpx.AsyncClient` with `ASGITransport`.
- `@pytest.fixture` for `sample_transcript_path` returning `tests/fixtures/sample_transcript.json`.
- `@pytest.fixture` for `tmp_transcript_dir` using `tmp_path` to override `transcript_storage_path`.
4. Create `tests/test_ingest.py` with `@pytest.mark.asyncio` tests:
- `test_ingest_creates_creator_and_video`: POST sample_transcript.json → 200, response has video_id, creator_id, segments_stored=5, creator_name='Skope'. Query DB to confirm Creator with folder_name='Skope' and slug='skope' exists. Confirm SourceVideo with processing_status='transcribed' exists. Confirm 5 TranscriptSegment rows with segment_index 0-4.
- `test_ingest_reuses_existing_creator`: Pre-create a Creator with folder_name='Skope'. POST transcript → response creator_id matches pre-created ID. Only 1 Creator row in DB.
- `test_ingest_idempotent_reupload`: POST same transcript twice → second returns is_reupload=True, same video_id. Still only 5 segments (not 10). Only 1 SourceVideo row.
- `test_ingest_saves_json_to_disk`: POST transcript → raw JSON file exists at the expected path in tmp_transcript_dir.
- `test_ingest_rejects_invalid_json`: POST a file with invalid JSON → 400/422 error.
- `test_ingest_rejects_missing_fields`: POST JSON without `creator_folder` → 400/422 error.
5. Add a `pytest.ini` or `pyproject.toml` section with `asyncio_mode = "auto"` to avoid per-test markers.
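Step 5 can be a minimal `pytest.ini` (a sketch; the equivalent `pyproject.toml` form lives under `[tool.pytest.ini_options]`):

```ini
[pytest]
asyncio_mode = auto
```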
## Must-Haves
- [ ] pytest and pytest-asyncio installed and configured
- [ ] conftest.py with async DB fixtures and ASGI test client
- [ ] test_ingest_creates_creator_and_video passes
- [ ] test_ingest_reuses_existing_creator passes
- [ ] test_ingest_idempotent_reupload passes
- [ ] test_ingest_saves_json_to_disk passes
- [ ] test_ingest_rejects_invalid_json passes
- [ ] test_ingest_rejects_missing_fields passes
- [ ] All tests pass: `cd backend && python -m pytest tests/test_ingest.py -v`
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| PostgreSQL (test DB) | Tests skip or fail with clear connection error | pytest-asyncio timeout | N/A |
| FastAPI test client | Test fails with assertion error | httpx timeout | Response schema mismatch caught by assertions |
## Verification
- `cd backend && python -m pytest tests/test_ingest.py -v` — all 6 tests pass
- `docker compose config` exits 0 (no regressions)
## Observability Impact
- Signals added/changed: test output showing pass/fail per test case with timing
- How a future agent inspects this: `cd backend && python -m pytest tests/test_ingest.py -v --tb=short`
- Failure state exposed: pytest output shows which assertion failed, with full diff
- Estimate: 45m
- Files: backend/requirements.txt, tests/conftest.py, tests/test_ingest.py, pytest.ini
- Verify: cd backend && python -m pytest tests/test_ingest.py -v && docker compose config > /dev/null 2>&1


@@ -0,0 +1,114 @@
# S02 — Transcript Ingestion API — Research
**Date:** 2026-03-29
## Summary
S02 adds a single FastAPI endpoint: `POST /api/v1/ingest` that accepts a Whisper transcript JSON file, creates/finds the Creator record (auto-detected from `creator_folder`), creates a SourceVideo record, stores TranscriptSegment rows, saves the JSON file to the filesystem, and sets `processing_status = "transcribed"`. This is straightforward CRUD using established patterns from S01 — async SQLAlchemy sessions, Pydantic v2 schemas, and the router-per-domain convention.
The main design decisions are: (1) accept the file as `UploadFile` (multipart) or as a JSON body — `UploadFile` is better since the transcript files will be multi-MB and the API should also save the raw JSON to disk; (2) auto-create Creator records from `creator_folder` with slugified names; (3) handle idempotency for re-uploads of the same video (upsert by filename+creator).
R002 is the primary requirement. R012 (incremental content addition) is partially addressed — new creators are auto-detected and existing creator records are reused.
## Recommendation
Build one new router (`backend/routers/ingest.py`) with a single `POST /api/v1/ingest` endpoint that accepts `UploadFile`. It parses the JSON, finds-or-creates a Creator (by `folder_name`), creates a SourceVideo, bulk-inserts TranscriptSegments, saves the raw file to `transcript_storage_path`, and returns the created SourceVideo with segment count. Use a single DB transaction for atomicity. Add `python-multipart` to requirements.txt for file upload support.
## Implementation Landscape
### Key Files
- `backend/routers/ingest.py` — **New file.** The ingestion endpoint. Follows the pattern from `backend/routers/creators.py`: `APIRouter` with prefix, `Depends(get_session)`, structured logging. Core logic: parse JSON → find/create Creator → create SourceVideo → bulk insert TranscriptSegments → save file to disk → return response.
- `backend/schemas.py` — **Modify.** Add `TranscriptIngestResponse` schema (returning created video ID, creator ID, segment count, processing status). Add `TranscriptSegmentWord` schema if we want to validate the word-level data (optional — the words array is in the JSON but not stored in the DB per current schema).
- `backend/main.py` — **Modify.** Import and mount the `ingest` router under `/api/v1`.
- `backend/requirements.txt` — **Modify.** Add `python-multipart>=0.0.9` (required by FastAPI for `UploadFile`).
- `tests/test_ingest.py` — **New file.** Integration test using `httpx.AsyncClient` against the FastAPI app with a test PostgreSQL database. Uses `tests/fixtures/sample_transcript.json` as test input.
### Existing Infrastructure (no changes needed)
- `backend/models.py` — Creator, SourceVideo, TranscriptSegment models are already defined with correct columns and relationships. `Creator.folder_name` exists for matching. `SourceVideo.transcript_path` exists for storing the filesystem path. `TranscriptSegment.segment_index` exists for ordering.
- `backend/database.py` — `get_session` async dependency already exists and works.
- `backend/config.py` — `transcript_storage_path` setting already exists (defaults to `/data/transcripts`).
- `tests/fixtures/sample_transcript.json` — Sample transcript with 5 segments in the exact Whisper output format: `{source_file, creator_folder, duration_seconds, segments: [{start, end, text, words: [{word, start, end}]}]}`.
### Transcript JSON Shape (from Whisper script output)
```
{
"source_file": "Skope — Sound Design Masterclass pt1.mp4",
"creator_folder": "Skope",
"duration_seconds": 3847,
"segments": [
{
"start": 0.0,
"end": 4.52,
"text": "Hey everyone welcome back...",
"words": [{"word": "Hey", "start": 0.0, "end": 0.28}, ...]
}
]
}
```
### Creator Auto-Detection Logic
1. Extract `creator_folder` from transcript JSON (e.g., `"Skope"`)
2. `SELECT * FROM creators WHERE folder_name = :folder_name`
3. If not found: create with `name = creator_folder`, `slug = slugify(creator_folder)`, `folder_name = creator_folder`
4. If found: reuse existing creator ID
Slug generation: lowercase, replace spaces/special chars with hyphens. Use a simple function — no need for `python-slugify` dependency; a 3-line regex approach suffices for folder names.
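A minimal version of that regex approach (a sketch; behavior for edge cases like all-symbol names is an assumption):

```python
import re

def slugify(name: str) -> str:
    # lowercase, collapse runs of non-alphanumerics to a single hyphen, trim hyphens
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
```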
### SourceVideo Content Type
The transcript JSON does NOT include a `content_type` field. The `SourceVideo.content_type` column is NOT NULL with enum values `tutorial|livestream|breakdown|short_form`. Options:
- **Recommended:** Default to `"tutorial"` at ingestion; allow optional override via query param or form field. The LLM pipeline (S03) can reclassify later.
- Alternative: Make the column nullable (requires migration). Not worth it — defaulting is simpler.
### Idempotency
To support re-uploading the same transcript (R012 incremental):
- Check for existing `SourceVideo` by `(creator_id, filename)` before creating
- If exists: delete old TranscriptSegments, update the SourceVideo record, re-insert segments
- This makes the endpoint idempotent — uploading the same file twice produces the same result
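The rule can be illustrated with an in-memory stand-in for the table; the real version keys a SQLAlchemy query on `(creator_id, filename)` instead:

```python
# In-memory model of the idempotent upsert: one record per (creator_id,
# filename); a re-upload replaces segments instead of duplicating them.
videos: dict = {}
_next_id = iter(range(1, 1_000))

def ingest(creator_id: int, filename: str, segments: list) -> tuple[int, bool]:
    key = (creator_id, filename)
    if key in videos:                       # re-upload path: replace, don't append
        videos[key]["segments"] = list(segments)
        return videos[key]["id"], True
    videos[key] = {"id": next(_next_id), "segments": list(segments)}
    return videos[key]["id"], False

vid_a, re_a = ingest(1, "a.mp4", ["s0", "s1"])
vid_b, re_b = ingest(1, "a.mp4", ["s0", "s1"])  # same file again
assert vid_a == vid_b and re_b and not re_a
assert len(videos) == 1 and len(videos[(1, "a.mp4")]["segments"]) == 2
```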
### Build Order
1. **Add `python-multipart` to requirements.txt** — unblocks UploadFile usage
2. **Add response schema to `schemas.py`** — small change, defines the API contract
3. **Create `backend/routers/ingest.py`** — the main work: endpoint, creator auto-detection, segment insertion, file storage
4. **Mount router in `main.py`** — one import + one `include_router` line
5. **Write integration test** — verify with sample_transcript.json against real DB
Steps 1-2 are independent. Step 3 is the bulk of work. Steps 4-5 depend on 3.
### Verification Approach
1. **Unit verification:** `python3 -c "from routers import ingest; print(ingest.router.routes)"` — confirms router imports and routes are registered.
2. **Integration test:** Start a test PostgreSQL container, run the FastAPI app with `httpx.AsyncClient`, POST the sample transcript, assert:
- Response 200 with `video_id`, `creator_id`, `segments_stored` count
- Creator record exists in DB with `folder_name = "Skope"` and `slug = "skope"`
- SourceVideo record exists with `filename = "Skope — Sound Design Masterclass pt1.mp4"`, `processing_status = "transcribed"`
- 5 TranscriptSegment rows exist with correct `segment_index` ordering
- JSON file saved to configured `transcript_storage_path`
3. **Idempotency test:** POST the same file twice → no duplicate records, same video ID returned
4. **Docker compose config still validates:** `docker compose config` exits 0
## Constraints
- `python-multipart` must be added to `backend/requirements.txt` — FastAPI raises `RuntimeError` without it when using `UploadFile`.
- `SourceVideo.content_type` is NOT NULL with an enum constraint — ingestion must provide a valid value. Default to `"tutorial"`.
- Filesystem write to `transcript_storage_path` — in Docker this maps to `/vmPool/r/services/chrysopedia_data/transcripts`. For local testing, use a temp directory or `./data/transcripts`.
- Creator `slug` must be unique — the slugify function must produce deterministic, URL-safe slugs from folder names.
## Common Pitfalls
- **Missing `python-multipart`** — the `UploadFile` type hint imports fine, but FastAPI raises `RuntimeError` at runtime without the package. Must be in requirements.txt.
- **Async file I/O**`UploadFile.read()` is async but filesystem writes with `open()` are blocking. For transcript JSONs (typically <10MB), blocking writes in async handlers are acceptable. If concerned, use `aiofiles` but it's not necessary at this scale.
- **Transaction scope** — Creator creation + SourceVideo creation + segment bulk insert should be in ONE transaction. If segment insertion fails, don't leave orphaned Creator/Video records. Use `session.begin()` or rely on the session's default transaction + `await session.commit()` at the end.
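A toy illustration of the all-or-nothing point above; the real code gets this for free from a single SQLAlchemy session transaction:

```python
# Stage every row first; publish to the "database" only if all stages succeed.
def ingest_all_or_nothing(store: dict, creator: str, video: str, segments: list) -> None:
    staged = {"creators": [creator], "videos": [video], "segments": []}
    for seg in segments:                        # may fail partway through
        if seg is None:
            raise ValueError("segment insert failed")  # staged rows are discarded
        staged["segments"].append(seg)
    for table, rows in staged.items():          # "commit": publish atomically
        store.setdefault(table, []).extend(rows)

store: dict = {}
try:
    ingest_all_or_nothing(store, "Skope", "a.mp4", ["s0", None])
except ValueError:
    pass
assert store == {}                              # no orphaned Creator/Video rows
ingest_all_or_nothing(store, "Skope", "a.mp4", ["s0", "s1"])
assert len(store["segments"]) == 2
```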
## Skills Discovered
| Technology | Skill | Status |
|------------|-------|--------|
| FastAPI | wshobson/agents@fastapi-templates | available (9.6K installs) — general FastAPI patterns |
| FastAPI | mindrally/skills@fastapi-python | available (4.3K installs) — Python FastAPI patterns |



@@ -0,0 +1,80 @@
---
estimated_steps: 41
estimated_files: 4
skills_used: []
---
# T01: Build transcript ingestion endpoint with creator auto-detection and idempotent upsert
Create the POST /api/v1/ingest endpoint that accepts a Whisper transcript JSON as multipart UploadFile, parses it, finds-or-creates a Creator record from creator_folder, creates/updates a SourceVideo, bulk-inserts TranscriptSegments, saves the raw JSON to disk, and returns a structured response. Wire the router into main.py.
## Steps
1. Add `python-multipart>=0.0.9` to `backend/requirements.txt` and install it (`pip install python-multipart`).
2. Add `TranscriptIngestResponse` Pydantic schema to `backend/schemas.py` with fields: `video_id: uuid.UUID`, `creator_id: uuid.UUID`, `creator_name: str`, `filename: str`, `segments_stored: int`, `processing_status: str`, `is_reupload: bool`.
3. Create `backend/routers/ingest.py` with an `APIRouter(prefix="/ingest", tags=["ingest"])`. Implement `POST ""` endpoint accepting `file: UploadFile`. Core logic:
- Read and parse JSON from the uploaded file. Validate required top-level keys: `source_file`, `creator_folder`, `duration_seconds`, `segments`.
- Implement a `slugify()` helper: lowercase, replace non-alphanumeric chars with hyphens, strip leading/trailing hyphens, collapse consecutive hyphens.
- Find Creator by `folder_name`. If not found, create one with `name=creator_folder`, `slug=slugify(creator_folder)`, `folder_name=creator_folder`.
- Check for existing SourceVideo by `(creator_id, filename)`. If found, delete old TranscriptSegments for that video and update the SourceVideo record (upsert). If not found, create new SourceVideo with `content_type="tutorial"`, `processing_status="transcribed"`.
- Bulk-insert TranscriptSegment rows from `segments` array. Map: `start``start_time`, `end``end_time`, `text``text`, array index → `segment_index`.
- Save raw JSON to `{transcript_storage_path}/{creator_folder}/{source_file}.json`. Create parent directories with `os.makedirs(..., exist_ok=True)`.
- Set SourceVideo `transcript_path` to the saved file path.
- Commit the transaction. Return `TranscriptIngestResponse`.
4. Add structured logging: log at INFO level on successful ingest with creator name, filename, segment count.
5. Import and mount the ingest router in `backend/main.py`: `from routers import ingest` and `app.include_router(ingest.router, prefix="/api/v1")`.
6. Verify: `cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])"` prints the ingest route.
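The save-path logic from step 3 can be sketched with `pathlib` (the storage root below is illustrative; the real value comes from `transcript_storage_path`):

```python
import tempfile
from pathlib import Path

def transcript_save_path(storage_root: str, creator_folder: str, source_file: str) -> Path:
    """Build {root}/{creator_folder}/{source_file}.json, creating parent dirs."""
    target_dir = Path(storage_root) / creator_folder
    target_dir.mkdir(parents=True, exist_ok=True)   # os.makedirs(..., exist_ok=True)
    return target_dir / f"{source_file}.json"

root = tempfile.mkdtemp()
p = transcript_save_path(root, "Skope", "lesson1.mp4")
p.write_text("{}", encoding="utf-8")
print(p.name)  # lesson1.mp4.json
```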
## Must-Haves
- [ ] `python-multipart` in requirements.txt
- [ ] `TranscriptIngestResponse` schema in schemas.py
- [ ] `POST /api/v1/ingest` endpoint in ingest.py
- [ ] Creator find-or-create by folder_name with slugify
- [ ] SourceVideo upsert by (creator_id, filename)
- [ ] TranscriptSegment bulk insert with segment_index
- [ ] Raw JSON saved to transcript_storage_path
- [ ] Router mounted in main.py
- [ ] Structured logging on successful ingest
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| UploadFile read | Return 400 with 'Invalid file' message | N/A (local read) | Return 422 with JSON parse error details |
| PostgreSQL | Transaction rollback, return 500 | Return 500 with timeout message | N/A |
| Filesystem write | Return 500 with 'Failed to save transcript' | N/A | N/A |
## Negative Tests
- **Malformed inputs**: Non-JSON file upload, JSON missing `segments` key, JSON missing `creator_folder`, empty segments array
- **Error paths**: Invalid JSON syntax, file system permission error
- **Boundary conditions**: Re-upload same file (idempotency), very long creator_folder name, special characters in source_file name
## Verification
- `cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])"` outputs `['/ingest']`
- `cd backend && python3 -c "from schemas import TranscriptIngestResponse; print(TranscriptIngestResponse.model_fields.keys())"` outputs the expected fields
- `grep -q 'python-multipart' backend/requirements.txt` exits 0
- `grep -q 'ingest' backend/main.py` exits 0
## Inputs
- `backend/models.py` — Creator, SourceVideo, TranscriptSegment ORM models with column definitions
- `backend/database.py` — get_session async dependency for DB access
- `backend/config.py` — Settings with transcript_storage_path
- `backend/schemas.py` — existing Pydantic schema patterns (Base/Create/Read convention)
- `backend/routers/creators.py` — existing router pattern to follow (APIRouter, Depends, logging)
- `backend/main.py` — existing router mounting pattern
- `tests/fixtures/sample_transcript.json` — reference for expected JSON shape
## Expected Output
- `backend/routers/ingest.py` — new ingestion endpoint router with POST handler
- `backend/schemas.py` — modified with TranscriptIngestResponse schema
- `backend/requirements.txt` — modified with python-multipart dependency
- `backend/main.py` — modified to mount ingest router
## Verification
```
cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])" && python3 -c "from schemas import TranscriptIngestResponse; print(TranscriptIngestResponse.model_fields.keys())" && grep -q 'python-multipart' requirements.txt && grep -q 'ingest' main.py
```


@@ -0,0 +1,85 @@
---
id: T01
parent: S02
milestone: M001
provides: []
requires: []
affects: []
key_files: ["backend/routers/ingest.py", "backend/schemas.py", "backend/requirements.txt", "backend/main.py"]
key_decisions: ["Used slugify helper inline in ingest.py rather than a shared utils module since it's only needed here for now", "Set file_path to {creator_folder}/{source_file} for new SourceVideo records"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All four slice-level verification checks pass: (1) router.routes outputs ['/ingest'], (2) TranscriptIngestResponse.model_fields.keys() shows all 7 expected fields, (3) grep finds python-multipart in requirements.txt, (4) grep finds ingest in main.py. Additionally confirmed /api/v1/ingest appears in app.routes via full app import."
completed_at: 2026-03-29T22:09:40.299Z
blocker_discovered: false
---
# T01: Created POST /api/v1/ingest endpoint that accepts Whisper transcript JSON uploads, auto-detects creators from folder_name, upserts SourceVideo records, bulk-inserts TranscriptSegments, and persists raw JSON to disk
## What Happened
Added python-multipart to requirements.txt. Created TranscriptIngestResponse Pydantic schema in schemas.py with all seven specified fields. Built backend/routers/ingest.py with a POST endpoint that reads and validates uploaded JSON, finds-or-creates a Creator by folder_name using a slugify helper, upserts SourceVideo by (creator_id, filename) with old segment deletion on re-upload, bulk-inserts TranscriptSegments with segment_index, saves raw JSON to transcript_storage_path, and returns a structured response. Mounted the ingest router in main.py under /api/v1. Error handling covers all three failure mode dependencies: 400 for unreadable files, 422 for JSON parse/validation, 500 for DB and filesystem failures.
## Verification
All four slice-level verification checks pass: (1) router.routes outputs ['/ingest'], (2) TranscriptIngestResponse.model_fields.keys() shows all 7 expected fields, (3) grep finds python-multipart in requirements.txt, (4) grep finds ingest in main.py. Additionally confirmed /api/v1/ingest appears in app.routes via full app import.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python3 -c "from routers.ingest import router; print([r.path for r in router.routes])"` | 0 | ✅ pass | 1000ms |
| 2 | `cd backend && python3 -c "from schemas import TranscriptIngestResponse; print(TranscriptIngestResponse.model_fields.keys())"` | 0 | ✅ pass | 1000ms |
| 3 | `grep -q 'python-multipart' backend/requirements.txt` | 0 | ✅ pass | 100ms |
| 4 | `grep -q 'ingest' backend/main.py` | 0 | ✅ pass | 100ms |
| 5 | `cd backend && python3 -c "from main import app; print([r.path for r in app.routes if 'ingest' in r.path])"` | 0 | ✅ pass | 1000ms |
## Deviations
None.
## Known Issues
None.
## Files Created/Modified
- `backend/routers/ingest.py`
- `backend/schemas.py`
- `backend/requirements.txt`
- `backend/main.py`


@@ -0,0 +1,81 @@
---
estimated_steps: 42
estimated_files: 4
skills_used: []
---
# T02: Write integration tests proving ingestion, creator auto-detection, and idempotent re-upload
Set up pytest + pytest-asyncio test infrastructure and write integration tests for the ingest endpoint. Tests run against a real PostgreSQL database using httpx.AsyncClient on the FastAPI app.
## Steps
1. Add test dependencies to `backend/requirements.txt`: `pytest>=8.0`, `pytest-asyncio>=0.24`, `python-multipart>=0.0.9` (if not already present).
2. Install: `cd backend && pip install pytest pytest-asyncio`.
3. Create `tests/conftest.py` with:
- Import `create_async_engine`, `async_sessionmaker` from SQLAlchemy.
- Create a test database URL fixture using env var `TEST_DATABASE_URL` with default `postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test`.
- `@pytest_asyncio.fixture` for `db_engine` that creates all tables via `Base.metadata.create_all` at setup and drops them at teardown (use `run_sync`).
- `@pytest_asyncio.fixture` for `db_session` that yields an AsyncSession.
- `@pytest_asyncio.fixture` for `client` that patches `get_session` dependency override on the FastAPI app and yields an `httpx.AsyncClient` with `ASGITransport`.
- `@pytest.fixture` for `sample_transcript_path` returning `tests/fixtures/sample_transcript.json`.
- `@pytest.fixture` for `tmp_transcript_dir` using `tmp_path` to override `transcript_storage_path`.
4. Create `tests/test_ingest.py` with `@pytest.mark.asyncio` tests:
- `test_ingest_creates_creator_and_video`: POST sample_transcript.json → 200, response has video_id, creator_id, segments_stored=5, creator_name='Skope'. Query DB to confirm Creator with folder_name='Skope' and slug='skope' exists. Confirm SourceVideo with processing_status='transcribed' exists. Confirm 5 TranscriptSegment rows with segment_index 0-4.
- `test_ingest_reuses_existing_creator`: Pre-create a Creator with folder_name='Skope'. POST transcript → response creator_id matches pre-created ID. Only 1 Creator row in DB.
- `test_ingest_idempotent_reupload`: POST same transcript twice → second returns is_reupload=True, same video_id. Still only 5 segments (not 10). Only 1 SourceVideo row.
- `test_ingest_saves_json_to_disk`: POST transcript → raw JSON file exists at the expected path in tmp_transcript_dir.
- `test_ingest_rejects_invalid_json`: POST a file with invalid JSON → 400/422 error.
- `test_ingest_rejects_missing_fields`: POST JSON without `creator_folder` → 400/422 error.
5. Add a `pytest.ini` or `pyproject.toml` section with `asyncio_mode = "auto"` to avoid per-test markers.
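The `get_session` override in the `client` fixture reduces to swapping one provider callable for another; here is a stdlib-only stand-in for what FastAPI's `dependency_overrides` dict does (names are illustrative):

```python
def real_get_session():
    return "postgres-session"

def fake_get_session():
    return "test-session"

# FastAPI keeps a dict much like this on the app object; dependency
# resolution consults the override map before calling the real provider.
dependency_overrides = {real_get_session: fake_get_session}

def resolve(dep):
    return dependency_overrides.get(dep, dep)()

assert resolve(real_get_session) == "test-session"      # inside the fixture
dependency_overrides.clear()
assert resolve(real_get_session) == "postgres-session"  # production behavior
```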
## Must-Haves
- [ ] pytest and pytest-asyncio installed and configured
- [ ] conftest.py with async DB fixtures and ASGI test client
- [ ] test_ingest_creates_creator_and_video passes
- [ ] test_ingest_reuses_existing_creator passes
- [ ] test_ingest_idempotent_reupload passes
- [ ] test_ingest_saves_json_to_disk passes
- [ ] test_ingest_rejects_invalid_json passes
- [ ] test_ingest_rejects_missing_fields passes
- [ ] All tests pass: `cd backend && python -m pytest tests/test_ingest.py -v`
## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| PostgreSQL (test DB) | Tests skip or fail with clear connection error | pytest-asyncio timeout | N/A |
| FastAPI test client | Test fails with assertion error | httpx timeout | Response schema mismatch caught by assertions |
## Verification
- `cd backend && python -m pytest tests/test_ingest.py -v` — all 6 tests pass
- `docker compose config` exits 0 (no regressions)
## Observability Impact
- Signals added/changed: test output showing pass/fail per test case with timing
- How a future agent inspects this: `cd backend && python -m pytest tests/test_ingest.py -v --tb=short`
- Failure state exposed: pytest output shows which assertion failed, with full diff
## Inputs
- `backend/routers/ingest.py` — the ingestion endpoint to test (created in T01)
- `backend/main.py` — FastAPI app with ingest router mounted (modified in T01)
- `backend/schemas.py` — TranscriptIngestResponse schema (modified in T01)
- `backend/models.py` — Creator, SourceVideo, TranscriptSegment models for DB assertions
- `backend/database.py` — get_session dependency to override in tests
- `backend/config.py` — Settings to override transcript_storage_path in tests
- `tests/fixtures/sample_transcript.json` — test input file with 5 segments
## Expected Output
- `tests/conftest.py` — pytest fixtures for async DB, test client, and transcript paths
- `tests/test_ingest.py` — 6 integration tests covering happy path, idempotency, and error cases
- `pytest.ini` — pytest configuration with asyncio_mode=auto
- `backend/requirements.txt` — modified with pytest and pytest-asyncio dependencies
## Verification
```
cd backend && python -m pytest tests/test_ingest.py -v && docker compose config > /dev/null 2>&1
```


@@ -12,7 +12,7 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from config import get_settings
-from routers import creators, health, videos
+from routers import creators, health, ingest, videos
 def _setup_logging() -> None:
@@ -79,6 +79,7 @@ app.include_router(health.router)
 # Versioned API
 app.include_router(creators.router, prefix="/api/v1")
+app.include_router(ingest.router, prefix="/api/v1")
 app.include_router(videos.router, prefix="/api/v1")


@@ -8,4 +8,5 @@ pydantic-settings>=2.0,<3.0
 celery[redis]>=5.4.0,<6.0
 redis>=5.0,<6.0
 python-dotenv>=1.0,<2.0
+python-multipart>=0.0.9,<1.0
 httpx>=0.27.0,<1.0

`backend/routers/ingest.py` (new file)
@@ -0,0 +1,193 @@
"""Transcript ingestion endpoint for the Chrysopedia API.
Accepts a Whisper-format transcript JSON via multipart file upload, finds or
creates a Creator, upserts a SourceVideo, bulk-inserts TranscriptSegments,
persists the raw JSON to disk, and returns a structured response.
"""
import json
import logging
import os
import re
import uuid
from fastapi import APIRouter, Depends, HTTPException, UploadFile
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from config import get_settings
from database import get_session
from models import ContentType, Creator, ProcessingStatus, SourceVideo, TranscriptSegment
from schemas import TranscriptIngestResponse
logger = logging.getLogger("chrysopedia.ingest")
router = APIRouter(prefix="/ingest", tags=["ingest"])
REQUIRED_KEYS = {"source_file", "creator_folder", "duration_seconds", "segments"}
def slugify(value: str) -> str:
"""Lowercase, replace non-alphanumeric chars with hyphens, collapse/strip."""
value = value.lower()
value = re.sub(r"[^a-z0-9]+", "-", value)
value = value.strip("-")
value = re.sub(r"-{2,}", "-", value)
return value
@router.post("", response_model=TranscriptIngestResponse)
async def ingest_transcript(
file: UploadFile,
db: AsyncSession = Depends(get_session),
) -> TranscriptIngestResponse:
"""Ingest a Whisper transcript JSON file.
Workflow:
1. Parse and validate the uploaded JSON.
2. Find-or-create a Creator by folder_name.
3. Upsert a SourceVideo by (creator_id, filename).
4. Bulk-insert TranscriptSegment rows.
5. Save raw JSON to transcript_storage_path.
6. Return structured response.
"""
settings = get_settings()
# ── 1. Read & parse JSON ─────────────────────────────────────────────
try:
raw_bytes = await file.read()
raw_text = raw_bytes.decode("utf-8")
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Invalid file: {exc}") from exc
try:
data = json.loads(raw_text)
except json.JSONDecodeError as exc:
raise HTTPException(
status_code=422, detail=f"JSON parse error: {exc}"
) from exc
if not isinstance(data, dict):
raise HTTPException(status_code=422, detail="Expected a JSON object at the top level")
missing = REQUIRED_KEYS - data.keys()
if missing:
raise HTTPException(
status_code=422,
detail=f"Missing required keys: {', '.join(sorted(missing))}",
)
source_file: str = data["source_file"]
creator_folder: str = data["creator_folder"]
duration_seconds: int | None = data.get("duration_seconds")
segments_data: list = data["segments"]
if not isinstance(segments_data, list):
raise HTTPException(status_code=422, detail="'segments' must be an array")
# ── 2. Find-or-create Creator ────────────────────────────────────────
stmt = select(Creator).where(Creator.folder_name == creator_folder)
result = await db.execute(stmt)
creator = result.scalar_one_or_none()
if creator is None:
creator = Creator(
name=creator_folder,
slug=slugify(creator_folder),
folder_name=creator_folder,
)
db.add(creator)
await db.flush() # assign id
# ── 3. Upsert SourceVideo ────────────────────────────────────────────
stmt = select(SourceVideo).where(
SourceVideo.creator_id == creator.id,
SourceVideo.filename == source_file,
)
result = await db.execute(stmt)
existing_video = result.scalar_one_or_none()
is_reupload = existing_video is not None
if is_reupload:
video = existing_video
# Delete old segments for idempotent re-upload
await db.execute(
delete(TranscriptSegment).where(
TranscriptSegment.source_video_id == video.id
)
)
video.duration_seconds = duration_seconds
video.processing_status = ProcessingStatus.transcribed
else:
video = SourceVideo(
creator_id=creator.id,
filename=source_file,
file_path=f"{creator_folder}/{source_file}",
duration_seconds=duration_seconds,
content_type=ContentType.tutorial,
processing_status=ProcessingStatus.transcribed,
)
db.add(video)
await db.flush() # assign id
# ── 4. Bulk-insert TranscriptSegments ────────────────────────────────
segment_objs = [
TranscriptSegment(
source_video_id=video.id,
start_time=float(seg["start"]),
end_time=float(seg["end"]),
text=str(seg["text"]),
segment_index=idx,
)
for idx, seg in enumerate(segments_data)
]
db.add_all(segment_objs)
# ── 5. Save raw JSON to disk ─────────────────────────────────────────
transcript_dir = os.path.join(
settings.transcript_storage_path, creator_folder
)
transcript_path = os.path.join(transcript_dir, f"{source_file}.json")
try:
os.makedirs(transcript_dir, exist_ok=True)
with open(transcript_path, "w", encoding="utf-8") as f:
f.write(raw_text)
except OSError as exc:
raise HTTPException(
status_code=500, detail=f"Failed to save transcript: {exc}"
) from exc
video.transcript_path = transcript_path
# ── 6. Commit & respond ──────────────────────────────────────────────
try:
await db.commit()
except Exception as exc:
await db.rollback()
logger.error("Database commit failed during ingest: %s", exc)
raise HTTPException(
status_code=500, detail="Database error during ingest"
) from exc
await db.refresh(video)
await db.refresh(creator)
logger.info(
"Ingested transcript: creator=%s, file=%s, segments=%d, reupload=%s",
creator.name,
source_file,
len(segment_objs),
is_reupload,
)
return TranscriptIngestResponse(
video_id=video.id,
creator_id=creator.id,
creator_name=creator.name,
filename=source_file,
segments_stored=len(segment_objs),
processing_status=video.processing_status.value,
is_reupload=is_reupload,
)


@@ -173,6 +173,19 @@ class TagRead(TagBase):
     id: uuid.UUID

+# ── Transcript Ingestion ─────────────────────────────────────────────────────
+class TranscriptIngestResponse(BaseModel):
+    """Response returned after successfully ingesting a transcript."""
+
+    video_id: uuid.UUID
+    creator_id: uuid.UUID
+    creator_name: str
+    filename: str
+    segments_stored: int
+    processing_status: str
+    is_reupload: bool
+
 # ── Pagination wrapper ───────────────────────────────────────────────────────
 class PaginatedResponse(BaseModel):