chore: Extended Settings with 12 LLM/embedding/Qdrant config fields, cr…
- backend/config.py
- backend/worker.py
- backend/pipeline/schemas.py
- backend/pipeline/llm_client.py
- backend/requirements.txt
- backend/pipeline/__init__.py
- backend/pipeline/stages.py

GSD-Task: S03/T01
parent bef8d95e64
commit 12cc86aef9
20 changed files with 1434 additions and 3 deletions
| # | When | Scope | Decision | Choice | Rationale | Revisable? | Made By |
|---|------|-------|----------|--------|-----------|------------|---------|
| D001 | | architecture | Docker Compose project naming and path conventions | xpltd_chrysopedia with bind mounts at /vmPool/r/services/chrysopedia_*, compose at /vmPool/r/compose/chrysopedia/ | XPLTD lore: compose projects at /vmPool/r/compose/{name}/, service data at /vmPool/r/services/{service}_{role}/, project naming follows xpltd_{name} pattern. Network will be a dedicated bridge subnet avoiding existing 172.16-172.23 and 172.29-172.30 ranges. | Yes | agent |
| D002 | | architecture | Storage layer selection | PostgreSQL for structured data, Qdrant (existing instance at 10.0.0.10) for vector embeddings, local filesystem for transcript JSON files | Spec specifies PostgreSQL for JSONB support and Qdrant already running on hypervisor. File store on local filesystem organized by creator slug. Qdrant gets a dedicated collection, not a new instance. | Yes | collaborative |
| D003 | | architecture | Timestamp handling for asyncpg with TIMESTAMP WITHOUT TIME ZONE columns | Use datetime.now(timezone.utc).replace(tzinfo=None) in _now() helper | asyncpg rejects timezone-aware datetimes for TIMESTAMP WITHOUT TIME ZONE columns. Stripping tzinfo at the application layer preserves UTC semantics while staying compatible with the existing schema. | Yes | agent |
| D004 | | requirement | R002 Transcript Ingestion API status | validated | 6 passing integration tests prove the full POST /api/v1/ingest flow: creator auto-detection, SourceVideo upsert, TranscriptSegment bulk insert, raw JSON persistence, idempotent re-upload, and invalid input rejection. | Yes | agent |
| D005 | | architecture | Sync vs async approach for Celery worker pipeline tasks | Use sync openai.OpenAI, sync QdrantClient, and sync SQLAlchemy (create_engine with psycopg2) inside Celery tasks. Convert DATABASE_URL from postgresql+asyncpg:// to postgresql:// for the sync engine. | Celery workers run in a synchronous context. Using asyncio.run() inside tasks risks nested event loop errors with gevent/eventlet workers. Using sync clients throughout eliminates this class of bug entirely. The async engine/session from database.py is only used by FastAPI (ASGI); the worker gets its own sync engine. | Yes | agent |
| D006 | | architecture | Embedding/Qdrant failure handling strategy in pipeline | Embedding/Qdrant failures (stage 6) log errors but do not fail the pipeline. processing_status is set by stages 2-5 only. Embeddings can be regenerated by manual re-trigger. | Qdrant is at 10.0.0.10 on the hypervisor network and may not be reachable during all pipeline runs. Making embedding a non-blocking side-effect ensures core pipeline output (KeyMoments, TechniquePages in PostgreSQL) is never lost due to vector store issues. The manual re-trigger endpoint allows regenerating embeddings at any time. | Yes | agent |
| ID | Slice | Risk | Depends | Done | After this |
|----|-------|------|---------|------|------------|
| S01 | Docker Compose + Database + Whisper Script | low | — | ✅ | docker compose up -d starts all services on ub01; Whisper script transcribes a sample video to JSON |
| S02 | Transcript Ingestion API | low | S01 | ✅ | POST a transcript JSON file to the API; Creator and Source Video records appear in PostgreSQL |
| S03 | LLM Extraction Pipeline + Qdrant Integration | high | S02 | ⬜ | A transcript JSON triggers stages 2-5: segmentation → extraction → classification → synthesis. Technique pages with key moments appear in DB. Qdrant has searchable embeddings. |
| S04 | Review Queue Admin UI | medium | S03 | ⬜ | Admin views pending key moments, approves/edits/rejects them, toggles between review and auto mode |
| S05 | Search-First Web UI | medium | S03 | ⬜ | User searches for a technique, gets semantic results in <500ms, clicks through to a full technique page with study guide prose, key moments, and related links |
.gsd/milestones/M001/slices/S02/S02-SUMMARY.md (new file, 108 lines)
---
id: S02
parent: M001
milestone: M001
provides:
- POST /api/v1/ingest endpoint accepting Whisper transcript JSON
- Creator and SourceVideo records in PostgreSQL with TranscriptSegments
- Raw transcript JSON persisted to transcript_storage_path
- pytest-asyncio test infrastructure with async fixtures and ASGI client
- TranscriptIngestResponse Pydantic schema
requires: []
affects:
- S03
key_files:
- backend/routers/ingest.py
- backend/schemas.py
- backend/main.py
- backend/requirements.txt
- backend/tests/conftest.py
- backend/tests/test_ingest.py
- backend/tests/fixtures/sample_transcript.json
- backend/pytest.ini
- backend/models.py
key_decisions:
- Used NullPool for test engine to avoid asyncpg connection contention in pytest-asyncio
- Fixed _now() helper to return naive UTC datetimes for asyncpg TIMESTAMP WITHOUT TIME ZONE compatibility
- Used slugify helper inline in ingest.py rather than a shared utils module
- Set file_path to {creator_folder}/{source_file} for new SourceVideo records
patterns_established:
- "pytest-asyncio integration test pattern: function-scoped NullPool engine + ASGI transport client with dependency overrides"
- Multipart JSON file upload pattern for FastAPI endpoints
- Creator auto-detection from folder_name with find-or-create and slugify
observability_surfaces:
- INFO-level structured logging on successful ingest (creator name, filename, segment count)
- pytest output with per-test pass/fail and timing via -v flag
drill_down_paths:
- .gsd/milestones/M001/slices/S02/tasks/T01-SUMMARY.md
- .gsd/milestones/M001/slices/S02/tasks/T02-SUMMARY.md
duration: ""
verification_result: passed
completed_at: 2026-03-29T22:19:19.537Z
blocker_discovered: false
---
# S02: Transcript Ingestion API

**Delivered POST /api/v1/ingest endpoint with creator auto-detection, SourceVideo upsert, TranscriptSegment bulk insert, raw JSON persistence, and 6 passing integration tests against real PostgreSQL.**

## What Happened

This slice built the transcript ingestion API — the critical bridge between Whisper transcription output (S01) and the LLM extraction pipeline (S03).

**T01 — Ingest Endpoint** created `POST /api/v1/ingest` accepting multipart JSON uploads. The endpoint parses Whisper transcript JSON, finds-or-creates a Creator by `folder_name` (with slugify), upserts a SourceVideo by `(creator_id, filename)` — deleting old segments on re-upload for idempotency — bulk-inserts TranscriptSegments with `segment_index`, saves the raw JSON to the configured `transcript_storage_path`, and returns a structured `TranscriptIngestResponse`. Error handling covers invalid files (400), malformed JSON (422), and DB/filesystem failures (500). The router is mounted in `main.py` under `/api/v1`.
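The inline slugify helper mentioned above is not reproduced in this summary; a typical stdlib-only sketch of what "find-or-create with slugify" assumes (hypothetical implementation — the real code in `routers/ingest.py` may differ):

```python
import re
import unicodedata

def slugify(name: str) -> str:
    # Lowercase, ASCII-fold, and collapse runs of non-alphanumerics into
    # single hyphens, so "Skope" becomes the URL-safe slug "skope".
    value = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode()
    value = re.sub(r"[^a-z0-9]+", "-", value.lower())
    return value.strip("-")
```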
**T02 — Integration Tests** established the pytest + pytest-asyncio test infrastructure with async fixtures: function-scoped engine with NullPool for connection isolation, ASGI transport test client with dependency overrides, and a sample transcript fixture. Six tests prove: (1) happy-path creator+video+segments creation, (2) existing creator reuse, (3) idempotent re-upload (same video_id, no duplicate segments), (4) raw JSON persistence to disk, (5) invalid JSON rejection, (6) missing fields rejection. During T02, a bug was found and fixed in `models.py` — the `_now()` helper produced timezone-aware datetimes that asyncpg rejects for TIMESTAMP WITHOUT TIME ZONE columns.

All slice verification checks pass: router routes, schema fields, dependency presence, main.py wiring, 6/6 tests green, docker compose config valid.
## Verification

All verification checks passed:

1. `python -m pytest tests/test_ingest.py -v` — 6/6 tests passed in 2.93s
2. `from routers.ingest import router; print([r.path for r in router.routes])` — outputs `['/ingest']`
3. `from schemas import TranscriptIngestResponse; print(TranscriptIngestResponse.model_fields.keys())` — shows all 7 fields
4. `grep -q 'python-multipart' requirements.txt` — exits 0
5. `grep -q 'ingest' main.py` — exits 0
6. `docker compose config > /dev/null` — exits 0
## Requirements Advanced

- R002 — POST /api/v1/ingest endpoint accepts transcript JSON uploads, creates/updates Creator and SourceVideo records, stores transcript data in PostgreSQL, and persists raw JSON to filesystem. 6 integration tests prove the full flow.

## Requirements Validated

- R002 — 6 passing integration tests: happy-path ingestion creates Creator+SourceVideo+5 TranscriptSegments, re-upload is idempotent (same video_id, segments replaced not doubled), existing creators are reused, raw JSON saved to disk, invalid/malformed JSON rejected with proper HTTP status codes.

## New Requirements Surfaced

None.

## Requirements Invalidated or Re-scoped

None.

## Deviations

Fixed bug in `backend/models.py` `_now()` function during T02 — changed from `datetime.now(timezone.utc)` to `datetime.now(timezone.utc).replace(tzinfo=None)` to match TIMESTAMP WITHOUT TIME ZONE column types. This was discovered during integration testing and was not in the original plan.

## Known Limitations

Tests require a running PostgreSQL instance on localhost:5433 — they are integration tests, not unit tests. No SQLite fallback. The test database (chrysopedia_test) must be created manually before running tests.

## Follow-ups

None.

## Files Created/Modified

- `backend/routers/ingest.py` — New file — POST /api/v1/ingest endpoint with creator auto-detection, SourceVideo upsert, TranscriptSegment bulk insert, raw JSON persistence
- `backend/schemas.py` — Added TranscriptIngestResponse Pydantic model with 7 fields
- `backend/main.py` — Mounted ingest router under /api/v1 prefix
- `backend/requirements.txt` — Added python-multipart, pytest, pytest-asyncio, httpx dependencies
- `backend/models.py` — Fixed _now() to return naive UTC datetimes for asyncpg compatibility
- `backend/tests/conftest.py` — New file — async test fixtures: NullPool engine, ASGI client, sample transcript path
- `backend/tests/test_ingest.py` — New file — 6 integration tests for ingest endpoint
- `backend/tests/fixtures/sample_transcript.json` — New file — 5-segment sample transcript JSON fixture
- `backend/pytest.ini` — New file — asyncio_mode = auto configuration
.gsd/milestones/M001/slices/S02/S02-UAT.md (new file, 95 lines)
# S02: Transcript Ingestion API — UAT

**Milestone:** M001
**Written:** 2026-03-29T22:19:19.537Z

## UAT: S02 — Transcript Ingestion API

### Preconditions
- PostgreSQL running on `localhost:5433` (via `docker compose up -d chrysopedia-db`)
- Test database `chrysopedia_test` exists: `docker exec chrysopedia-db psql -U chrysopedia -c "CREATE DATABASE chrysopedia_test;"`
- Backend dependencies installed: `cd backend && pip install -r requirements.txt`
- Working directory: `backend/`

---

### Test 1: Happy-Path Ingestion
**Steps:**
1. Run `python -m pytest tests/test_ingest.py::test_ingest_creates_creator_and_video -v`

**Expected:**
- Test passes
- Response status 200
- Response JSON contains: `video_id` (UUID), `creator_id` (UUID), `creator_name` = "Skope", `segments_stored` = 5, `processing_status` = "transcribed", `is_reupload` = false
- Creator record exists in DB with `folder_name` = "Skope", `slug` = "skope"
- SourceVideo record exists with correct creator linkage
- 5 TranscriptSegment rows with `segment_index` 0–4

### Test 2: Creator Reuse
**Steps:**
1. Run `python -m pytest tests/test_ingest.py::test_ingest_reuses_existing_creator -v`

**Expected:**
- Test passes
- When a Creator with `folder_name` = "Skope" already exists, ingesting a transcript with `creator_folder` = "Skope" reuses the existing Creator (same ID)
- Only 1 Creator row in the database (not duplicated)

### Test 3: Idempotent Re-upload
**Steps:**
1. Run `python -m pytest tests/test_ingest.py::test_ingest_idempotent_reupload -v`

**Expected:**
- Test passes
- Uploading the same transcript file twice returns `is_reupload` = true on second upload
- Same `video_id` returned both times
- Still only 5 TranscriptSegment rows (old segments deleted, new ones inserted)
- Only 1 SourceVideo row exists

### Test 4: Raw JSON Persistence
**Steps:**
1. Run `python -m pytest tests/test_ingest.py::test_ingest_saves_json_to_disk -v`

**Expected:**
- Test passes
- Raw JSON file saved at `{transcript_storage_path}/Skope/{source_file}.json`
- File content is valid JSON matching the uploaded payload
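The expected path in Test 4 can be expressed as a small helper. This is an illustrative sketch of the layout under test, not the actual ingest code:

```python
from pathlib import Path

def transcript_path(storage_root: str, creator_folder: str, source_file: str) -> Path:
    # Mirrors the expected layout: {transcript_storage_path}/{creator_folder}/{source_file}.json
    return Path(storage_root) / creator_folder / f"{source_file}.json"
```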
### Test 5: Invalid JSON Rejection
**Steps:**
1. Run `python -m pytest tests/test_ingest.py::test_ingest_rejects_invalid_json -v`

**Expected:**
- Test passes
- Uploading a file with invalid JSON syntax returns HTTP 400 or 422
- No Creator, SourceVideo, or TranscriptSegment records created

### Test 6: Missing Fields Rejection
**Steps:**
1. Run `python -m pytest tests/test_ingest.py::test_ingest_rejects_missing_fields -v`

**Expected:**
- Test passes
- Uploading JSON without required `creator_folder` field returns HTTP 400 or 422
- No partial records created in the database

### Test 7: Full Suite
**Steps:**
1. Run `cd backend && python -m pytest tests/test_ingest.py -v`

**Expected:**
- All 6 tests pass
- Total runtime under 10 seconds

### Test 8: Endpoint Wiring Smoke Test
**Steps:**
1. Run `cd backend && python3 -c "from main import app; print([r.path for r in app.routes if 'ingest' in r.path])"`

**Expected:**
- Output includes `/api/v1/ingest`

### Edge Case: Docker Compose Compatibility
**Steps:**
1. Run `docker compose config > /dev/null` from project root

**Expected:**
- Exits 0 (no regression from S02 changes)
.gsd/milestones/M001/slices/S02/tasks/T02-VERIFY.json (new file, 30 lines)
{
  "schemaVersion": 1,
  "taskId": "T02",
  "unitId": "M001/S02/T02",
  "timestamp": 1774822575906,
  "passed": false,
  "discoverySource": "task-plan",
  "checks": [
    {
      "command": "cd backend",
      "exitCode": 0,
      "durationMs": 4,
      "verdict": "pass"
    },
    {
      "command": "python -m pytest tests/test_ingest.py -v",
      "exitCode": 127,
      "durationMs": 4,
      "verdict": "fail"
    },
    {
      "command": "docker compose config > /dev/null 2>&1",
      "exitCode": 0,
      "durationMs": 62,
      "verdict": "pass"
    }
  ],
  "retryAttempt": 1,
  "maxRetries": 2
}
# S03: LLM Extraction Pipeline + Qdrant Integration

**Goal:** A transcript JSON triggers stages 2-5: segmentation → extraction → classification → synthesis. Technique pages with key moments appear in DB. Qdrant has searchable embeddings. Pipeline is resumable per-video per-stage, uses configurable LLM endpoints with fallback, and loads prompt templates from disk.

**Demo:** A transcript JSON triggers stages 2-5: segmentation → extraction → classification → synthesis. Technique pages with key moments appear in DB. Qdrant has searchable embeddings.

## Tasks
- [x] **T01: Extended Settings with 12 LLM/embedding/Qdrant config fields, created Celery app, built sync LLMClient with primary/fallback logic, and defined 8 Pydantic schemas for pipeline stages 2-5** — Extend Settings with LLM/embedding/Qdrant/prompt config vars, add openai and qdrant-client to requirements.txt, create the Celery app in worker.py, build the sync LLM client with primary/fallback logic, and define Pydantic schemas for all pipeline stage inputs/outputs. This task establishes all infrastructure the pipeline stages depend on.

## Steps

1. Add `openai>=1.0,<2.0`, `qdrant-client>=1.9,<2.0`, and `pyyaml>=6.0,<7.0` to `backend/requirements.txt`.

2. Extend `backend/config.py` `Settings` class with these fields (all with sensible defaults from .env.example):
- `llm_api_url: str = 'http://localhost:11434/v1'`
- `llm_api_key: str = 'sk-placeholder'`
- `llm_model: str = 'qwen2.5:14b-q8_0'`
- `llm_fallback_url: str = 'http://localhost:11434/v1'`
- `llm_fallback_model: str = 'qwen2.5:14b-q8_0'`
- `embedding_api_url: str = 'http://localhost:11434/v1'`
- `embedding_model: str = 'nomic-embed-text'`
- `embedding_dimensions: int = 768`
- `qdrant_url: str = 'http://localhost:6333'`
- `qdrant_collection: str = 'chrysopedia'`
- `prompts_path: str = './prompts'`
- `review_mode: bool = True`

3. Create `backend/worker.py`: Celery app instance using `config.get_settings().redis_url` as broker. Import and register tasks from `pipeline.stages` (import the module so decorators register). Worker must be importable as `celery -A worker worker`.

4. Create `backend/pipeline/__init__.py` (empty).

5. Create `backend/pipeline/schemas.py` with Pydantic models:
- `TopicSegment(start_index, end_index, topic_label, summary)` — stage 2 output per segment group
- `SegmentationResult(segments: list[TopicSegment])` — stage 2 full output
- `ExtractedMoment(title, summary, start_time, end_time, content_type, plugins, raw_transcript)` — stage 3 output per moment
- `ExtractionResult(moments: list[ExtractedMoment])` — stage 3 full output
- `ClassifiedMoment(moment_index, topic_category, topic_tags, content_type_override)` — stage 4 output per moment
- `ClassificationResult(classifications: list[ClassifiedMoment])` — stage 4 full output
- `SynthesizedPage(title, slug, topic_category, topic_tags, summary, body_sections, signal_chains, plugins, source_quality)` — stage 5 output
- `SynthesisResult(pages: list[SynthesizedPage])` — stage 5 full output

All fields should use appropriate types: `content_type` as str (enum value), `body_sections` as dict, `signal_chains` as list[dict], etc.

6. Create `backend/pipeline/llm_client.py`:
- Class `LLMClient` initialized with `settings: Settings`
- Uses sync `openai.OpenAI(base_url=settings.llm_api_url, api_key=settings.llm_api_key)`
- Method `complete(system_prompt: str, user_prompt: str, response_model: type[BaseModel] | None = None) -> str` that:
  a. Tries primary endpoint with `response_format={'type': 'json_object'}` if response_model is provided
  b. On connection/timeout error, tries fallback endpoint (`openai.OpenAI(base_url=settings.llm_fallback_url)` with `settings.llm_fallback_model`)
  c. Returns the raw completion text
  d. Logs WARNING on fallback, ERROR on total failure
- Method `parse_response(text: str, model: type[T]) -> T` that validates JSON via Pydantic `model_validate_json()` with error handling
- Catch `openai.APIConnectionError`, `openai.APITimeoutError` for fallback trigger
- IMPORTANT: Use sync `openai.OpenAI`, not async — Celery tasks run in sync context
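The primary/fallback contract in step 6 can be sketched independently of the OpenAI SDK. Here `primary` and `fallback` are stand-in callables, and the built-in `ConnectionError`/`TimeoutError` stand in for `openai.APIConnectionError`/`openai.APITimeoutError`:

```python
import logging

log = logging.getLogger("llm")

def complete_with_fallback(primary, fallback, prompt: str) -> str:
    # Try the primary endpoint first; only connection-class failures
    # trigger the fallback. Anything else (e.g. a 4xx) propagates.
    try:
        return primary(prompt)
    except (ConnectionError, TimeoutError) as exc:
        log.warning("primary LLM endpoint failed (%s); using fallback", exc)
        return fallback(prompt)
```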
## Must-Haves

- [ ] Settings has all 12 new fields with correct defaults
- [ ] `openai`, `qdrant-client`, `pyyaml` in requirements.txt
- [ ] `celery -A worker worker` is syntactically valid (worker.py creates Celery app and imports pipeline.stages)
- [ ] LLMClient uses sync openai.OpenAI, not AsyncOpenAI
- [ ] LLMClient has primary/fallback logic with proper exception handling
- [ ] All 8 Pydantic schema classes defined with correct field types

## Verification

- `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` prints defaults
- `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` exits 0
- `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` exits 0
- `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` exits 0
- `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt`
- Estimate: 1.5h
- Files: backend/config.py, backend/requirements.txt, backend/worker.py, backend/pipeline/__init__.py, backend/pipeline/schemas.py, backend/pipeline/llm_client.py
- Verify: cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)" && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')" && python -c "from pipeline.llm_client import LLMClient; print('client ok')" && python -c "from worker import celery_app; print(celery_app.main)"
- [ ] **T02: Prompt templates and pipeline stages 2-5 with orchestrator** — Write the 4 prompt template files for stages 2-5 and implement all pipeline stage logic as Celery tasks in `pipeline/stages.py`. Each stage reads from PostgreSQL, loads its prompt template, calls the LLM client, parses the response, writes results back to PostgreSQL, and updates processing_status. The `run_pipeline` orchestrator chains stages and handles resumability.

## Failure Modes

| Dependency | On error | On timeout | On malformed response |
|------------|----------|------------|-----------------------|
| LLM API (primary) | Fall back to secondary endpoint | Fall back to secondary endpoint | Log raw response, retry once with 'output valid JSON' nudge, then fail stage |
| PostgreSQL | Task fails, Celery retries (max 3) | Task fails, Celery retries | N/A |
| Prompt template file | Task fails immediately with FileNotFoundError logged | N/A | N/A |

## Negative Tests

- **Malformed inputs**: LLM returns non-JSON text → caught by parse_response, logged with raw text
- **Error paths**: LLM returns valid JSON but wrong schema → Pydantic ValidationError caught, logged
- **Boundary conditions**: Empty transcript (0 segments) → stage 2 returns empty segmentation, stages 3-5 skip gracefully

## Steps

1. Create `prompts/stage2_segmentation.txt`: System prompt instructing the LLM to analyze transcript segments and identify topic boundaries. Input: full transcript text with segment indices. Output: JSON matching `SegmentationResult` schema — list of topic segments with start_index, end_index, topic_label, summary. Use XML-style `<transcript>` tags to fence user content from instructions.
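The instruction/data fencing convention from step 1 can be sketched as a small prompt builder (hypothetical helper name):

```python
def build_user_prompt(transcript_lines: list[str]) -> str:
    # Fence user-supplied transcript text in <transcript> tags so the model
    # treats it as data to analyze, not as instructions to follow.
    body = "\n".join(transcript_lines)
    return f"<transcript>\n{body}\n</transcript>"
```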
2. Create `prompts/stage3_extraction.txt`: System prompt for extracting key moments from a topic segment. Input: segment text with timestamps. Output: JSON matching `ExtractionResult` — list of moments with title, summary, timestamps, content_type (technique/settings/reasoning/workflow), plugins mentioned, raw_transcript excerpt.

3. Create `prompts/stage4_classification.txt`: System prompt for classifying key moments against the canonical tag taxonomy. Input: moment title + summary + list of canonical categories/sub-topics from `canonical_tags.yaml`. Output: JSON matching `ClassificationResult` — topic_category, topic_tags for each moment.

4. Create `prompts/stage5_synthesis.txt`: System prompt for synthesizing a technique page from key moments. Input: all moments for a creator+topic group. Output: JSON matching `SynthesisResult` — title, slug, summary, body_sections (structured prose with sub-aspects), signal_chains, plugins, source_quality assessment.

5. Create `backend/pipeline/stages.py` with these components:

a. Helper `_load_prompt(template_name: str) -> str` — reads from `settings.prompts_path / template_name`

b. Helper `_get_sync_session()` — creates a sync SQLAlchemy session for Celery tasks using `create_engine()` (sync, not async) with the DATABASE_URL converted from `postgresql+asyncpg://` to `postgresql+psycopg2://`. **CRITICAL**: Celery is sync, so use `sqlalchemy.orm.Session`, not async.
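The URL conversion in step 5b is a single string substitution; a sketch (hypothetical helper name):

```python
def to_sync_dsn(url: str) -> str:
    # Celery tasks are synchronous, so swap the asyncpg driver for psycopg2.
    # URLs that already use a sync driver pass through unchanged.
    return url.replace("postgresql+asyncpg://", "postgresql+psycopg2://", 1)
```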
c. `@celery_app.task(bind=True, max_retries=3)` for `stage2_segmentation(self, video_id: str)`: Load transcript segments from DB ordered by segment_index, build prompt with full text, call LLM, parse SegmentationResult, update `topic_label` on each TranscriptSegment row.

d. `@celery_app.task(bind=True, max_retries=3)` for `stage3_extraction(self, video_id: str)`: Group segments by topic_label, for each group call LLM to extract moments, create KeyMoment rows in DB, set `processing_status = extracted` on SourceVideo.

e. `@celery_app.task(bind=True, max_retries=3)` for `stage4_classification(self, video_id: str)`: Load key moments, load canonical tags from `config/canonical_tags.yaml`, call LLM to classify, update topic_tags on moments. Stage 4 does NOT change processing_status.

f. `@celery_app.task(bind=True, max_retries=3)` for `stage5_synthesis(self, video_id: str)`: Group key moments by (creator, topic_category), for each group call LLM to synthesize, create/update TechniquePage rows with body_sections, signal_chains, plugins, topic_tags, summary. Link KeyMoments to their TechniquePage via technique_page_id. Set `processing_status = reviewed` (or `published` if `settings.review_mode is False`).

g. `@celery_app.task` for `run_pipeline(video_id: str)`: Check current processing_status, chain the appropriate stages (e.g., if `transcribed`, chain 2→3→4→5; if `extracted`, chain 4→5). Use `celery.chain()` for sequential execution.
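The resumability rule in step 5g can be modeled as a pure status-to-stages mapping that `run_pipeline` would feed into `celery.chain()`. Only the `transcribed` and `extracted` resume points are stated in the plan; treating any other status as "nothing to run" is an assumption:

```python
STAGE_ORDER = [
    "stage2_segmentation",
    "stage3_extraction",
    "stage4_classification",
    "stage5_synthesis",
]

# Resume points given in the plan: transcribed → run 2-5, extracted → run 4-5.
RESUME_FROM = {"transcribed": 0, "extracted": 2}

def stages_to_run(processing_status: str) -> list[str]:
    # Unknown or terminal statuses resume nothing (assumption).
    start = RESUME_FROM.get(processing_status)
    return [] if start is None else STAGE_ORDER[start:]
```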
6. Import `stages` module in `worker.py` to register tasks (ensure the import line `from pipeline import stages` is present after celery_app creation).

## Must-Haves

- [ ] 4 prompt template files in `prompts/` with clear instruction/data separation
- [ ] All 5 stage tasks + run_pipeline orchestrator defined in stages.py
- [ ] Celery tasks use SYNC SQLAlchemy sessions (not async)
- [ ] Stage 2 updates topic_label on TranscriptSegment rows
- [ ] Stage 3 creates KeyMoment rows and sets processing_status=extracted
- [ ] Stage 4 loads canonical_tags.yaml and classifies moments
- [ ] Stage 5 creates TechniquePage rows and sets processing_status=reviewed/published
- [ ] run_pipeline handles resumability based on current processing_status
- [ ] Prompts fence user content with XML-style tags

## Verification

- `test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt` — all 4 prompt files exist
- `cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"` — imports succeed
- `cd backend && python -c "from worker import celery_app; print([t for t in celery_app.tasks if 'stage' in t or 'pipeline' in t])"` — shows registered tasks

## Observability Impact

- Signals added: INFO log at start/end of each stage with video_id and duration, WARNING on LLM fallback, ERROR on parse failure with raw response excerpt
- How a future agent inspects this: query `source_videos.processing_status` to see pipeline progress, check Celery worker logs for stage timing
- Failure state exposed: video stays at last successful processing_status, error logged with stage name and video_id
- Estimate: 2.5h
- Files: prompts/stage2_segmentation.txt, prompts/stage3_extraction.txt, prompts/stage4_classification.txt, prompts/stage5_synthesis.txt, backend/pipeline/stages.py, backend/worker.py
- Verify: test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt && cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"
- [ ] **T03: Qdrant integration and embedding client** — Create the embedding client (sync OpenAI-compatible /v1/embeddings) and Qdrant client (collection management + upsert). Wire embedding generation into the pipeline after stage 5 synthesis, so technique page summaries and key moment summaries are embedded and upserted to Qdrant with metadata payloads.
|
||||
|
||||
## Failure Modes
|
||||
|
||||
| Dependency | On error | On timeout | On malformed response |
|
||||
|------------|----------|-----------|----------------------|
|
||||
| Embedding API | Log error, skip embedding for this batch, pipeline continues | Same as error | Validate vector dimensions match config, log mismatch |
|
||||
| Qdrant server | Log error, skip upsert, pipeline continues (embeddings are not blocking) | Same as error | N/A |
|
||||
|
||||
## Steps
|
||||
|
||||
1. Create `backend/pipeline/embedding_client.py`:
   - Class `EmbeddingClient` initialized with `settings: Settings`
   - Uses sync `openai.OpenAI(base_url=settings.embedding_api_url, api_key=settings.llm_api_key)`
   - Method `embed(texts: list[str]) -> list[list[float]]` that calls `client.embeddings.create(model=settings.embedding_model, input=texts)` and returns the vector list
   - Handle `openai.APIConnectionError` gracefully — log and return an empty list
   - Validate returned vector dimensions match `settings.embedding_dimensions`
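The dimension check in the last bullet can be sketched as plain logic, independent of the SDK. The function name and logger name here are illustrative, not taken from the plan:

```python
import logging

logger = logging.getLogger("pipeline.embedding_client")

def validate_embeddings(vectors: list[list[float]], expected_dim: int) -> list[list[float]]:
    """Keep only vectors whose dimension matches config; log and drop mismatches."""
    valid = []
    for i, vec in enumerate(vectors):
        if len(vec) != expected_dim:
            logger.warning("vector %d has dim %d, expected %d; dropping", i, len(vec), expected_dim)
            continue
        valid.append(vec)
    return valid
```

Dropping mismatched vectors (rather than raising) matches the failure-mode table above: a bad batch is logged and skipped, and the pipeline continues.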

2. Create `backend/pipeline/qdrant_client.py`:
   - Class `QdrantManager` initialized with `settings: Settings`
   - Uses sync `qdrant_client.QdrantClient(url=settings.qdrant_url)`
   - Method `ensure_collection()` — checks `collection_exists()`, creates with `VectorParams(size=settings.embedding_dimensions, distance=Distance.COSINE)` if not
   - Method `upsert_points(points: list[PointStruct])` — wraps `client.upsert()`
   - Method `upsert_technique_pages(pages: list[dict], vectors: list[list[float]])` — builds a PointStruct list with payload (page_id, creator_id, title, topic_category, topic_tags, summary) and upserts
   - Method `upsert_key_moments(moments: list[dict], vectors: list[list[float]])` — builds a PointStruct list with payload (moment_id, source_video_id, title, start_time, end_time, content_type) and upserts
   - Handle `qdrant_client.http.exceptions.UnexpectedResponse` and connection errors
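The point-building step for technique pages might look like the following; plain dicts stand in for `PointStruct` so the sketch stays dependency-free, and the payload keys follow the bullet above:

```python
def build_page_points(pages: list[dict], vectors: list[list[float]]) -> list[dict]:
    """Pair each page with its vector and the metadata payload listed above."""
    payload_keys = ("page_id", "creator_id", "title", "topic_category", "topic_tags", "summary")
    points = []
    for page, vector in zip(pages, vectors):
        points.append({
            "id": page["page_id"],
            "vector": vector,
            # Payload carries the filterable metadata; missing keys become None.
            "payload": {k: page.get(k) for k in payload_keys},
        })
    return points
```

In the real client each dict would become `PointStruct(id=..., vector=..., payload=...)` before the `client.upsert()` call.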

3. Add a new Celery task `stage6_embed_and_index(video_id: str)` in `backend/pipeline/stages.py`:
   - Load all KeyMoments and TechniquePages created for this video
   - Build text strings for embedding (moment: title + summary; page: title + summary + topic_category)
   - Call `EmbeddingClient.embed()` to get vectors
   - Call `QdrantManager.ensure_collection()`, then `upsert_technique_pages()` and `upsert_key_moments()`
   - This stage runs after stage 5 in the pipeline chain but does NOT update processing_status (it's a side effect)
   - If embedding/Qdrant fails, log the error but don't fail the pipeline — embeddings can be regenerated later
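The non-blocking contract in the last two bullets reduces to a small guard. The wrapper name and return value are illustrative; the point is that any exception from the embed-and-index work is logged and swallowed:

```python
import logging

logger = logging.getLogger("pipeline.stages")

def run_side_effect_stage(fn, video_id: str) -> bool:
    """Run an embedding/indexing side effect; its failure must never fail the pipeline."""
    try:
        fn(video_id)
        return True
    except Exception:
        logger.exception("stage6 embed/index failed for video %s; continuing", video_id)
        return False
```

The boolean return lets the caller emit the WARNING signal described under Observability Impact without re-raising.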

4. Update `run_pipeline` in `stages.py` to append `stage6_embed_and_index` to the chain.

## Must-Haves

- [ ] EmbeddingClient uses sync openai.OpenAI for /v1/embeddings calls
- [ ] QdrantManager creates the collection only if it does not already exist
- [ ] Qdrant points include metadata payloads (page_id/moment_id, creator_id, topic, timestamps)
- [ ] stage6 is non-blocking — embedding/Qdrant failures don't fail the pipeline
- [ ] Vector dimension comes from config, not hardcoded

## Verification

- `cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')"` exits 0
- `cd backend && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')"` exits 0
- `cd backend && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"` exits 0

## Observability Impact

- Signals added: INFO log for embedding batch size and Qdrant upsert count; WARNING on embedding/Qdrant failures with error details
- How a future agent inspects this: check the Qdrant dashboard at QDRANT_URL, query the collection point count
- Failure state exposed: embedding failures are logged but the pipeline completes successfully

- Estimate: 1.5h
- Files: backend/pipeline/embedding_client.py, backend/pipeline/qdrant_client.py, backend/pipeline/stages.py
- Verify: cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')" && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')" && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"
- [ ] **T04: Wire ingest-to-pipeline trigger and add manual re-trigger endpoint** — Connect the existing ingest endpoint to the pipeline by dispatching `run_pipeline.delay()` after a successful commit, and add a manual re-trigger API endpoint for re-processing videos.

## Steps

1. Modify `backend/routers/ingest.py`:
   - After the `await db.commit()` and refresh calls (once the successful response is ready), dispatch the pipeline: `from pipeline.stages import run_pipeline; run_pipeline.delay(str(video.id))`
   - Wrap the dispatch in try/except so a Redis/Celery outage does not fail the ingest response — log WARNING on dispatch failure
   - The dispatch must happen AFTER commit so the worker can find the data in PostgreSQL
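The try/except guard in the first two bullets can be sketched as a small helper; any object exposing `.delay()` (the Celery task in production, a stub in tests) can be passed in. The helper name is illustrative:

```python
import logging

logger = logging.getLogger("routers.ingest")

def dispatch_pipeline(run_pipeline_task, video_id: str) -> bool:
    """Best-effort dispatch after commit; a broker outage must not fail the ingest response."""
    try:
        run_pipeline_task.delay(str(video_id))
        return True
    except Exception as exc:
        logger.warning("pipeline dispatch failed for video %s: %s", video_id, exc)
        return False
```

Because the commit has already happened, a failed dispatch only means the video waits for a manual re-trigger via the endpoint added in step 2.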

2. Create `backend/routers/pipeline.py`:
   - Router with `prefix='/pipeline'`, tag `['pipeline']`
   - `POST /trigger/{video_id}`: Look up SourceVideo by id, verify it exists (404 if not), dispatch `run_pipeline.delay(str(video_id))`, return `{"status": "triggered", "video_id": str(video_id), "current_processing_status": video.processing_status.value}`
   - Uses the `get_session` dependency for DB access

3. Mount the pipeline router in `backend/main.py` under `/api/v1`.

## Must-Haves

- [ ] Ingest endpoint dispatches run_pipeline.delay() after commit
- [ ] Pipeline dispatch failure does not fail the ingest response
- [ ] POST /api/v1/pipeline/trigger/{video_id} exists and returns status
- [ ] 404 returned for non-existent video_id
- [ ] Pipeline router mounted in main.py

## Verification

- `cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])"` shows `['/pipeline/trigger/{video_id}']` (router paths include the `/pipeline` prefix)
- `grep -q 'pipeline' backend/main.py` exits 0
- `grep -q 'run_pipeline' backend/routers/ingest.py` exits 0

- Estimate: 45m
- Files: backend/routers/ingest.py, backend/routers/pipeline.py, backend/main.py
- Verify: cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])" && grep -q 'pipeline' backend/main.py && grep -q 'run_pipeline' backend/routers/ingest.py

- [ ] **T05: Integration tests for pipeline, embedding, Qdrant, and trigger endpoints** — Write comprehensive integration tests with mocked LLM and Qdrant that verify the entire pipeline flow. Tests use the existing pytest-asyncio infrastructure from S02, extended with mocks for the OpenAI client and Qdrant client.

## Negative Tests

- **Malformed inputs**: LLM returns invalid JSON → pipeline handles it gracefully, logs the error
- **Error paths**: LLM primary endpoint connection refused → falls back to the secondary
- **Boundary conditions**: Video with 0 segments → pipeline completes without errors, no moments created

## Steps

1. Create `backend/tests/fixtures/mock_llm_responses.py`:
   - Dictionary mapping stage names to valid JSON response strings matching each Pydantic schema
   - Stage 2: SegmentationResult with 2 topic segments covering the 5 sample transcript segments
   - Stage 3: ExtractionResult with 2 key moments (one technique, one settings)
   - Stage 4: ClassificationResult mapping moments to canonical tags
   - Stage 5: SynthesisResult with a technique page including body_sections, signal_chains, plugins
   - Embedding response: list of 768-dimensional vectors (random floats are fine for testing)

2. Create `backend/tests/test_pipeline.py` with these tests, using `unittest.mock.patch` on `openai.OpenAI` and `qdrant_client.QdrantClient`:
   a. `test_stage2_segmentation_updates_topic_labels`: Ingest the sample transcript, run stage 2 with the mocked LLM, verify TranscriptSegment rows have topic_label set
   b. `test_stage3_extraction_creates_key_moments`: Run stages 2+3, verify KeyMoment rows are created with correct fields and processing_status = extracted
   c. `test_stage4_classification_assigns_tags`: Run stages 2+3+4, verify moments have topic_category and topic_tags from the canonical list
   d. `test_stage5_synthesis_creates_technique_pages`: Run pipeline stages 2–5, verify TechniquePage rows are created with body_sections, signal_chains, summary, and that KeyMoments are linked to the TechniquePage via technique_page_id
   e. `test_stage6_embeds_and_upserts_to_qdrant`: Run the full pipeline, verify EmbeddingClient.embed is called with the correct texts and QdrantManager upsert with the correct payloads
   f. `test_run_pipeline_resumes_from_extracted`: Set video processing_status to extracted, run the pipeline, verify only stages 4+5+6 execute (not 2+3)
   g. `test_pipeline_trigger_endpoint`: POST to /api/v1/pipeline/trigger/{video_id} with mocked Celery delay, verify 200 response
   h. `test_pipeline_trigger_404_for_missing_video`: POST to /api/v1/pipeline/trigger/{nonexistent_id}, verify 404
   i. `test_ingest_dispatches_pipeline`: POST a transcript to the ingest endpoint with mocked run_pipeline.delay, verify delay was called with the video_id
   j. `test_llm_fallback_on_primary_failure`: Mock the primary endpoint to raise APIConnectionError, verify the fallback endpoint is called

   IMPORTANT: Pipeline stages use SYNC SQLAlchemy (not async). Tests that call stage functions directly need a real PostgreSQL test database. Use the existing conftest.py pattern but create sync engine/sessions for pipeline stage calls. Mock the LLM and Qdrant clients, not the DB.

3. Update `backend/tests/conftest.py` to add:
   - A fixture that creates a sync SQLAlchemy engine pointing to the test database (for pipeline stage tests)
   - A fixture that pre-ingests a sample transcript and returns the video_id (for pipeline tests to use)
   - Any necessary mock fixtures for LLMClient and QdrantManager

## Must-Haves

- [ ] 10 integration tests covering all pipeline stages, trigger endpoints, resumability, and fallback
- [ ] All LLM calls mocked with realistic response fixtures
- [ ] Qdrant calls mocked — no real Qdrant needed
- [ ] Pipeline stage tests use sync DB sessions (matching production Celery behavior)
- [ ] At least one negative test (LLM fallback on primary failure)
- [ ] All tests pass with `python -m pytest tests/test_pipeline.py -v`

## Verification

- `cd backend && python -m pytest tests/test_pipeline.py -v` — all tests pass
- `cd backend && python -m pytest tests/ -v` — all tests (including the S02 ingest tests) still pass

## Observability Impact

- Signals added: test output shows per-test pass/fail with timing; mock call assertions verify the logging that would fire in production
- How a future agent inspects this: run `pytest tests/test_pipeline.py -v` to see all pipeline test results

- Estimate: 2h
- Files: backend/tests/fixtures/mock_llm_responses.py, backend/tests/test_pipeline.py, backend/tests/conftest.py
- Verify: cd backend && python -m pytest tests/test_pipeline.py -v && python -m pytest tests/ -v
125 .gsd/milestones/M001/slices/S03/S03-RESEARCH.md (new file)
# S03 — LLM Extraction Pipeline + Qdrant Integration — Research

**Date:** 2026-03-29

## Summary

S03 is the highest-risk, highest-complexity slice in M001. It builds the entire LLM extraction pipeline (stages 2–5: segmentation → extraction → classification → synthesis), the Qdrant vector embedding pipeline, the Celery worker infrastructure, the prompt template system, and the config extensions for LLM/embedding endpoints. This slice touches requirements R003, R009, R011, R012, and R013.

The codebase currently has **no worker/Celery/pipeline code at all** — it's entirely greenfield for this slice. Docker Compose already declares a `chrysopedia-worker` service that runs `celery -A worker worker`, but the `worker` module doesn't exist yet. The data model is complete — `KeyMoment`, `TechniquePage`, `RelatedTechniqueLink`, and `Tag` tables with their enums (`ProcessingStatus`, `ReviewStatus`, `KeyMomentContentType`, etc.) are all defined in `models.py` and migrated via `001_initial.py`. The S02 ingest endpoint already creates `TranscriptSegment` rows and sets `processing_status = transcribed`, which is the trigger state for the pipeline.

The primary risk is LLM integration: the pipeline must talk to an OpenAI-compatible API (Qwen on DGX Sparks primary, Ollama fallback), parse structured JSON responses, handle failures gracefully, and be resumable per-video per-stage. Secondary risk is Qdrant integration — creating collections with the right schema, generating embeddings via a configurable endpoint, and upserting points with metadata payloads. The Celery+Redis infrastructure is lower risk since both are already in Docker Compose and `celery[redis]` is in requirements.txt.

## Recommendation

Build this slice in a layered approach: (1) config/settings extensions first, (2) Celery app + task skeleton, (3) LLM client with fallback, (4) prompt templates on disk, (5) pipeline stages 2–5 as individual Celery tasks with a chain orchestrator, (6) Qdrant client + embedding pipeline, (7) trigger mechanism (post-ingest hook or API endpoint), (8) integration tests with mocked LLM responses.

Use the `openai` Python SDK for LLM calls — it works with any OpenAI-compatible API (Open WebUI, Ollama, vLLM). Use `qdrant-client` for vector operations — the sync `QdrantClient` inside Celery tasks, since tasks run synchronously (see Constraints). Use Pydantic models to define expected LLM output schemas for each stage, enabling `response_format` structured output where the model supports it, with a JSON-parse fallback for models that don't. Each pipeline stage should be an individual Celery task chained together, with `processing_status` updated after each stage completes, enabling resumability.

## Implementation Landscape

### Key Files

- `backend/models.py` — Complete. All 7 entities exist with correct enums. `ProcessingStatus` has the progression: `pending → transcribed → extracted → reviewed → published`. `KeyMoment` has `review_status`, `content_type`, `plugins`, `raw_transcript`. `TechniquePage` has `body_sections` (JSONB), `signal_chains` (JSONB), `topic_tags` (ARRAY), `source_quality`. No schema changes needed.
- `backend/database.py` — Provides `engine`, `async_session`, `get_session`. Worker tasks will need their own session management (not FastAPI dependency injection).
- `backend/config.py` — Currently has DB, Redis, CORS, file storage settings. **Must be extended** with: `llm_api_url`, `llm_api_key`, `llm_model`, `llm_fallback_url`, `llm_fallback_model`, `embedding_api_url`, `embedding_model`, `qdrant_url`, `qdrant_collection`, `review_mode` (bool). All referenced in `.env.example` but not yet in `Settings`.
- `backend/requirements.txt` — Has `celery[redis]`, `redis`, `httpx`, `pydantic`. **Must add**: `openai`, `qdrant-client`.
- `backend/routers/ingest.py` — After successful ingest, should trigger the pipeline. Currently sets `processing_status = transcribed` and commits. Add a post-commit Celery task dispatch: `run_pipeline.delay(video_id)`.
- `backend/worker.py` — **New file.** Celery app definition. Docker Compose expects `celery -A worker worker`.
- `backend/pipeline/` — **New directory.** Individual stage modules:
  - `backend/pipeline/__init__.py`
  - `backend/pipeline/llm_client.py` — OpenAI-compatible client with primary/fallback logic
  - `backend/pipeline/embedding_client.py` — Embedding generation via OpenAI-compatible `/v1/embeddings` endpoint
  - `backend/pipeline/qdrant_client.py` — Qdrant collection management and upsert operations
  - `backend/pipeline/stages.py` — Celery tasks for stages 2–5 + orchestrator chain
  - `backend/pipeline/schemas.py` — Pydantic models for LLM input/output per stage
- `prompts/` — **New template files:**
  - `prompts/stage2_segmentation.txt` — Topic boundary detection prompt
  - `prompts/stage3_extraction.txt` — Key moment extraction prompt
  - `prompts/stage4_classification.txt` — Classification/tagging prompt
  - `prompts/stage5_synthesis.txt` — Technique page synthesis prompt
- `config/canonical_tags.yaml` — Already exists with 6 categories, sub-topics, and genre taxonomy. Pipeline reads this during stage 4 classification.

### Build Order

1. **Config + dependencies first** — Extend `Settings` with LLM/embedding/Qdrant/review_mode vars. Add `openai` and `qdrant-client` to requirements.txt. This unblocks everything.

2. **Celery app + skeleton tasks** — Create `backend/worker.py` with a Celery app instance pointing to Redis. Create `backend/pipeline/stages.py` with placeholder tasks for stages 2–5 and a `run_pipeline` orchestrator that chains them. Verify `celery -A worker worker` starts without errors. This proves the Celery infrastructure works.

3. **LLM client with fallback** — Create `backend/pipeline/llm_client.py` using the `openai` SDK's sync `OpenAI` client (Celery tasks run synchronously; see Constraints). The key insight: the openai SDK works with any OpenAI-compatible API by setting `base_url`. Build a wrapper that tries the primary endpoint, catches connection/timeout errors, and falls back to the secondary. Define Pydantic schemas for each stage's expected LLM output in `backend/pipeline/schemas.py`.

4. **Prompt templates** — Write the 4 prompt template files. These are the soul of the extraction pipeline. Stage 2 (segmentation) needs the full transcript and asks for topic boundaries. Stage 3 (extraction) takes individual segments and extracts key moments. Stage 4 (classification) takes key moments and assigns tags from the canonical list. Stage 5 (synthesis) takes all moments for a creator+topic and produces technique page content.

5. **Pipeline stages implementation** — Implement the actual logic in each Celery task. Each stage:
   - Reads its input from PostgreSQL (transcript segments, key moments, etc.)
   - Loads the appropriate prompt template from disk
   - Calls the LLM client
   - Parses the structured response
   - Writes results to PostgreSQL (topic_labels on segments, new KeyMoment rows, tags, TechniquePage records)
   - Updates `processing_status` on the SourceVideo
   - Pipeline is resumable: if a video is at `transcribed`, start from stage 2; if at `extracted`, skip to stage 4 (classification), etc.

6. **Qdrant integration** — Create `backend/pipeline/qdrant_client.py` and `backend/pipeline/embedding_client.py`. Create the `chrysopedia` collection (if not exists) with vector size matching the embedding model (768 for nomic-embed-text). After stage 5 synthesis (or after key moment creation), embed summaries and upsert to Qdrant with metadata payloads (creator_id, topic, timestamps, source_video_id).

7. **Ingest trigger** — Modify `backend/routers/ingest.py` to dispatch `run_pipeline.delay(video_id)` after successful commit. Add an API endpoint `POST /api/v1/pipeline/trigger/{video_id}` for manual re-triggering.

8. **Integration tests** — Test with mocked LLM responses (don't call real LLMs in CI). Verify: pipeline stages update processing_status correctly, key moments are created with correct fields, technique pages are synthesized, Qdrant upsert is called with correct payloads.

### Verification Approach

1. `celery -A worker worker --loglevel=debug` starts cleanly with registered tasks visible in output
2. `python -c "from pipeline.stages import run_pipeline; print('import ok')"` — module imports work
3. `python -c "from pipeline.llm_client import LLMClient; print('client ok')"` — LLM client importable
4. Pipeline integration test: ingest a sample transcript → verify processing_status progresses through `transcribed → extracted → reviewed` (or `published` when `review_mode=false`), with mocked LLM
5. Key moment creation test: verify stage 3 output produces KeyMoment rows with correct fields
6. Technique page synthesis test: verify stage 5 produces TechniquePage with body_sections, signal_chains, plugins
7. Qdrant test: verify the embedding client produces vectors and qdrant_client upserts with correct payloads (mock Qdrant)
8. Prompt template loading: verify all 4 templates load from the `prompts/` directory
9. Config test: verify all new Settings fields have correct defaults and load from env vars
10. `docker compose config > /dev/null` still passes with the worker service

## Don't Hand-Roll

| Problem | Existing Solution | Why Use It |
|---------|------------------|------------|
| OpenAI-compatible API calls | `openai` Python SDK | Works with Open WebUI, Ollama, vLLM by setting `base_url`. Has async support, structured output parsing, retry logic. No need to build raw httpx calls. |
| Qdrant vector operations | `qdrant-client` | Official Python client with `AsyncQdrantClient`, typed models for `PointStruct`, `VectorParams`, metadata filtering. Already mature. |
| Background job queue | `celery[redis]` | Already in requirements.txt and Docker Compose. Task chaining, retries, result tracking built in. |
| Structured LLM output parsing | Pydantic models + openai SDK `response_format` | The openai SDK supports `response_format={"type": "json_object"}` for structured output. Combined with Pydantic `model_validate_json()` for robust parsing. |
| YAML config loading | `PyYAML` (already installed) | For loading `canonical_tags.yaml`. Already a dependency. |

## Constraints

- **Celery is sync, DB is async.** The Celery worker runs in a sync event loop. Pipeline tasks using SQLAlchemy async require either: (a) running `asyncio.run()` inside each task, (b) using `asgiref.sync_to_async`, or (c) creating a sync SQLAlchemy engine for the worker. Option (a) is simplest — each Celery task wraps an async function with `asyncio.run()`. The openai SDK has both sync and async clients; use the sync client in Celery tasks to avoid complexity.
- **Qdrant is at 10.0.0.10 from the hypervisor, not localhost.** The `QDRANT_URL` env var must be configured per-environment. In Docker Compose on the hypervisor, it should point to `http://10.0.0.10:6333`. Tests should mock Qdrant entirely.
- **LLM structured output support varies by model.** Qwen via Open WebUI may or may not support `response_format: json_object`. The pipeline must handle both: try structured output first, fall back to prompting for JSON and parsing the raw text response. Always validate with Pydantic regardless.
- **Prompts are bind-mounted read-only.** Docker Compose mounts `./prompts:/prompts:ro` for the worker. Prompt templates should be loaded from `/prompts/` in production and `./prompts/` in development. Config should have a `prompts_path` setting.
- **`processing_status` enum progression is: `pending → transcribed → extracted → reviewed → published`.** The pipeline must respect this: stages 2+3 set `extracted`, stage 4 doesn't change status (classification is metadata), stage 5 sets `reviewed` (or `published` if `review_mode=false`).
- **Naive datetimes required.** Per KNOWLEDGE.md, all datetime values must use `_now()` (naive UTC) for asyncpg compatibility with TIMESTAMP WITHOUT TIME ZONE columns.

## Common Pitfalls

- **Celery `asyncio.run()` nesting** — Celery tasks run in a sync context. If you try to call async functions without `asyncio.run()`, you get "no running event loop" errors. If a Celery worker happens to use an async-aware event loop (e.g., with gevent), `asyncio.run()` may fail with "cannot run nested event loop". Safest approach: use sync clients (sync `openai.OpenAI`, sync `QdrantClient`) in Celery tasks, or use a dedicated sync SQLAlchemy engine for the worker.
- **LLM JSON parse failures** — LLMs don't always produce valid JSON even when asked. Every LLM response must be wrapped in try/except for `json.JSONDecodeError` and `pydantic.ValidationError`. Log the raw response on failure for debugging. Consider a retry with a "please output valid JSON" follow-up.
- **Qdrant collection recreation** — `create_collection` fails if the collection exists. Always use a `collection_exists()` check first, or use `recreate_collection()` only during initial setup.
- **Token limits on long transcripts** — A 4-hour livestream transcript could be 50k+ tokens. Stage 2 (segmentation) must handle this via chunking — split the transcript into overlapping windows and segment each independently, then merge boundary segments. This is the hardest part of stage 2.
- **Prompt template injection** — Transcript text is user-generated content inserted into prompts. While this is a self-hosted system with trusted content, prompts should clearly delineate the transcript from instructions using XML-style tags or markdown fences.
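The overlapping-window split mentioned in the token-limit pitfall can be sketched as pure index arithmetic. Window and overlap sizes here are illustrative, not values the plan specifies:

```python
def overlapping_windows(segments: list, window: int = 200, overlap: int = 20) -> list[list]:
    """Split a long transcript into windows that overlap, so a topic boundary
    near a cut point appears in two windows and can be merged afterwards."""
    if overlap >= window:
        raise ValueError("overlap must be smaller than window")
    windows, start = [], 0
    while start < len(segments):
        windows.append(segments[start:start + window])
        if start + window >= len(segments):
            break  # last window reached the end of the transcript
        start += window - overlap
    return windows
```

The merge step (reconciling the duplicated boundary region between consecutive windows) is the part that still needs careful design, as the Open Risks section notes.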

## Open Risks

- **LLM endpoint availability during testing.** Neither DGX Sparks nor local Ollama are available in the CI/test environment. All LLM calls in tests must use mocked responses. This means test coverage proves the pipeline plumbing but not extraction quality — quality validation requires real LLM calls in a staging environment.
- **Token window sizing for long transcripts.** The spec mentions 4-hour livestreams. Qwen 2.5-72B has a 128K context window, but sending a full 4-hour transcript (~50K tokens) in one call may degrade quality. The chunking strategy for stage 2 needs careful design — overlapping windows with merge logic at boundaries.
- **Embedding model dimension mismatch.** `nomic-embed-text` produces 768-dimensional vectors. If the embedding model is changed later, the Qdrant collection schema must be recreated. The collection creation should read the dimension from config, not hardcode it.
- **Open WebUI API compatibility.** Open WebUI wraps Ollama/vLLM with its own API layer. The exact endpoint path and authentication method (API key format) may differ from standard OpenAI API conventions. The LLM client should be tested against the actual endpoint before the pipeline processes real data.

## Skills Discovered

| Technology | Skill | Status |
|------------|-------|--------|
| Celery | `bobmatnyc/claude-mpm-skills@celery` (143 installs) | available |
| Qdrant | `davila7/claude-code-templates@qdrant-vector-search` (470 installs) | available |
| Qdrant | `giuseppe-trisciuoglio/developer-kit@qdrant` (339 installs) | available |

Install commands (user decision):

- `npx skills add bobmatnyc/claude-mpm-skills@celery`
- `npx skills add davila7/claude-code-templates@qdrant-vector-search`
91 .gsd/milestones/M001/slices/S03/tasks/T01-PLAN.md (new file)
---
estimated_steps: 52
estimated_files: 6
skills_used: []
---

# T01: Config extensions, Celery app, LLM client, and pipeline schemas

Extend Settings with LLM/embedding/Qdrant/prompt config vars, add openai and qdrant-client to requirements.txt, create the Celery app in worker.py, build the sync LLM client with primary/fallback logic, and define Pydantic schemas for all pipeline stage inputs/outputs. This task establishes all infrastructure the pipeline stages depend on.

## Steps

1. Add `openai>=1.0,<2.0`, `qdrant-client>=1.9,<2.0`, and `pyyaml>=6.0,<7.0` to `backend/requirements.txt`.

2. Extend the `backend/config.py` `Settings` class with these fields (all with sensible defaults from .env.example):
   - `llm_api_url: str = 'http://localhost:11434/v1'`
   - `llm_api_key: str = 'sk-placeholder'`
   - `llm_model: str = 'qwen2.5:14b-q8_0'`
   - `llm_fallback_url: str = 'http://localhost:11434/v1'`
   - `llm_fallback_model: str = 'qwen2.5:14b-q8_0'`
   - `embedding_api_url: str = 'http://localhost:11434/v1'`
   - `embedding_model: str = 'nomic-embed-text'`
   - `embedding_dimensions: int = 768`
   - `qdrant_url: str = 'http://localhost:6333'`
   - `qdrant_collection: str = 'chrysopedia'`
   - `prompts_path: str = './prompts'`
   - `review_mode: bool = True`
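The real `Settings` class uses pydantic-settings; as a dependency-free illustration of the intended behavior (env var overrides a built-in default), a dataclass stand-in with a few of the fields might look like this:

```python
import os
from dataclasses import dataclass, field

@dataclass
class LLMSettingsSketch:
    """Stand-in for the pydantic-settings class: each field reads its env var,
    falling back to the default listed in the plan above."""
    llm_api_url: str = field(
        default_factory=lambda: os.environ.get("LLM_API_URL", "http://localhost:11434/v1"))
    qdrant_url: str = field(
        default_factory=lambda: os.environ.get("QDRANT_URL", "http://localhost:6333"))
    embedding_dimensions: int = field(
        default_factory=lambda: int(os.environ.get("EMBEDDING_DIMENSIONS", "768")))
    review_mode: bool = field(
        default_factory=lambda: os.environ.get("REVIEW_MODE", "true").lower() == "true")
```

pydantic-settings additionally handles type coercion and validation; this sketch only shows the default/override semantics the verification step checks.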

3. Create `backend/worker.py`: Celery app instance using `config.get_settings().redis_url` as broker. Import and register tasks from `pipeline.stages` (import the module so decorators register). Worker must be importable as `celery -A worker worker`.

4. Create `backend/pipeline/__init__.py` (empty).

5. Create `backend/pipeline/schemas.py` with Pydantic models:
   - `TopicSegment(start_index, end_index, topic_label, summary)` — stage 2 output per segment group
   - `SegmentationResult(segments: list[TopicSegment])` — stage 2 full output
   - `ExtractedMoment(title, summary, start_time, end_time, content_type, plugins, raw_transcript)` — stage 3 output per moment
   - `ExtractionResult(moments: list[ExtractedMoment])` — stage 3 full output
   - `ClassifiedMoment(moment_index, topic_category, topic_tags, content_type_override)` — stage 4 output per moment
   - `ClassificationResult(classifications: list[ClassifiedMoment])` — stage 4 full output
   - `SynthesizedPage(title, slug, topic_category, topic_tags, summary, body_sections, signal_chains, plugins, source_quality)` — stage 5 output
   - `SynthesisResult(pages: list[SynthesizedPage])` — stage 5 full output

   All fields should use appropriate types: `content_type` as str (enum value), `body_sections` as dict, `signal_chains` as list[dict], etc.

6. Create `backend/pipeline/llm_client.py`:
   - Class `LLMClient` initialized with `settings: Settings`
   - Uses sync `openai.OpenAI(base_url=settings.llm_api_url, api_key=settings.llm_api_key)`
   - Method `complete(system_prompt: str, user_prompt: str, response_model: type[BaseModel] | None = None) -> str` that:
     a. Tries the primary endpoint with `response_format={'type': 'json_object'}` if response_model is provided
     b. On connection/timeout error, tries the fallback endpoint (`openai.OpenAI(base_url=settings.llm_fallback_url)` with `settings.llm_fallback_model`)
     c. Returns the raw completion text
     d. Logs WARNING on fallback, ERROR on total failure
   - Method `parse_response(text: str, model: type[T]) -> T` that validates JSON via Pydantic `model_validate_json()` with error handling
   - Catch `openai.APIConnectionError`, `openai.APITimeoutError` for the fallback trigger
   - IMPORTANT: Use sync `openai.OpenAI`, not async — Celery tasks run in a sync context
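The primary/fallback control flow and the JSON-parsing step can be sketched without the SDK. Plain callables stand in for the two endpoints, and the built-in `ConnectionError`/`TimeoutError` stand in for `openai.APIConnectionError`/`openai.APITimeoutError`; the fence-stripping heuristic is an assumption, not something the plan specifies:

```python
import json
import logging

logger = logging.getLogger("pipeline.llm_client")

def complete_with_fallback(primary, fallback, prompt: str) -> str:
    """Try the primary endpoint; on connection/timeout errors only, use the fallback."""
    try:
        return primary(prompt)
    except (ConnectionError, TimeoutError) as exc:
        logger.warning("primary LLM endpoint failed (%s); using fallback", exc)
        return fallback(prompt)

def parse_json_response(text: str) -> dict:
    """Strip an optional markdown fence, then parse; log the raw text on failure."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").removeprefix("json").strip()
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError as exc:
        logger.error("LLM returned invalid JSON: %r", text)
        raise ValueError("unparseable LLM response") from exc
```

In the real client, `parse_response` would hand the parsed text to Pydantic's `model_validate_json()` instead of bare `json.loads`, and other SDK exceptions would propagate immediately rather than triggering the fallback.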

## Must-Haves

- [ ] Settings has all 12 new fields with correct defaults
- [ ] `openai`, `qdrant-client`, `pyyaml` in requirements.txt
- [ ] `celery -A worker worker` is syntactically valid (worker.py creates the Celery app and imports pipeline.stages)
- [ ] LLMClient uses sync openai.OpenAI, not AsyncOpenAI
- [ ] LLMClient has primary/fallback logic with proper exception handling
- [ ] All 8 Pydantic schema classes defined with correct field types

## Verification

- `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` prints defaults
- `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` exits 0
- `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` exits 0
- `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` exits 0
- `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt`

## Inputs

- `backend/config.py` — existing Settings class to extend
- `backend/requirements.txt` — existing dependencies to append to
- `backend/database.py` — provides engine/session patterns used by worker
- `.env.example` — reference for env var names and defaults

## Expected Output

- `backend/config.py` — Settings extended with 12 new LLM/embedding/Qdrant/prompt/review fields
- `backend/requirements.txt` — openai, qdrant-client, pyyaml added
- `backend/worker.py` — Celery app instance configured with Redis broker
- `backend/pipeline/__init__.py` — empty package init
- `backend/pipeline/schemas.py` — 8 Pydantic models for pipeline stage I/O
- `backend/pipeline/llm_client.py` — LLMClient with sync OpenAI primary/fallback

## Verification

`cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)" && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')" && python -c "from pipeline.llm_client import LLMClient; print('client ok')" && python -c "from worker import celery_app; print(celery_app.main)"`
||||
92
.gsd/milestones/M001/slices/S03/tasks/T01-SUMMARY.md
Normal file

@ -0,0 +1,92 @@
---
id: T01
parent: S03
milestone: M001
provides: []
requires: []
affects: []
key_files: ["backend/config.py", "backend/worker.py", "backend/pipeline/schemas.py", "backend/pipeline/llm_client.py", "backend/requirements.txt", "backend/pipeline/__init__.py", "backend/pipeline/stages.py"]
key_decisions: ["Used sync openai.OpenAI (not Async) since Celery tasks run synchronously", "LLMClient catches APIConnectionError and APITimeoutError for fallback; other errors propagate immediately", "Created pipeline/stages.py stub so worker.py import chain works ahead of T02"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All 5 verification checks passed: Settings prints correct defaults (llm_api_url, qdrant_url, review_mode), all 4 schema classes import successfully, LLMClient imports clean, celery_app.main prints 'chrysopedia', and grep confirms openai/qdrant-client in requirements.txt. Additional validation confirmed all 12 config fields present, all 8 schema classes importable, sync OpenAI usage (not Async), and fallback exception handling wired."
completed_at: 2026-03-29T22:30:25.116Z
blocker_discovered: false
---

# T01: Extended Settings with 12 LLM/embedding/Qdrant config fields, created Celery app, built sync LLMClient with primary/fallback logic, and defined 8 Pydantic schemas for pipeline stages 2-5

> Extended Settings with 12 LLM/embedding/Qdrant config fields, created Celery app, built sync LLMClient with primary/fallback logic, and defined 8 Pydantic schemas for pipeline stages 2-5
## What Happened
Added openai, qdrant-client, and pyyaml to requirements.txt. Extended the Settings class with 12 new fields covering LLM endpoints (primary + fallback), embedding config, Qdrant connection, prompt template path, and review mode toggle. Created the Celery app in worker.py using Redis as broker/backend. Built the pipeline package with 8 Pydantic schemas matching the DB column types, and an LLMClient using sync openai.OpenAI with primary/fallback completion logic that catches APIConnectionError and APITimeoutError for fallback trigger. Created a stub pipeline/stages.py so the worker import chain succeeds ahead of T02.

## Verification

All 5 verification checks passed: Settings prints correct defaults (llm_api_url, qdrant_url, review_mode), all 4 schema classes import successfully, LLMClient imports clean, celery_app.main prints 'chrysopedia', and grep confirms openai/qdrant-client in requirements.txt. Additional validation confirmed all 12 config fields present, all 8 schema classes importable, sync OpenAI usage (not Async), and fallback exception handling wired.

## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 500ms |
| 2 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 400ms |
| 3 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 500ms |
| 4 | `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` | 0 | ✅ pass | 500ms |
| 5 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms |
## Deviations

Created backend/pipeline/stages.py as a stub module so worker.py import chain succeeds. Not in the task plan but required for worker.py to be importable.

## Known Issues

None.

## Files Created/Modified

- `backend/config.py`
- `backend/worker.py`
- `backend/pipeline/schemas.py`
- `backend/pipeline/llm_client.py`
- `backend/requirements.txt`
- `backend/pipeline/__init__.py`
- `backend/pipeline/stages.py`
90
.gsd/milestones/M001/slices/S03/tasks/T02-PLAN.md
Normal file

@ -0,0 +1,90 @@
---
estimated_steps: 43
estimated_files: 6
skills_used: []
---

# T02: Prompt templates and pipeline stages 2-5 with orchestrator

Write the 4 prompt template files for stages 2-5 and implement all pipeline stage logic as Celery tasks in `pipeline/stages.py`. Each stage reads from PostgreSQL, loads its prompt template, calls the LLM client, parses the response, writes results back to PostgreSQL, and updates processing_status. The `run_pipeline` orchestrator chains stages and handles resumability.

## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| LLM API (primary) | Fall back to secondary endpoint | Fall back to secondary endpoint | Log raw response, retry once with 'output valid JSON' nudge, then fail stage |
| PostgreSQL | Task fails, Celery retries (max 3) | Task fails, Celery retries | N/A |
| Prompt template file | Task fails immediately with FileNotFoundError logged | N/A | N/A |
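The "retry once with a JSON nudge" cell in the LLM row can be sketched with stdlib `json`. This is a sketch under assumptions: `call_llm` is a hypothetical stand-in for the real LLM call, and the nudge wording mirrors the table text.

```python
import json
import logging

logger = logging.getLogger("pipeline")

def parse_with_retry(call_llm, prompt: str) -> dict:
    """Parse the LLM reply as JSON; on failure, log raw text, retry once with a nudge."""
    raw = call_llm(prompt)
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Log the raw response excerpt, per the failure-mode table
        logger.error("malformed LLM response: %r", raw[:200])
        raw = call_llm(prompt + "\n\nOutput valid JSON.")
        # A second JSONDecodeError propagates and fails the stage
        return json.loads(raw)

# usage: first reply is prose, the nudged retry returns JSON
replies = iter(["not json", '{"ok": true}'])
print(parse_with_retry(lambda p: next(replies), "extract"))  # → {'ok': True}
```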
## Negative Tests

- **Malformed inputs**: LLM returns non-JSON text → caught by parse_response, logged with raw text
- **Error paths**: LLM returns valid JSON but wrong schema → Pydantic ValidationError caught, logged
- **Boundary conditions**: Empty transcript (0 segments) → stage 2 returns empty segmentation, stages 3-5 skip gracefully
## Steps

1. Create `prompts/stage2_segmentation.txt`: System prompt instructing the LLM to analyze transcript segments and identify topic boundaries. Input: full transcript text with segment indices. Output: JSON matching `SegmentationResult` schema — list of topic segments with start_index, end_index, topic_label, summary. Use XML-style `<transcript>` tags to fence user content from instructions.

2. Create `prompts/stage3_extraction.txt`: System prompt for extracting key moments from a topic segment. Input: segment text with timestamps. Output: JSON matching `ExtractionResult` — list of moments with title, summary, timestamps, content_type (technique/settings/reasoning/workflow), plugins mentioned, raw_transcript excerpt.

3. Create `prompts/stage4_classification.txt`: System prompt for classifying key moments against the canonical tag taxonomy. Input: moment title + summary + list of canonical categories/sub-topics from `canonical_tags.yaml`. Output: JSON matching `ClassificationResult` — topic_category, topic_tags for each moment.

4. Create `prompts/stage5_synthesis.txt`: System prompt for synthesizing a technique page from key moments. Input: all moments for a creator+topic group. Output: JSON matching `SynthesisResult` — title, slug, summary, body_sections (structured prose with sub-aspects), signal_chains, plugins, source_quality assessment.

5. Create `backend/pipeline/stages.py` with these components:
   a. Helper `_load_prompt(template_name: str) -> str` — reads from `settings.prompts_path / template_name`
   b. Helper `_get_sync_session()` — creates a sync SQLAlchemy session for Celery tasks using `create_engine()` (sync, not async) with the DATABASE_URL converted from `postgresql+asyncpg://` to `postgresql+psycopg2://` (or use `sqlalchemy.create_engine` with sync URL). **CRITICAL**: Celery is sync, so use `sqlalchemy.orm.Session`, not async.
   c. `@celery_app.task(bind=True, max_retries=3)` for `stage2_segmentation(self, video_id: str)`: Load transcript segments from DB ordered by segment_index, build prompt with full text, call LLM, parse SegmentationResult, update `topic_label` on each TranscriptSegment row.
   d. `@celery_app.task(bind=True, max_retries=3)` for `stage3_extraction(self, video_id: str)`: Group segments by topic_label, for each group call LLM to extract moments, create KeyMoment rows in DB, set `processing_status = extracted` on SourceVideo.
   e. `@celery_app.task(bind=True, max_retries=3)` for `stage4_classification(self, video_id: str)`: Load key moments, load canonical tags from `config/canonical_tags.yaml`, call LLM to classify, update topic_tags on moments. Stage 4 does NOT change processing_status.
   f. `@celery_app.task(bind=True, max_retries=3)` for `stage5_synthesis(self, video_id: str)`: Group key moments by (creator, topic_category), for each group call LLM to synthesize, create/update TechniquePage rows with body_sections, signal_chains, plugins, topic_tags, summary. Link KeyMoments to their TechniquePage via technique_page_id. Set `processing_status = reviewed` (or `published` if `settings.review_mode is False`).
   g. `@celery_app.task` for `run_pipeline(video_id: str)`: Check current processing_status, chain the appropriate stages (e.g., if `transcribed`, chain 2→3→4→5; if `extracted`, chain 4→5). Use `celery.chain()` for sequential execution.

6. Import `stages` module in `worker.py` to register tasks (ensure the import line `from pipeline import stages` is present after celery_app creation).
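Step g's resumability logic can be sketched as a status-to-suffix lookup. A minimal sketch, assuming the status vocabulary from the steps above; the real orchestrator would chain Celery task signatures with `celery.chain()` rather than plain strings.

```python
STAGES = [
    "stage2_segmentation",
    "stage3_extraction",
    "stage4_classification",
    "stage5_synthesis",
]

# processing_status -> index of the first stage still to run
RESUME_POINT = {"transcribed": 0, "extracted": 2}

def stages_to_run(status: str) -> list[str]:
    """Return the suffix of the stage list to chain for a given processing_status."""
    if status not in RESUME_POINT:
        return []  # nothing to do (e.g. already reviewed/published)
    return STAGES[RESUME_POINT[status]:]

print(stages_to_run("extracted"))  # → ['stage4_classification', 'stage5_synthesis']
```

Keeping the mapping in one dict means adding a resume point later (say, after classification) is a one-line change rather than another branch in the orchestrator.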
## Must-Haves

- [ ] 4 prompt template files in `prompts/` with clear instruction/data separation
- [ ] All 5 stage tasks + run_pipeline orchestrator defined in stages.py
- [ ] Celery tasks use SYNC SQLAlchemy sessions (not async)
- [ ] Stage 2 updates topic_label on TranscriptSegment rows
- [ ] Stage 3 creates KeyMoment rows and sets processing_status=extracted
- [ ] Stage 4 loads canonical_tags.yaml and classifies moments
- [ ] Stage 5 creates TechniquePage rows and sets processing_status=reviewed/published
- [ ] run_pipeline handles resumability based on current processing_status
- [ ] Prompts fence user content with XML-style tags

## Verification

- `test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt` — all 4 prompt files exist
- `cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"` — imports succeed
- `cd backend && python -c "from worker import celery_app; print([t for t in celery_app.tasks if 'stage' in t or 'pipeline' in t])"` — shows registered tasks

## Observability Impact

- Signals added: INFO log at start/end of each stage with video_id and duration, WARNING on LLM fallback, ERROR on parse failure with raw response excerpt
- How a future agent inspects this: query `source_videos.processing_status` to see pipeline progress, check Celery worker logs for stage timing
- Failure state exposed: video stays at last successful processing_status, error logged with stage name and video_id

## Inputs

- `backend/pipeline/schemas.py` — Pydantic models for stage I/O from T01
- `backend/pipeline/llm_client.py` — LLMClient with primary/fallback from T01
- `backend/config.py` — Settings with prompts_path, review_mode, llm_* fields from T01
- `backend/worker.py` — Celery app instance from T01
- `backend/models.py` — ORM models (TranscriptSegment, KeyMoment, TechniquePage, SourceVideo, ProcessingStatus)
- `config/canonical_tags.yaml` — canonical tag taxonomy for stage 4 classification

## Expected Output

- `prompts/stage2_segmentation.txt` — topic boundary detection prompt template
- `prompts/stage3_extraction.txt` — key moment extraction prompt template
- `prompts/stage4_classification.txt` — classification/tagging prompt template
- `prompts/stage5_synthesis.txt` — technique page synthesis prompt template
- `backend/pipeline/stages.py` — 5 Celery tasks (stages 2-5 + run_pipeline orchestrator)
- `backend/worker.py` — updated with pipeline.stages import for task registration

## Verification

test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt && cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"
81
.gsd/milestones/M001/slices/S03/tasks/T03-PLAN.md
Normal file

@ -0,0 +1,81 @@
---
estimated_steps: 43
estimated_files: 3
skills_used: []
---

# T03: Qdrant integration and embedding client

Create the embedding client (sync OpenAI-compatible /v1/embeddings) and Qdrant client (collection management + upsert). Wire embedding generation into the pipeline after stage 5 synthesis, so technique page summaries and key moment summaries are embedded and upserted to Qdrant with metadata payloads.

## Failure Modes
| Dependency | On error | On timeout | On malformed response |
|------------|----------|-----------|----------------------|
| Embedding API | Log error, skip embedding for this batch, pipeline continues | Same as error | Validate vector dimensions match config, log mismatch |
| Qdrant server | Log error, skip upsert, pipeline continues (embeddings are not blocking) | Same as error | N/A |
## Steps

1. Create `backend/pipeline/embedding_client.py`:
   - Class `EmbeddingClient` initialized with `settings: Settings`
   - Uses sync `openai.OpenAI(base_url=settings.embedding_api_url, api_key=settings.llm_api_key)`
   - Method `embed(texts: list[str]) -> list[list[float]]` that calls `client.embeddings.create(model=settings.embedding_model, input=texts)` and returns the vector list
   - Handle `openai.APIConnectionError` gracefully — log and return empty list
   - Validate returned vector dimensions match `settings.embedding_dimensions`

2. Create `backend/pipeline/qdrant_client.py`:
   - Class `QdrantManager` initialized with `settings: Settings`
   - Uses sync `qdrant_client.QdrantClient(url=settings.qdrant_url)`
   - Method `ensure_collection()` — checks `collection_exists()`, creates with `VectorParams(size=settings.embedding_dimensions, distance=Distance.COSINE)` if not
   - Method `upsert_points(points: list[PointStruct])` — wraps `client.upsert()`
   - Method `upsert_technique_pages(pages: list[dict], vectors: list[list[float]])` — builds PointStruct list with payload (page_id, creator_id, title, topic_category, topic_tags, summary) and upserts
   - Method `upsert_key_moments(moments: list[dict], vectors: list[list[float]])` — builds PointStruct list with payload (moment_id, source_video_id, title, start_time, end_time, content_type) and upserts
   - Handle `qdrant_client.http.exceptions.UnexpectedResponse` for connection errors

3. Add a new Celery task `stage6_embed_and_index(video_id: str)` in `backend/pipeline/stages.py`:
   - Load all KeyMoments and TechniquePages created for this video
   - Build text strings for embedding (moment: title + summary; page: title + summary + topic_category)
   - Call EmbeddingClient.embed() to get vectors
   - Call QdrantManager.ensure_collection() then upsert_technique_pages() and upsert_key_moments()
   - This stage runs after stage 5 in the pipeline chain but does NOT update processing_status (it's a side-effect)
   - If embedding/Qdrant fails, log error but don't fail the pipeline — embeddings can be regenerated later

4. Update `run_pipeline` in `stages.py` to append `stage6_embed_and_index` to the chain.
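The dimension check in step 1 can be sketched with plain lists. A minimal sketch, assuming 768 mirrors the `embedding_dimensions` Settings default; the real client would read the value from `settings` rather than a module constant.

```python
import logging

logger = logging.getLogger("embedding")

EMBEDDING_DIMENSIONS = 768  # settings.embedding_dimensions in the real client

def validate_vectors(vectors: list[list[float]]) -> list[list[float]]:
    """Return vectors unchanged if all match the configured size, else log and drop the batch."""
    bad = [i for i, v in enumerate(vectors) if len(v) != EMBEDDING_DIMENSIONS]
    if bad:
        # Non-blocking, per the failure-mode table: caller skips the upsert for this batch
        logger.error("embedding dimension mismatch at indices %s", bad)
        return []
    return vectors

# usage: a good batch passes through, a wrong-sized batch is dropped
ok = [[0.0] * 768, [0.1] * 768]
print(len(validate_vectors(ok)))           # → 2
print(len(validate_vectors([[0.0] * 4])))  # → 0
```

Dropping the whole batch (rather than individual vectors) keeps the texts and vectors lists aligned for the later `upsert_*` calls.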
## Must-Haves

- [ ] EmbeddingClient uses sync openai.OpenAI for /v1/embeddings calls
- [ ] QdrantManager creates collection only if not exists
- [ ] Qdrant points include metadata payloads (page_id/moment_id, creator_id, topic, timestamps)
- [ ] stage6 is non-blocking — embedding/Qdrant failures don't fail the pipeline
- [ ] Vector dimension from config, not hardcoded

## Verification

- `cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')"` exits 0
- `cd backend && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')"` exits 0
- `cd backend && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"` exits 0

## Observability Impact

- Signals added: INFO log for embedding batch size and Qdrant upsert count, WARNING on embedding/Qdrant failures with error details
- How a future agent inspects this: check Qdrant dashboard at QDRANT_URL, query collection point count
- Failure state exposed: embedding failures logged but pipeline completes successfully

## Inputs

- `backend/pipeline/stages.py` — pipeline stages from T02 to add stage6 and update run_pipeline
- `backend/config.py` — Settings with embedding_api_url, embedding_model, embedding_dimensions, qdrant_url, qdrant_collection from T01
- `backend/pipeline/schemas.py` — Pydantic schemas from T01
- `backend/models.py` — KeyMoment and TechniquePage ORM models

## Expected Output

- `backend/pipeline/embedding_client.py` — EmbeddingClient with sync OpenAI embeddings
- `backend/pipeline/qdrant_client.py` — QdrantManager with collection management and upsert
- `backend/pipeline/stages.py` — updated with stage6_embed_and_index task and run_pipeline chain

## Verification

cd backend && python -c "from pipeline.embedding_client import EmbeddingClient; print('embed ok')" && python -c "from pipeline.qdrant_client import QdrantManager; print('qdrant ok')" && python -c "from pipeline.stages import stage6_embed_and_index; print('stage6 ok')"
54
.gsd/milestones/M001/slices/S03/tasks/T04-PLAN.md
Normal file

@ -0,0 +1,54 @@
---
estimated_steps: 21
estimated_files: 3
skills_used: []
---

# T04: Wire ingest-to-pipeline trigger and add manual re-trigger endpoint

Connect the existing ingest endpoint to the pipeline by dispatching `run_pipeline.delay()` after successful commit, and add a manual re-trigger API endpoint for re-processing videos.

## Steps
1. Modify `backend/routers/ingest.py`:
   - After the `await db.commit()` and refresh calls (after the successful response is ready), dispatch the pipeline: `from pipeline.stages import run_pipeline; run_pipeline.delay(str(video.id))`
   - Wrap the dispatch in try/except to avoid failing the ingest response if Redis/Celery is down — log WARNING on dispatch failure
   - The dispatch must happen AFTER commit so the worker can find the data in PostgreSQL

2. Create `backend/routers/pipeline.py`:
   - Router with `prefix='/pipeline'`, tag `['pipeline']`
   - `POST /trigger/{video_id}`: Look up SourceVideo by id, verify it exists (404 if not), dispatch `run_pipeline.delay(str(video_id))`, return `{"status": "triggered", "video_id": str(video_id), "current_processing_status": video.processing_status.value}`
   - Uses `get_session` dependency for DB access

3. Mount the pipeline router in `backend/main.py` under `/api/v1`.
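The non-blocking dispatch in step 1 can be sketched without Celery. A minimal sketch: `dispatch` is a hypothetical stand-in for `run_pipeline.delay`, and the broad `except Exception` mirrors the plan's intent that no broker failure may break the ingest response.

```python
import logging

logger = logging.getLogger("ingest")

def dispatch_pipeline(dispatch, video_id: str) -> bool:
    """Fire-and-forget pipeline trigger; returns False (and warns) if dispatch fails."""
    try:
        dispatch(video_id)
        return True
    except Exception as exc:  # e.g. Redis/Celery unavailable
        # WARNING only, per step 1: the ingest response must still succeed
        logger.warning("pipeline dispatch failed for %s: %s", video_id, exc)
        return False

# usage: broker down, ingest still returns its success response
def broker_down(_: str) -> None:
    raise ConnectionError("redis unreachable")

print(dispatch_pipeline(broker_down, "vid-123"))  # → False
```

Calling this only after the commit preserves the ordering constraint: the worker must be able to find the committed rows in PostgreSQL.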
## Must-Haves

- [ ] Ingest endpoint dispatches run_pipeline.delay() after commit
- [ ] Pipeline dispatch failure does not fail the ingest response
- [ ] POST /api/v1/pipeline/trigger/{video_id} exists and returns status
- [ ] 404 returned for non-existent video_id
- [ ] Pipeline router mounted in main.py

## Verification

- `cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])"` shows `['/trigger/{video_id}']`
- `grep -q 'pipeline' backend/main.py` exits 0
- `grep -q 'run_pipeline' backend/routers/ingest.py` exits 0

## Inputs

- `backend/routers/ingest.py` — existing ingest endpoint to add pipeline dispatch to
- `backend/main.py` — existing app to mount new router
- `backend/pipeline/stages.py` — run_pipeline task from T02
- `backend/models.py` — SourceVideo model for lookup

## Expected Output

- `backend/routers/ingest.py` — updated with run_pipeline.delay() dispatch after commit
- `backend/routers/pipeline.py` — new router with POST /trigger/{video_id} endpoint
- `backend/main.py` — updated with pipeline router mount

## Verification

cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])" && grep -q 'pipeline' backend/main.py && grep -q 'run_pipeline' backend/routers/ingest.py
86
.gsd/milestones/M001/slices/S03/tasks/T05-PLAN.md
Normal file

@ -0,0 +1,86 @@
---
estimated_steps: 42
estimated_files: 3
skills_used: []
---

# T05: Integration tests for pipeline, embedding, Qdrant, and trigger endpoints

Write comprehensive integration tests with mocked LLM and Qdrant that verify the entire pipeline flow. Tests use the existing pytest-asyncio infrastructure from S02, extended with mocks for the OpenAI client and Qdrant client.

## Negative Tests

- **Malformed inputs**: LLM returns invalid JSON → pipeline handles gracefully, logs error
- **Error paths**: LLM primary endpoint connection refused → falls back to secondary
- **Boundary conditions**: Video with 0 segments → pipeline completes without errors, no moments created
## Steps

1. Create `backend/tests/fixtures/mock_llm_responses.py`:
   - Dictionary mapping stage names to valid JSON response strings matching each Pydantic schema
   - Stage 2: SegmentationResult with 2 topic segments covering the 5 sample transcript segments
   - Stage 3: ExtractionResult with 2 key moments (one technique, one settings)
   - Stage 4: ClassificationResult mapping moments to canonical tags
   - Stage 5: SynthesisResult with a technique page including body_sections, signal_chains, plugins
   - Embedding response: list of 768-dimensional vectors (can be random floats for testing)

2. Create `backend/tests/test_pipeline.py` with these tests using `unittest.mock.patch` on `openai.OpenAI` and `qdrant_client.QdrantClient`:
   a. `test_stage2_segmentation_updates_topic_labels`: Ingest sample transcript, run stage2 with mocked LLM, verify TranscriptSegment rows have topic_label set
   b. `test_stage3_extraction_creates_key_moments`: Run stages 2+3, verify KeyMoment rows created with correct fields, processing_status = extracted
   c. `test_stage4_classification_assigns_tags`: Run stages 2+3+4, verify moments have topic_category and topic_tags from canonical list
   d. `test_stage5_synthesis_creates_technique_pages`: Run full pipeline stages 2-5, verify TechniquePage rows created with body_sections, signal_chains, summary. Verify KeyMoments linked to TechniquePage via technique_page_id
   e. `test_stage6_embeds_and_upserts_to_qdrant`: Run full pipeline, verify EmbeddingClient.embed called with correct texts, QdrantManager.upsert called with correct payloads
   f. `test_run_pipeline_resumes_from_extracted`: Set video processing_status to extracted, run pipeline, verify only stages 4+5+6 execute (not 2+3)
   g. `test_pipeline_trigger_endpoint`: POST to /api/v1/pipeline/trigger/{video_id} with mocked Celery delay, verify 200 response
   h. `test_pipeline_trigger_404_for_missing_video`: POST to /api/v1/pipeline/trigger/{nonexistent_id}, verify 404
   i. `test_ingest_dispatches_pipeline`: POST transcript to ingest endpoint with mocked run_pipeline.delay, verify delay was called with video_id
   j. `test_llm_fallback_on_primary_failure`: Mock primary endpoint to raise APIConnectionError, verify fallback endpoint is called

   IMPORTANT: Pipeline stages use SYNC SQLAlchemy (not async). Tests that call stage functions directly need a real PostgreSQL test database. Use the existing conftest.py pattern but create sync engine/sessions for pipeline stage calls. Mock the LLM and Qdrant clients, not the DB.

3. Update `backend/tests/conftest.py` to add:
   - A fixture that creates a sync SQLAlchemy engine pointing to the test database (for pipeline stage tests)
   - A fixture that pre-ingests a sample transcript and returns the video_id (for pipeline tests to use)
   - Any necessary mock fixtures for LLMClient and QdrantManager
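The mocking approach in step 2 can be sketched with stdlib `unittest.mock`. A minimal sketch under assumptions: the mock mirrors the shape of `openai.OpenAI`'s `chat.completions.create` return value, the `{"segments": []}` payload is illustrative, and `run_stage2` is a toy stand-in for a stage function (real tests would patch the client where the stages module uses it).

```python
import json
from unittest.mock import MagicMock

# Shape the mock like an OpenAI chat completion response
mock_client = MagicMock()
mock_client.chat.completions.create.return_value.choices = [
    MagicMock(message=MagicMock(content='{"segments": []}'))
]

def run_stage2(client) -> dict:
    """Toy stand-in for a pipeline stage: call the LLM and parse its JSON reply."""
    resp = client.chat.completions.create(model="m", messages=[])
    return json.loads(resp.choices[0].message.content)

result = run_stage2(mock_client)
print(result)  # → {'segments': []}
mock_client.chat.completions.create.assert_called_once()
```

Because the mocked client records its calls, the tests can assert on both the parsed result and the exact prompt/model arguments the stage sent, without any network access.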
## Must-Haves

- [ ] 10 integration tests covering all pipeline stages, trigger endpoints, resumability, and fallback
- [ ] All LLM calls mocked with realistic response fixtures
- [ ] Qdrant calls mocked — no real Qdrant needed
- [ ] Pipeline stage tests use sync DB sessions (matching production Celery behavior)
- [ ] At least one negative test (LLM fallback on primary failure)
- [ ] All tests pass with `python -m pytest tests/test_pipeline.py -v`

## Verification

- `cd backend && python -m pytest tests/test_pipeline.py -v` — all tests pass
- `cd backend && python -m pytest tests/ -v` — all tests (including S02 ingest tests) still pass

## Observability Impact

- Signals added: test output shows per-test pass/fail with timing, mock call assertions verify logging would fire in production
- How a future agent inspects this: run `pytest tests/test_pipeline.py -v` to see all pipeline test results

## Inputs

- `backend/pipeline/stages.py` — all pipeline stages from T02+T03
- `backend/pipeline/llm_client.py` — LLMClient to mock from T01
- `backend/pipeline/embedding_client.py` — EmbeddingClient to mock from T03
- `backend/pipeline/qdrant_client.py` — QdrantManager to mock from T03
- `backend/pipeline/schemas.py` — Pydantic models for response fixtures from T01
- `backend/routers/pipeline.py` — trigger endpoint from T04
- `backend/routers/ingest.py` — updated ingest with pipeline dispatch from T04
- `backend/tests/conftest.py` — existing test infrastructure from S02
- `backend/tests/fixtures/sample_transcript.json` — sample data from S02
- `backend/models.py` — ORM models for DB verification

## Expected Output

- `backend/tests/fixtures/mock_llm_responses.py` — mock LLM response fixtures for all stages
- `backend/tests/test_pipeline.py` — 10 integration tests for full pipeline flow
- `backend/tests/conftest.py` — extended with sync engine fixture and pre-ingest fixture

## Verification

cd backend && python -m pytest tests/test_pipeline.py -v && python -m pytest tests/ -v
@ -26,6 +26,28 @@ class Settings(BaseSettings):
    # CORS
    cors_origins: list[str] = ["*"]

    # LLM endpoint (OpenAI-compatible)
    llm_api_url: str = "http://localhost:11434/v1"
    llm_api_key: str = "sk-placeholder"
    llm_model: str = "qwen2.5:14b-q8_0"
    llm_fallback_url: str = "http://localhost:11434/v1"
    llm_fallback_model: str = "qwen2.5:14b-q8_0"

    # Embedding endpoint
    embedding_api_url: str = "http://localhost:11434/v1"
    embedding_model: str = "nomic-embed-text"
    embedding_dimensions: int = 768

    # Qdrant
    qdrant_url: str = "http://localhost:6333"
    qdrant_collection: str = "chrysopedia"

    # Prompt templates
    prompts_path: str = "./prompts"

    # Review mode — when True, extracted moments go to review queue before publishing
    review_mode: bool = True

    # File storage
    transcript_storage_path: str = "/data/transcripts"
    video_metadata_path: str = "/data/video_meta"
0
backend/pipeline/__init__.py
Normal file
136
backend/pipeline/llm_client.py
Normal file

@ -0,0 +1,136 @@
```python
"""Synchronous LLM client with primary/fallback endpoint logic.

Uses the OpenAI-compatible API (works with Ollama, vLLM, OpenWebUI, etc.).
Celery tasks run synchronously, so this uses ``openai.OpenAI`` (not Async).
"""

from __future__ import annotations

import logging
from typing import TypeVar

import openai
from pydantic import BaseModel

from config import Settings

logger = logging.getLogger(__name__)

T = TypeVar("T", bound=BaseModel)


class LLMClient:
    """Sync LLM client that tries a primary endpoint and falls back on failure."""

    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self._primary = openai.OpenAI(
            base_url=settings.llm_api_url,
            api_key=settings.llm_api_key,
        )
        self._fallback = openai.OpenAI(
            base_url=settings.llm_fallback_url,
            api_key=settings.llm_api_key,
        )

    # ── Core completion ──────────────────────────────────────────────────

    def complete(
        self,
        system_prompt: str,
        user_prompt: str,
        response_model: type[BaseModel] | None = None,
    ) -> str:
        """Send a chat completion request, falling back on connection/timeout errors.

        Parameters
        ----------
        system_prompt:
            System message content.
        user_prompt:
            User message content.
        response_model:
            If provided, ``response_format`` is set to ``{"type": "json_object"}``
            so the LLM returns parseable JSON.

        Returns
        -------
        str
            Raw completion text from the model.
        """
        kwargs: dict = {}
        if response_model is not None:
            kwargs["response_format"] = {"type": "json_object"}

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        # --- Try primary endpoint ---
        try:
            response = self._primary.chat.completions.create(
                model=self.settings.llm_model,
                messages=messages,
                **kwargs,
            )
            return response.choices[0].message.content or ""
        except (openai.APIConnectionError, openai.APITimeoutError) as exc:
            logger.warning(
                "Primary LLM endpoint failed (%s: %s), trying fallback at %s",
                type(exc).__name__,
                exc,
                self.settings.llm_fallback_url,
            )

        # --- Try fallback endpoint ---
        try:
            response = self._fallback.chat.completions.create(
                model=self.settings.llm_fallback_model,
                messages=messages,
                **kwargs,
            )
            return response.choices[0].message.content or ""
        except (openai.APIConnectionError, openai.APITimeoutError, openai.APIError) as exc:
            logger.error(
                "Fallback LLM endpoint also failed (%s: %s). Giving up.",
                type(exc).__name__,
                exc,
            )
            raise

    # ── Response parsing ─────────────────────────────────────────────────

    def parse_response(self, text: str, model: type[T]) -> T:
        """Parse raw LLM output as JSON and validate against a Pydantic model.

        Parameters
        ----------
        text:
            Raw JSON string from the LLM.
        model:
            Pydantic model class to validate against.

        Returns
        -------
        T
            Validated Pydantic model instance.

        Raises
        ------
        pydantic.ValidationError
            If the JSON doesn't match the schema.
        ValueError
            If the text is not valid JSON.
        """
        try:
            return model.model_validate_json(text)
        except Exception:
            logger.error(
                "Failed to parse LLM response as %s. Response text: %.500s",
                model.__name__,
                text,
            )
            raise
```
`backend/pipeline/schemas.py` — new file, 99 lines

`@@ -0,0 +1,99 @@`
```python
"""Pydantic schemas for pipeline stage inputs and outputs.

Stage 2 — Segmentation: groups transcript segments by topic.
Stage 3 — Extraction: extracts key moments from segments.
Stage 4 — Classification: classifies moments by category/tags.
Stage 5 — Synthesis: generates technique pages from classified moments.
"""

from __future__ import annotations

from pydantic import BaseModel, Field


# ── Stage 2: Segmentation ───────────────────────────────────────────────────

class TopicSegment(BaseModel):
    """A contiguous group of transcript segments sharing a topic."""

    start_index: int = Field(description="First transcript segment index in this group")
    end_index: int = Field(description="Last transcript segment index in this group (inclusive)")
    topic_label: str = Field(description="Short label describing the topic")
    summary: str = Field(description="Brief summary of what is discussed")


class SegmentationResult(BaseModel):
    """Full output of stage 2 (segmentation)."""

    segments: list[TopicSegment]


# ── Stage 3: Extraction ─────────────────────────────────────────────────────

class ExtractedMoment(BaseModel):
    """A single key moment extracted from a topic segment group."""

    title: str = Field(description="Concise title for the moment")
    summary: str = Field(description="Detailed summary of the technique/concept")
    start_time: float = Field(description="Start time in seconds")
    end_time: float = Field(description="End time in seconds")
    content_type: str = Field(description="One of: technique, settings, reasoning, workflow")
    plugins: list[str] = Field(default_factory=list, description="Plugins/tools mentioned")
    raw_transcript: str = Field(default="", description="Raw transcript text for this moment")


class ExtractionResult(BaseModel):
    """Full output of stage 3 (extraction)."""

    moments: list[ExtractedMoment]


# ── Stage 4: Classification ─────────────────────────────────────────────────

class ClassifiedMoment(BaseModel):
    """Classification metadata for a single extracted moment."""

    moment_index: int = Field(description="Index into ExtractionResult.moments")
    topic_category: str = Field(description="High-level topic category")
    topic_tags: list[str] = Field(default_factory=list, description="Specific topic tags")
    content_type_override: str | None = Field(
        default=None,
        description="Override for content_type if classification disagrees with extraction",
    )


class ClassificationResult(BaseModel):
    """Full output of stage 4 (classification)."""

    classifications: list[ClassifiedMoment]


# ── Stage 5: Synthesis ───────────────────────────────────────────────────────

class SynthesizedPage(BaseModel):
    """A technique page synthesized from classified moments."""

    title: str = Field(description="Page title")
    slug: str = Field(description="URL-safe slug")
    topic_category: str = Field(description="Primary topic category")
    topic_tags: list[str] = Field(default_factory=list, description="Associated tags")
    summary: str = Field(description="Page summary / overview paragraph")
    body_sections: dict = Field(
        default_factory=dict,
        description="Structured body content as section_name -> content mapping",
    )
    signal_chains: list[dict] = Field(
        default_factory=list,
        description="Signal chain descriptions (for audio/music production contexts)",
    )
    plugins: list[str] = Field(default_factory=list, description="Plugins/tools referenced")
    source_quality: str = Field(
        default="mixed",
        description="One of: structured, mixed, unstructured",
    )


class SynthesisResult(BaseModel):
    """Full output of stage 5 (synthesis)."""

    pages: list[SynthesizedPage]
```
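These schemas are what `LLMClient.parse_response` validates LLM output against. A quick round-trip sketch using abridged copies of the stage-2 models — the JSON payload is a made-up example, not real model output:

```python
from pydantic import BaseModel


class TopicSegment(BaseModel):
    # Abridged copy of pipeline.schemas.TopicSegment (descriptions omitted)
    start_index: int
    end_index: int
    topic_label: str
    summary: str


class SegmentationResult(BaseModel):
    segments: list[TopicSegment]


raw = (
    '{"segments": [{"start_index": 0, "end_index": 4, '
    '"topic_label": "compression basics", "summary": "Attack/release settings."}]}'
)
result = SegmentationResult.model_validate_json(raw)
print(result.segments[0].topic_label)  # → compression basics
print(result.segments[0].end_index - result.segments[0].start_index + 1)  # → 5
```

A payload that is missing a field or carries the wrong type raises `pydantic.ValidationError` here, which is exactly what `parse_response` logs and re-raises.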
`backend/pipeline/stages.py` — new file, 5 lines

`@@ -0,0 +1,5 @@`
```python
"""Pipeline stage tasks (stages 2-5).

Task implementations will be added in T02. This module must be importable
so that ``worker.py`` can register Celery tasks.
"""
```
`backend/requirements.txt` — `@@ -10,6 +10,9 @@ redis>=5.0,<6.0`

```
python-dotenv>=1.0,<2.0
python-multipart>=0.0.9,<1.0
httpx>=0.27.0,<1.0
openai>=1.0,<2.0
qdrant-client>=1.9,<2.0
pyyaml>=6.0,<7.0
# Test dependencies
pytest>=8.0,<10.0
pytest-asyncio>=0.24,<1.0
```
`backend/worker.py` — new file, 32 lines

`@@ -0,0 +1,32 @@`
```python
"""Celery application instance for the Chrysopedia pipeline.

Usage:
    celery -A worker worker --loglevel=info
"""

from celery import Celery

from config import get_settings

settings = get_settings()

celery_app = Celery(
    "chrysopedia",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

# Import pipeline.stages so that @celery_app.task decorators register tasks.
# This import must come after celery_app is defined.
import pipeline.stages  # noqa: E402, F401
```
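The import-at-the-bottom pattern deserves a note: decorators execute at import time, so merely importing `pipeline.stages` is enough to populate the app's task registry — no explicit registration call is needed. A toy stand-in (plain Python, no Celery; `MiniCelery` and the task name are hypothetical, illustrating the mechanism T02's tasks will rely on):

```python
class MiniCelery:
    """Toy stand-in for Celery's task registry."""

    def __init__(self, name: str):
        self.name = name
        self.tasks: dict = {}

    def task(self, fn):
        # Decorator: record fn under a dotted name, as Celery does,
        # then return it unchanged so it stays callable directly.
        self.tasks[f"{self.name}.{fn.__name__}"] = fn
        return fn


app = MiniCelery("chrysopedia")


@app.task
def segment_transcript(video_id: int) -> str:
    return f"segmented video {video_id}"


# Decoration already ran at definition/import time, so the registry is populated.
print(sorted(app.tasks))  # → ['chrysopedia.segment_transcript']
print(app.tasks["chrysopedia.segment_transcript"](7))  # → segmented video 7
```

This is also why `task_acks_late=True` pairs with `worker_prefetch_multiplier=1` above: each worker process reserves one task at a time and acknowledges it only after completion, so a crashed worker's in-flight pipeline stage is redelivered rather than lost.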