diff --git a/.gsd/milestones/M001/slices/S03/S03-PLAN.md b/.gsd/milestones/M001/slices/S03/S03-PLAN.md
index b1da6fa..8cf6f2e 100644
--- a/.gsd/milestones/M001/slices/S03/S03-PLAN.md
+++ b/.gsd/milestones/M001/slices/S03/S03-PLAN.md
@@ -70,7 +70,7 @@
- Estimate: 1.5h
- Files: backend/config.py, backend/requirements.txt, backend/worker.py, backend/pipeline/__init__.py, backend/pipeline/schemas.py, backend/pipeline/llm_client.py
- Verify: cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)" && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')" && python -c "from pipeline.llm_client import LLMClient; print('client ok')" && python -c "from worker import celery_app; print(celery_app.main)"
-- [ ] **T02: Prompt templates and pipeline stages 2-5 with orchestrator** — Write the 4 prompt template files for stages 2-5 and implement all pipeline stage logic as Celery tasks in `pipeline/stages.py`. Each stage reads from PostgreSQL, loads its prompt template, calls the LLM client, parses the response, writes results back to PostgreSQL, and updates processing_status. The `run_pipeline` orchestrator chains stages and handles resumability.
+- [x] **T02: Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing** — Write the 4 prompt template files for stages 2-5 and implement all pipeline stage logic as Celery tasks in `pipeline/stages.py`. Each stage reads from PostgreSQL, loads its prompt template, calls the LLM client, parses the response, writes results back to PostgreSQL, and updates processing_status. The `run_pipeline` orchestrator chains stages and handles resumability.
## Failure Modes
diff --git a/.gsd/milestones/M001/slices/S03/tasks/T01-VERIFY.json b/.gsd/milestones/M001/slices/S03/tasks/T01-VERIFY.json
new file mode 100644
index 0000000..765e4c5
--- /dev/null
+++ b/.gsd/milestones/M001/slices/S03/tasks/T01-VERIFY.json
@@ -0,0 +1,16 @@
+{
+ "schemaVersion": 1,
+ "taskId": "T01",
+ "unitId": "M001/S03/T01",
+ "timestamp": 1774823431445,
+ "passed": true,
+ "discoverySource": "task-plan",
+ "checks": [
+ {
+ "command": "cd backend",
+ "exitCode": 0,
+ "durationMs": 5,
+ "verdict": "pass"
+ }
+ ]
+}
diff --git a/.gsd/milestones/M001/slices/S03/tasks/T02-SUMMARY.md b/.gsd/milestones/M001/slices/S03/tasks/T02-SUMMARY.md
new file mode 100644
index 0000000..e7e8929
--- /dev/null
+++ b/.gsd/milestones/M001/slices/S03/tasks/T02-SUMMARY.md
@@ -0,0 +1,92 @@
+---
+id: T02
+parent: S03
+milestone: M001
+provides: []
+requires: []
+affects: []
+key_files: ["prompts/stage2_segmentation.txt", "prompts/stage3_extraction.txt", "prompts/stage4_classification.txt", "prompts/stage5_synthesis.txt", "backend/pipeline/stages.py", "backend/requirements.txt"]
+key_decisions: ["Stage 4 classification data stored in Redis (chrysopedia:classification:{video_id} key, 24h TTL) because KeyMoment model lacks topic_tags/topic_category columns", "Added psycopg2-binary for sync SQLAlchemy sessions in Celery tasks", "_safe_parse_llm_response retries once with JSON nudge on malformed LLM output"]
+patterns_established: []
+drill_down_paths: []
+observability_surfaces: []
+duration: ""
+verification_result: "All 7 verification checks passed: 4 prompt files exist, all 5 stage functions import from pipeline.stages, worker shows all 5 registered tasks (stage2-5 + run_pipeline), Settings/schemas/LLMClient/celery_app imports still work, openai and qdrant-client confirmed in requirements.txt. Additional structural verification: sync SQLAlchemy session isinstance check passed, prompt loading works from project root, canonical_tags loading returns 6 categories, XML-style tags confirmed in all prompts."
+completed_at: 2026-03-29T22:35:57.629Z
+blocker_discovered: false
+---
+
+# T02: Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing
+
+> Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing
+
+## What Happened
+---
+id: T02
+parent: S03
+milestone: M001
+key_files:
+ - prompts/stage2_segmentation.txt
+ - prompts/stage3_extraction.txt
+ - prompts/stage4_classification.txt
+ - prompts/stage5_synthesis.txt
+ - backend/pipeline/stages.py
+ - backend/requirements.txt
+key_decisions:
+ - Stage 4 classification data stored in Redis (chrysopedia:classification:{video_id} key, 24h TTL) because KeyMoment model lacks topic_tags/topic_category columns
+ - Added psycopg2-binary for sync SQLAlchemy sessions in Celery tasks
+ - _safe_parse_llm_response retries once with JSON nudge on malformed LLM output
+duration: ""
+verification_result: passed
+completed_at: 2026-03-29T22:35:57.629Z
+blocker_discovered: false
+---
+
+# T02: Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing
+
+**Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing**
+
+## What Happened
+
+Created 4 prompt template files in prompts/ with XML-style content fencing for clear instruction/data separation. Implemented backend/pipeline/stages.py with 5 Celery tasks: stage2_segmentation (topic boundary detection, updates topic_label on TranscriptSegment rows), stage3_extraction (key moment extraction, creates KeyMoment rows, sets status=extracted), stage4_classification (classifies against canonical_tags.yaml, stores results in Redis), stage5_synthesis (creates TechniquePage rows, links KeyMoments, sets status=reviewed/published based on review_mode), and run_pipeline orchestrator (checks current processing_status, chains only remaining stages for resumability). All tasks use sync SQLAlchemy sessions via psycopg2, have bind=True with max_retries=3, and include _safe_parse_llm_response with one-retry JSON nudge on malformed LLM output. Added psycopg2-binary to requirements.txt.
+
+## Verification
+
+All 7 verification checks passed: 4 prompt files exist, all 5 stage functions import from pipeline.stages, worker shows all 5 registered tasks (stage2-5 + run_pipeline), Settings/schemas/LLMClient/celery_app imports still work, openai and qdrant-client confirmed in requirements.txt. Additional structural verification: sync SQLAlchemy session isinstance check passed, prompt loading works from project root, canonical_tags loading returns 6 categories, XML-style tags confirmed in all prompts.
+
+## Verification Evidence
+
+| # | Command | Exit Code | Verdict | Duration |
+|---|---------|-----------|---------|----------|
+| 1 | `test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt` | 0 | ✅ pass | 50ms |
+| 2 | `cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"` | 0 | ✅ pass | 500ms |
+| 3 | `cd backend && python -c "from worker import celery_app; print([t for t in celery_app.tasks if 'stage' in t or 'pipeline' in t])"` | 0 | ✅ pass | 500ms |
+| 4 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 300ms |
+| 5 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 300ms |
+| 6 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 300ms |
+| 7 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms |
+
+
+## Deviations
+
+Stage 4 classification data stored in Redis instead of updating topic_tags on KeyMoment rows (model lacks those columns). Added psycopg2-binary to requirements.txt (not in plan but required for sync SQLAlchemy).
+
+## Known Issues
+
+None.
+
+## Files Created/Modified
+
+- `prompts/stage2_segmentation.txt`
+- `prompts/stage3_extraction.txt`
+- `prompts/stage4_classification.txt`
+- `prompts/stage5_synthesis.txt`
+- `backend/pipeline/stages.py`
+- `backend/requirements.txt`
+
+
+## Deviations
+Stage 4 classification data stored in Redis instead of updating topic_tags on KeyMoment rows (model lacks those columns). Added psycopg2-binary to requirements.txt (not in plan but required for sync SQLAlchemy).
+
+## Known Issues
+None.
diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
index 073805e..68ddfdc 100644
--- a/backend/pipeline/stages.py
+++ b/backend/pipeline/stages.py
@@ -1,5 +1,642 @@
-"""Pipeline stage tasks (stages 2-5).
+"""Pipeline stage tasks (stages 2-5) and run_pipeline orchestrator.
-Task implementations will be added in T02. This module must be importable
-so that ``worker.py`` can register Celery tasks.
+Each stage reads from PostgreSQL via sync SQLAlchemy, loads its prompt
+template from disk, calls the LLM client, parses the response, writes
+results back, and updates processing_status on SourceVideo.
+
+Celery tasks are synchronous — all DB access uses ``sqlalchemy.orm.Session``.
"""
+
+from __future__ import annotations
+
+import json
+import logging
+import time
+from collections import defaultdict
+from pathlib import Path
+
+import yaml
+from celery import chain as celery_chain
+from pydantic import ValidationError
+from sqlalchemy import create_engine, select
+from sqlalchemy.orm import Session, sessionmaker
+
+from config import get_settings
+from models import (
+ KeyMoment,
+ KeyMomentContentType,
+ ProcessingStatus,
+ SourceVideo,
+ TechniquePage,
+ TranscriptSegment,
+)
+from pipeline.llm_client import LLMClient
+from pipeline.schemas import (
+ ClassificationResult,
+ ExtractionResult,
+ SegmentationResult,
+ SynthesisResult,
+)
+from worker import celery_app
+
+logger = logging.getLogger(__name__)
+
+# ── Helpers ──────────────────────────────────────────────────────────────────
+
+_engine = None
+_SessionLocal = None
+
+
+def _get_sync_engine():
+ """Create a sync SQLAlchemy engine, converting the async URL if needed."""
+ global _engine
+ if _engine is None:
+ settings = get_settings()
+ url = settings.database_url
+ # Convert async driver to sync driver
+ url = url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
+ _engine = create_engine(url, pool_pre_ping=True, pool_size=5, max_overflow=10)
+ return _engine
+
+
+def _get_sync_session() -> Session:
+ """Create a sync SQLAlchemy session for Celery tasks."""
+ global _SessionLocal
+ if _SessionLocal is None:
+ _SessionLocal = sessionmaker(bind=_get_sync_engine())
+ return _SessionLocal()
+
+
+def _load_prompt(template_name: str) -> str:
+ """Read a prompt template from the prompts directory.
+
+ Raises FileNotFoundError if the template does not exist.
+ """
+ settings = get_settings()
+ path = Path(settings.prompts_path) / template_name
+ if not path.exists():
+ logger.error("Prompt template not found: %s", path)
+ raise FileNotFoundError(f"Prompt template not found: {path}")
+ return path.read_text(encoding="utf-8")
+
+
+def _get_llm_client() -> LLMClient:
+ """Return an LLMClient configured from settings."""
+ return LLMClient(get_settings())
+
+
+def _load_canonical_tags() -> dict:
+ """Load canonical tag taxonomy from config/canonical_tags.yaml."""
+ # Walk up from backend/ to find config/
+ candidates = [
+ Path("config/canonical_tags.yaml"),
+ Path("../config/canonical_tags.yaml"),
+ ]
+ for candidate in candidates:
+ if candidate.exists():
+ with open(candidate, encoding="utf-8") as f:
+ return yaml.safe_load(f)
+ raise FileNotFoundError(
+ "canonical_tags.yaml not found. Searched: " + ", ".join(str(c) for c in candidates)
+ )
+
+
+def _format_taxonomy_for_prompt(tags_data: dict) -> str:
+ """Format the canonical tags taxonomy as readable text for the LLM prompt."""
+ lines = []
+ for cat in tags_data.get("categories", []):
+ lines.append(f"Category: {cat['name']}")
+ lines.append(f" Description: {cat['description']}")
+ lines.append(f" Sub-topics: {', '.join(cat.get('sub_topics', []))}")
+ lines.append("")
+ return "\n".join(lines)
+
+
+def _safe_parse_llm_response(raw: str, model_cls, llm: LLMClient, system_prompt: str, user_prompt: str):
+ """Parse LLM response with one retry on failure.
+
+ On malformed response: log the raw text, retry once with a JSON nudge,
+ then raise on second failure.
+ """
+ try:
+ return llm.parse_response(raw, model_cls)
+ except (ValidationError, ValueError, json.JSONDecodeError) as exc:
+ logger.warning(
+ "First parse attempt failed for %s (%s). Retrying with JSON nudge. "
+ "Raw response (first 500 chars): %.500s",
+ model_cls.__name__,
+ type(exc).__name__,
+ raw,
+ )
+ # Retry with explicit JSON instruction
+ nudge_prompt = user_prompt + "\n\nIMPORTANT: Output ONLY valid JSON. No markdown, no explanation."
+ retry_raw = llm.complete(system_prompt, nudge_prompt, response_model=model_cls)
+ return llm.parse_response(retry_raw, model_cls)
+
+
+# ── Stage 2: Segmentation ───────────────────────────────────────────────────
+
+@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
+def stage2_segmentation(self, video_id: str) -> str:
+ """Analyze transcript segments and identify topic boundaries.
+
+ Loads all TranscriptSegment rows for the video, sends them to the LLM
+ for topic boundary detection, and updates topic_label on each segment.
+
+ Returns the video_id for chain compatibility.
+ """
+ start = time.monotonic()
+ logger.info("Stage 2 (segmentation) starting for video_id=%s", video_id)
+
+ session = _get_sync_session()
+ try:
+ # Load segments ordered by index
+ segments = (
+ session.execute(
+ select(TranscriptSegment)
+ .where(TranscriptSegment.source_video_id == video_id)
+ .order_by(TranscriptSegment.segment_index)
+ )
+ .scalars()
+ .all()
+ )
+
+ if not segments:
+ logger.info("Stage 2: No segments found for video_id=%s, skipping.", video_id)
+ return video_id
+
+ # Build transcript text with indices for the LLM
+ transcript_lines = []
+ for seg in segments:
+ transcript_lines.append(
+ f"[{seg.segment_index}] ({seg.start_time:.1f}s - {seg.end_time:.1f}s) {seg.text}"
+ )
+ transcript_text = "\n".join(transcript_lines)
+
+ # Load prompt and call LLM
+ system_prompt = _load_prompt("stage2_segmentation.txt")
+ user_prompt = f"\n{transcript_text}\n"
+
+ llm = _get_llm_client()
+ raw = llm.complete(system_prompt, user_prompt, response_model=SegmentationResult)
+ result = _safe_parse_llm_response(raw, SegmentationResult, llm, system_prompt, user_prompt)
+
+ # Update topic_label on each segment row
+ seg_by_index = {s.segment_index: s for s in segments}
+ for topic_seg in result.segments:
+ for idx in range(topic_seg.start_index, topic_seg.end_index + 1):
+ if idx in seg_by_index:
+ seg_by_index[idx].topic_label = topic_seg.topic_label
+
+ session.commit()
+ elapsed = time.monotonic() - start
+ logger.info(
+ "Stage 2 (segmentation) completed for video_id=%s in %.1fs — %d topic groups found",
+ video_id, elapsed, len(result.segments),
+ )
+ return video_id
+
+ except FileNotFoundError:
+ raise # Don't retry missing prompt files
+ except Exception as exc:
+ session.rollback()
+ logger.error("Stage 2 failed for video_id=%s: %s", video_id, exc)
+ raise self.retry(exc=exc)
+ finally:
+ session.close()
+
+
+# ── Stage 3: Extraction ─────────────────────────────────────────────────────
+
+@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
+def stage3_extraction(self, video_id: str) -> str:
+ """Extract key moments from each topic segment group.
+
+ Groups segments by topic_label, calls the LLM for each group to extract
+ moments, creates KeyMoment rows, and sets processing_status=extracted.
+
+ Returns the video_id for chain compatibility.
+ """
+ start = time.monotonic()
+ logger.info("Stage 3 (extraction) starting for video_id=%s", video_id)
+
+ session = _get_sync_session()
+ try:
+ # Load segments with topic labels
+ segments = (
+ session.execute(
+ select(TranscriptSegment)
+ .where(TranscriptSegment.source_video_id == video_id)
+ .order_by(TranscriptSegment.segment_index)
+ )
+ .scalars()
+ .all()
+ )
+
+ if not segments:
+ logger.info("Stage 3: No segments found for video_id=%s, skipping.", video_id)
+ return video_id
+
+ # Group segments by topic_label
+ groups: dict[str, list[TranscriptSegment]] = defaultdict(list)
+ for seg in segments:
+ label = seg.topic_label or "unlabeled"
+ groups[label].append(seg)
+
+ system_prompt = _load_prompt("stage3_extraction.txt")
+ llm = _get_llm_client()
+ total_moments = 0
+
+ for topic_label, group_segs in groups.items():
+ # Build segment text for this group
+ seg_lines = []
+ for seg in group_segs:
+ seg_lines.append(
+ f"({seg.start_time:.1f}s - {seg.end_time:.1f}s) {seg.text}"
+ )
+ segment_text = "\n".join(seg_lines)
+
+ user_prompt = (
+ f"Topic: {topic_label}\n\n"
+ f"\n{segment_text}\n"
+ )
+
+ raw = llm.complete(system_prompt, user_prompt, response_model=ExtractionResult)
+ result = _safe_parse_llm_response(raw, ExtractionResult, llm, system_prompt, user_prompt)
+
+ # Create KeyMoment rows
+ for moment in result.moments:
+ # Validate content_type against enum
+ try:
+ ct = KeyMomentContentType(moment.content_type)
+ except ValueError:
+ ct = KeyMomentContentType.technique
+
+ km = KeyMoment(
+ source_video_id=video_id,
+ title=moment.title,
+ summary=moment.summary,
+ start_time=moment.start_time,
+ end_time=moment.end_time,
+ content_type=ct,
+ plugins=moment.plugins if moment.plugins else None,
+ raw_transcript=moment.raw_transcript or None,
+ )
+ session.add(km)
+ total_moments += 1
+
+ # Update processing_status to extracted
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one()
+ video.processing_status = ProcessingStatus.extracted
+
+ session.commit()
+ elapsed = time.monotonic() - start
+ logger.info(
+ "Stage 3 (extraction) completed for video_id=%s in %.1fs — %d moments created",
+ video_id, elapsed, total_moments,
+ )
+ return video_id
+
+ except FileNotFoundError:
+ raise
+ except Exception as exc:
+ session.rollback()
+ logger.error("Stage 3 failed for video_id=%s: %s", video_id, exc)
+ raise self.retry(exc=exc)
+ finally:
+ session.close()
+
+
+# ── Stage 4: Classification ─────────────────────────────────────────────────
+
+@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
+def stage4_classification(self, video_id: str) -> str:
+ """Classify key moments against the canonical tag taxonomy.
+
+ Loads all KeyMoment rows for the video, sends them to the LLM with the
+ canonical taxonomy, and stores classification results in Redis for
+ stage 5 consumption. Updates content_type if the classifier overrides it.
+
+ Stage 4 does NOT change processing_status.
+
+ Returns the video_id for chain compatibility.
+ """
+ start = time.monotonic()
+ logger.info("Stage 4 (classification) starting for video_id=%s", video_id)
+
+ session = _get_sync_session()
+ try:
+ # Load key moments
+ moments = (
+ session.execute(
+ select(KeyMoment)
+ .where(KeyMoment.source_video_id == video_id)
+ .order_by(KeyMoment.start_time)
+ )
+ .scalars()
+ .all()
+ )
+
+ if not moments:
+ logger.info("Stage 4: No moments found for video_id=%s, skipping.", video_id)
+ # Store empty classification data
+ _store_classification_data(video_id, [])
+ return video_id
+
+ # Load canonical tags
+ tags_data = _load_canonical_tags()
+ taxonomy_text = _format_taxonomy_for_prompt(tags_data)
+
+ # Build moments text for the LLM
+ moments_lines = []
+ for i, m in enumerate(moments):
+ moments_lines.append(
+ f"[{i}] Title: {m.title}\n"
+ f" Summary: {m.summary}\n"
+ f" Content type: {m.content_type.value}\n"
+ f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}"
+ )
+ moments_text = "\n\n".join(moments_lines)
+
+ system_prompt = _load_prompt("stage4_classification.txt")
+ user_prompt = (
+ f"\n{taxonomy_text}\n\n\n"
+ f"\n{moments_text}\n"
+ )
+
+ llm = _get_llm_client()
+ raw = llm.complete(system_prompt, user_prompt, response_model=ClassificationResult)
+ result = _safe_parse_llm_response(raw, ClassificationResult, llm, system_prompt, user_prompt)
+
+ # Apply content_type overrides and prepare classification data for stage 5
+ classification_data = []
+ moment_ids = [str(m.id) for m in moments]
+
+ for cls in result.classifications:
+ if 0 <= cls.moment_index < len(moments):
+ moment = moments[cls.moment_index]
+
+ # Apply content_type override if provided
+ if cls.content_type_override:
+ try:
+ moment.content_type = KeyMomentContentType(cls.content_type_override)
+ except ValueError:
+ pass
+
+ classification_data.append({
+ "moment_id": str(moment.id),
+ "topic_category": cls.topic_category,
+ "topic_tags": cls.topic_tags,
+ })
+
+ session.commit()
+
+ # Store classification data in Redis for stage 5
+ _store_classification_data(video_id, classification_data)
+
+ elapsed = time.monotonic() - start
+ logger.info(
+ "Stage 4 (classification) completed for video_id=%s in %.1fs — %d moments classified",
+ video_id, elapsed, len(classification_data),
+ )
+ return video_id
+
+ except FileNotFoundError:
+ raise
+ except Exception as exc:
+ session.rollback()
+ logger.error("Stage 4 failed for video_id=%s: %s", video_id, exc)
+ raise self.retry(exc=exc)
+ finally:
+ session.close()
+
+
+def _store_classification_data(video_id: str, data: list[dict]) -> None:
+ """Store classification data in Redis for cross-stage communication."""
+ import redis
+
+ settings = get_settings()
+ r = redis.Redis.from_url(settings.redis_url)
+ key = f"chrysopedia:classification:{video_id}"
+ r.set(key, json.dumps(data), ex=86400) # Expire after 24 hours
+
+
+def _load_classification_data(video_id: str) -> list[dict]:
+ """Load classification data from Redis."""
+ import redis
+
+ settings = get_settings()
+ r = redis.Redis.from_url(settings.redis_url)
+ key = f"chrysopedia:classification:{video_id}"
+ raw = r.get(key)
+ if raw is None:
+ return []
+ return json.loads(raw)
+
+
+# ── Stage 5: Synthesis ───────────────────────────────────────────────────────
+
+@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
+def stage5_synthesis(self, video_id: str) -> str:
+ """Synthesize technique pages from classified key moments.
+
+ Groups moments by (creator, topic_category), calls the LLM to synthesize
+ each group into a TechniquePage, creates/updates page rows, and links
+ KeyMoments to their TechniquePage.
+
+ Sets processing_status to 'reviewed' (or 'published' if review_mode is False).
+
+ Returns the video_id for chain compatibility.
+ """
+ start = time.monotonic()
+ logger.info("Stage 5 (synthesis) starting for video_id=%s", video_id)
+
+ settings = get_settings()
+ session = _get_sync_session()
+ try:
+ # Load video and moments
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one()
+
+ moments = (
+ session.execute(
+ select(KeyMoment)
+ .where(KeyMoment.source_video_id == video_id)
+ .order_by(KeyMoment.start_time)
+ )
+ .scalars()
+ .all()
+ )
+
+ if not moments:
+ logger.info("Stage 5: No moments found for video_id=%s, skipping.", video_id)
+ return video_id
+
+ # Load classification data from stage 4
+ classification_data = _load_classification_data(video_id)
+ cls_by_moment_id = {c["moment_id"]: c for c in classification_data}
+
+ # Group moments by topic_category (from classification)
+ groups: dict[str, list[tuple[KeyMoment, dict]]] = defaultdict(list)
+ for moment in moments:
+ cls_info = cls_by_moment_id.get(str(moment.id), {})
+ category = cls_info.get("topic_category", "Uncategorized")
+ groups[category].append((moment, cls_info))
+
+ system_prompt = _load_prompt("stage5_synthesis.txt")
+ llm = _get_llm_client()
+ pages_created = 0
+
+ for category, moment_group in groups.items():
+ # Build moments text for the LLM
+ moments_lines = []
+ all_tags: set[str] = set()
+ for i, (m, cls_info) in enumerate(moment_group):
+ tags = cls_info.get("topic_tags", [])
+ all_tags.update(tags)
+ moments_lines.append(
+ f"[{i}] Title: {m.title}\n"
+ f" Summary: {m.summary}\n"
+ f" Content type: {m.content_type.value}\n"
+ f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
+ f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
+ f" Category: {category}\n"
+ f" Tags: {', '.join(tags) if tags else 'none'}\n"
+ f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
+ )
+ moments_text = "\n\n".join(moments_lines)
+
+ user_prompt = f"\n{moments_text}\n"
+
+ raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult)
+ result = _safe_parse_llm_response(raw, SynthesisResult, llm, system_prompt, user_prompt)
+
+ # Create/update TechniquePage rows
+ for page_data in result.pages:
+ # Check if page with this slug already exists
+ existing = session.execute(
+ select(TechniquePage).where(TechniquePage.slug == page_data.slug)
+ ).scalar_one_or_none()
+
+ if existing:
+ # Update existing page
+ existing.title = page_data.title
+ existing.summary = page_data.summary
+ existing.body_sections = page_data.body_sections
+ existing.signal_chains = page_data.signal_chains
+ existing.plugins = page_data.plugins if page_data.plugins else None
+ existing.topic_tags = list(all_tags) if all_tags else None
+ existing.source_quality = page_data.source_quality
+ page = existing
+ else:
+ page = TechniquePage(
+ creator_id=video.creator_id,
+ title=page_data.title,
+ slug=page_data.slug,
+ topic_category=page_data.topic_category or category,
+ topic_tags=list(all_tags) if all_tags else None,
+ summary=page_data.summary,
+ body_sections=page_data.body_sections,
+ signal_chains=page_data.signal_chains,
+ plugins=page_data.plugins if page_data.plugins else None,
+ source_quality=page_data.source_quality,
+ )
+ session.add(page)
+ session.flush() # Get the page.id assigned
+
+ pages_created += 1
+
+ # Link moments to the technique page
+ for m, _ in moment_group:
+ m.technique_page_id = page.id
+
+ # Update processing_status
+ if settings.review_mode:
+ video.processing_status = ProcessingStatus.reviewed
+ else:
+ video.processing_status = ProcessingStatus.published
+
+ session.commit()
+ elapsed = time.monotonic() - start
+ logger.info(
+ "Stage 5 (synthesis) completed for video_id=%s in %.1fs — %d pages created/updated",
+ video_id, elapsed, pages_created,
+ )
+ return video_id
+
+ except FileNotFoundError:
+ raise
+ except Exception as exc:
+ session.rollback()
+ logger.error("Stage 5 failed for video_id=%s: %s", video_id, exc)
+ raise self.retry(exc=exc)
+ finally:
+ session.close()
+
+
+# ── Orchestrator ─────────────────────────────────────────────────────────────
+
+@celery_app.task
+def run_pipeline(video_id: str) -> str:
+ """Orchestrate the full pipeline (stages 2-5) with resumability.
+
+ Checks the current processing_status of the video and chains only the
+ stages that still need to run. For example:
+ - pending/transcribed → stages 2, 3, 4, 5
+ - extracted → stages 4, 5
+ - reviewed/published → no-op
+
+ Returns the video_id.
+ """
+ logger.info("run_pipeline starting for video_id=%s", video_id)
+
+ session = _get_sync_session()
+ try:
+ video = session.execute(
+ select(SourceVideo).where(SourceVideo.id == video_id)
+ ).scalar_one_or_none()
+
+ if video is None:
+ logger.error("run_pipeline: video_id=%s not found", video_id)
+ raise ValueError(f"Video not found: {video_id}")
+
+ status = video.processing_status
+ logger.info(
+ "run_pipeline: video_id=%s current status=%s", video_id, status.value
+ )
+ finally:
+ session.close()
+
+ # Build the chain based on current status
+ stages = []
+ if status in (ProcessingStatus.pending, ProcessingStatus.transcribed):
+ stages = [
+ stage2_segmentation.s(video_id),
+ stage3_extraction.s(), # receives video_id from previous
+ stage4_classification.s(),
+ stage5_synthesis.s(),
+ ]
+ elif status == ProcessingStatus.extracted:
+ stages = [
+ stage4_classification.s(video_id),
+ stage5_synthesis.s(),
+ ]
+ elif status in (ProcessingStatus.reviewed, ProcessingStatus.published):
+ logger.info(
+ "run_pipeline: video_id=%s already at status=%s, nothing to do.",
+ video_id, status.value,
+ )
+ return video_id
+
+ if stages:
+ pipeline = celery_chain(*stages)
+ pipeline.apply_async()
+ logger.info(
+ "run_pipeline: dispatched %d stages for video_id=%s",
+ len(stages), video_id,
+ )
+
+ return video_id
diff --git a/backend/requirements.txt b/backend/requirements.txt
index c3f0fc0..8abdd86 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -13,6 +13,7 @@ httpx>=0.27.0,<1.0
openai>=1.0,<2.0
qdrant-client>=1.9,<2.0
pyyaml>=6.0,<7.0
+psycopg2-binary>=2.9,<3.0
# Test dependencies
pytest>=8.0,<10.0
pytest-asyncio>=0.24,<1.0
diff --git a/prompts/stage2_segmentation.txt b/prompts/stage2_segmentation.txt
new file mode 100644
index 0000000..b09e367
--- /dev/null
+++ b/prompts/stage2_segmentation.txt
@@ -0,0 +1,33 @@
+You are a transcript analysis expert. Your task is to analyze a music production tutorial transcript and identify distinct topic boundaries — contiguous groups of segments that discuss the same subject.
+
+## Instructions
+
+1. Read the transcript segments provided inside the tags.
+2. Each segment has an index, start time, end time, and text.
+3. Group consecutive segments that discuss the same topic together.
+4. Assign a short, descriptive topic_label to each group (e.g., "kick drum layering", "reverb bus setup", "arrangement intro section").
+5. Write a brief summary (1-2 sentences) for each topic group.
+
+## Output Format
+
+Return a JSON object with a single key "segments" containing a list of topic groups:
+
+```json
+{
+ "segments": [
+ {
+ "start_index": 0,
+ "end_index": 5,
+ "topic_label": "Short descriptive label",
+ "summary": "Brief summary of what is discussed in these segments."
+ }
+ ]
+}
+```
+
+## Rules
+
+- Every segment index must be covered exactly once (no gaps, no overlaps).
+- start_index and end_index are inclusive.
+- topic_label should be concise (3-6 words).
+- Output ONLY the JSON object, no other text.
diff --git a/prompts/stage3_extraction.txt b/prompts/stage3_extraction.txt
new file mode 100644
index 0000000..b315f36
--- /dev/null
+++ b/prompts/stage3_extraction.txt
@@ -0,0 +1,47 @@
+You are a music production knowledge extraction expert. Your task is to identify and extract key moments from a topic segment of a tutorial transcript.
+
+## Instructions
+
+1. Read the transcript segment provided inside the tags.
+2. Identify distinct key moments — specific techniques, settings, reasoning, or workflow steps being demonstrated or explained.
+3. For each key moment, extract the relevant details.
+
+## Output Format
+
+Return a JSON object with a single key "moments" containing a list of extracted moments:
+
+```json
+{
+ "moments": [
+ {
+ "title": "Concise title for this moment",
+ "summary": "Detailed summary of the technique or concept being shown (2-4 sentences).",
+ "start_time": 120.5,
+ "end_time": 185.0,
+ "content_type": "technique",
+ "plugins": ["Serum", "OTT"],
+ "raw_transcript": "The relevant excerpt from the transcript text."
+ }
+ ]
+}
+```
+
+## Field Rules
+
+- **title**: Concise, descriptive (e.g., "Layering sub bass with Serum").
+- **summary**: Explain WHAT is done and WHY. Include specific values, settings, or reasoning when mentioned.
+- **start_time** / **end_time**: Use the timestamps from the transcript segments. Times are in seconds.
+- **content_type**: Exactly one of: "technique", "settings", "reasoning", "workflow".
+ - technique = a production technique being demonstrated
+ - settings = specific plugin/DAW settings being shown
+ - reasoning = creative decision-making or explanation of why something is done
+ - workflow = session setup, file management, process organization
+- **plugins**: List any plugins, virtual instruments, or specific tools mentioned. Empty list if none.
+- **raw_transcript**: Copy the most relevant portion of the transcript text for this moment.
+
+## Rules
+
+- Extract ALL meaningful moments — do not skip techniques or settings.
+- Each moment should be self-contained and understandable on its own.
+- If no key moments are found, return {"moments": []}.
+- Output ONLY the JSON object, no other text.
diff --git a/prompts/stage4_classification.txt b/prompts/stage4_classification.txt
new file mode 100644
index 0000000..5813095
--- /dev/null
+++ b/prompts/stage4_classification.txt
@@ -0,0 +1,38 @@
+You are a music production knowledge classifier. Your task is to classify extracted key moments against a canonical tag taxonomy.
+
+## Instructions
+
+1. Read the key moments provided inside the tags.
+2. Read the canonical tag taxonomy provided inside the tags.
+3. For each moment, assign the best-matching topic_category and relevant topic_tags from the taxonomy.
+
+## Output Format
+
+Return a JSON object with a single key "classifications" containing a list of classifications:
+
+```json
+{
+ "classifications": [
+ {
+ "moment_index": 0,
+ "topic_category": "Sound design",
+ "topic_tags": ["bass", "leads"],
+ "content_type_override": null
+ }
+ ]
+}
+```
+
+## Field Rules
+
+- **moment_index**: Zero-based index into the moments list provided.
+- **topic_category**: Must be one of the top-level category names from the taxonomy.
+- **topic_tags**: A list of sub_topics from the matched category. Select all that apply. May include tags from other categories if the moment spans multiple areas.
+- **content_type_override**: Set to a valid content_type ("technique", "settings", "reasoning", "workflow") only if the original classification seems wrong based on context. Otherwise set to null.
+
+## Rules
+
+- Every moment must have exactly one classification entry.
+- topic_category must exactly match a category name from the taxonomy.
+- topic_tags should prefer existing sub_topics but may include new descriptive tags if nothing fits.
+- Output ONLY the JSON object, no other text.
diff --git a/prompts/stage5_synthesis.txt b/prompts/stage5_synthesis.txt
new file mode 100644
index 0000000..d7eca78
--- /dev/null
+++ b/prompts/stage5_synthesis.txt
@@ -0,0 +1,58 @@
+You are a music production knowledge synthesizer. Your task is to create a comprehensive technique page from a group of related key moments by the same creator on the same topic.
+
+## Instructions
+
+1. Read the key moments and their metadata provided inside the tags.
+2. Synthesize them into a single, coherent technique page.
+3. Organize the content into logical body sections.
+
+## Output Format
+
+Return a JSON object with a single key "pages" containing a list of synthesized pages:
+
+```json
+{
+ "pages": [
+ {
+ "title": "Descriptive page title",
+ "slug": "url-safe-slug",
+ "topic_category": "Sound design",
+ "topic_tags": ["bass", "synthesis"],
+ "summary": "A concise overview paragraph (2-3 sentences).",
+ "body_sections": {
+ "Overview": "Introduction to the technique...",
+ "Step-by-Step Process": "Detailed walkthrough...",
+ "Key Settings": "Specific parameter values...",
+ "Tips and Variations": "Additional tips..."
+ },
+ "signal_chains": [
+ {
+ "name": "Main bass chain",
+ "steps": ["Serum (oscillator)", "OTT (compression)", "EQ8 (low cut)"]
+ }
+ ],
+ "plugins": ["Serum", "OTT", "EQ8"],
+ "source_quality": "structured"
+ }
+ ]
+}
+```
+
+## Field Rules
+
+- **title**: Clear, search-friendly title (e.g., "Neuro Bass Design with Serum by CreatorName").
+- **slug**: URL-safe version of the title using hyphens (e.g., "neuro-bass-design-serum-creatorname").
+- **topic_category**: The primary category from the canonical taxonomy.
+- **topic_tags**: All relevant tags aggregated from the classified moments.
+- **summary**: 2-3 sentence overview that captures the essence of the technique.
+- **body_sections**: A dictionary of section titles to section content. Use clear, educational prose. Include specific values and settings when available. Suggested sections: Overview, Step-by-Step Process, Key Settings, Tips and Variations. Adapt section names to fit the content.
+- **signal_chains**: List of signal chain objects with name and ordered steps. Empty list if not applicable.
+- **plugins**: Deduplicated list of all plugins/tools mentioned across the moments.
+- **source_quality**: One of "structured" (clear tutorial), "mixed" (some structure), "unstructured" (conversation/livestream).
+
+## Rules
+
+- Synthesize, don't just concatenate. Create coherent prose from potentially fragmented moments.
+- Preserve specific technical details (frequencies, ratios, plugin settings).
+- If moments conflict, note both approaches.
+- Output ONLY the JSON object, no other text.