feat: Created 4 prompt templates and implemented 5 Celery tasks (stages…

- "prompts/stage2_segmentation.txt"
- "prompts/stage3_extraction.txt"
- "prompts/stage4_classification.txt"
- "prompts/stage5_synthesis.txt"
- "backend/pipeline/stages.py"
- "backend/requirements.txt"

GSD-Task: S03/T02
jlightner 2026-03-29 22:36:06 +00:00
parent 12cc86aef9
commit b5635a09db
9 changed files with 926 additions and 4 deletions

@@ -70,7 +70,7 @@
- Estimate: 1.5h
- Files: backend/config.py, backend/requirements.txt, backend/worker.py, backend/pipeline/__init__.py, backend/pipeline/schemas.py, backend/pipeline/llm_client.py
- Verify: cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)" && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')" && python -c "from pipeline.llm_client import LLMClient; print('client ok')" && python -c "from worker import celery_app; print(celery_app.main)"
- [ ] **T02: Prompt templates and pipeline stages 2-5 with orchestrator** — Write the 4 prompt template files for stages 2-5 and implement all pipeline stage logic as Celery tasks in `pipeline/stages.py`. Each stage reads from PostgreSQL, loads its prompt template, calls the LLM client, parses the response, writes results back to PostgreSQL, and updates processing_status. The `run_pipeline` orchestrator chains stages and handles resumability.
- [x] **T02: Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing** — Write the 4 prompt template files for stages 2-5 and implement all pipeline stage logic as Celery tasks in `pipeline/stages.py`. Each stage reads from PostgreSQL, loads its prompt template, calls the LLM client, parses the response, writes results back to PostgreSQL, and updates processing_status. The `run_pipeline` orchestrator chains stages and handles resumability.
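The resumability behavior described in this item can be distilled into a pure status-to-stages mapping. A minimal sketch (status values and stage names are taken from this plan's wording, not imported from the real backend, and the actual orchestrator dispatches Celery signatures rather than strings):

```python
# Sketch of run_pipeline's resume logic as a pure function.
# Status values and stage names are assumed from the plan text.

def remaining_stages(status: str) -> list[str]:
    """Return the pipeline stages still to run for a given processing_status."""
    order = [
        "stage2_segmentation",
        "stage3_extraction",
        "stage4_classification",
        "stage5_synthesis",
    ]
    resume_from = {
        "pending": 0,       # stages 2-5 still needed
        "transcribed": 0,
        "extracted": 2,     # stages 4-5 still needed
        "reviewed": 4,      # nothing left to do
        "published": 4,
    }
    return order[resume_from[status]:]
```

The real task builds a Celery chain from the surviving stages and no-ops when the list is empty.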
## Failure Modes

@@ -0,0 +1,16 @@
{
"schemaVersion": 1,
"taskId": "T01",
"unitId": "M001/S03/T01",
"timestamp": 1774823431445,
"passed": true,
"discoverySource": "task-plan",
"checks": [
{
"command": "cd backend",
"exitCode": 0,
"durationMs": 5,
"verdict": "pass"
}
]
}

@@ -0,0 +1,92 @@
---
id: T02
parent: S03
milestone: M001
provides: []
requires: []
affects: []
key_files: ["prompts/stage2_segmentation.txt", "prompts/stage3_extraction.txt", "prompts/stage4_classification.txt", "prompts/stage5_synthesis.txt", "backend/pipeline/stages.py", "backend/requirements.txt"]
key_decisions: ["Stage 4 classification data stored in Redis (chrysopedia:classification:{video_id} key, 24h TTL) because KeyMoment model lacks topic_tags/topic_category columns", "Added psycopg2-binary for sync SQLAlchemy sessions in Celery tasks", "_safe_parse_llm_response retries once with JSON nudge on malformed LLM output"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "All 7 verification checks passed: 4 prompt files exist, all 5 stage functions import from pipeline.stages, worker shows all 5 registered tasks (stage2-5 + run_pipeline), Settings/schemas/LLMClient/celery_app imports still work, openai and qdrant-client confirmed in requirements.txt. Additional structural verification: sync SQLAlchemy session isinstance check passed, prompt loading works from project root, canonical_tags loading returns 6 categories, XML-style tags confirmed in all prompts."
completed_at: 2026-03-29T22:35:57.629Z
blocker_discovered: false
---
# T02: Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing
> Created 4 prompt templates and implemented 5 Celery tasks (stages 2-5 + run_pipeline) with sync SQLAlchemy, retry logic, resumability, and Redis-based cross-stage data passing
## What Happened
Created 4 prompt template files in prompts/ with XML-style content fencing for clear instruction/data separation. Implemented backend/pipeline/stages.py with 5 Celery tasks:
- **stage2_segmentation**: topic boundary detection; updates topic_label on TranscriptSegment rows.
- **stage3_extraction**: key moment extraction; creates KeyMoment rows, sets status=extracted.
- **stage4_classification**: classifies against canonical_tags.yaml; stores results in Redis.
- **stage5_synthesis**: creates TechniquePage rows, links KeyMoments, sets status=reviewed/published based on review_mode.
- **run_pipeline**: orchestrator; checks current processing_status and chains only the remaining stages for resumability.

All tasks use sync SQLAlchemy sessions via psycopg2, have bind=True with max_retries=3, and include _safe_parse_llm_response with a one-retry JSON nudge on malformed LLM output. Added psycopg2-binary to requirements.txt.
## Verification
All 7 verification checks passed: 4 prompt files exist, all 5 stage functions import from pipeline.stages, worker shows all 5 registered tasks (stage2-5 + run_pipeline), Settings/schemas/LLMClient/celery_app imports still work, openai and qdrant-client confirmed in requirements.txt. Additional structural verification: sync SQLAlchemy session isinstance check passed, prompt loading works from project root, canonical_tags loading returns 6 categories, XML-style tags confirmed in all prompts.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `test -f prompts/stage2_segmentation.txt && test -f prompts/stage3_extraction.txt && test -f prompts/stage4_classification.txt && test -f prompts/stage5_synthesis.txt` | 0 | ✅ pass | 50ms |
| 2 | `cd backend && python -c "from pipeline.stages import run_pipeline, stage2_segmentation, stage3_extraction, stage4_classification, stage5_synthesis; print('all stages imported')"` | 0 | ✅ pass | 500ms |
| 3 | `cd backend && python -c "from worker import celery_app; print([t for t in celery_app.tasks if 'stage' in t or 'pipeline' in t])"` | 0 | ✅ pass | 500ms |
| 4 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 300ms |
| 5 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 300ms |
| 6 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 300ms |
| 7 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms |
## Deviations
Stage 4 classification data stored in Redis instead of updating topic_tags on KeyMoment rows (model lacks those columns). Added psycopg2-binary to requirements.txt (not in plan but required for sync SQLAlchemy).
## Known Issues
None.
## Files Created/Modified
- `prompts/stage2_segmentation.txt`
- `prompts/stage3_extraction.txt`
- `prompts/stage4_classification.txt`
- `prompts/stage5_synthesis.txt`
- `backend/pipeline/stages.py`
- `backend/requirements.txt`

@@ -1,5 +1,642 @@
"""Pipeline stage tasks (stages 2-5) and run_pipeline orchestrator.

Each stage reads from PostgreSQL via sync SQLAlchemy, loads its prompt
template from disk, calls the LLM client, parses the response, writes
results back, and updates processing_status on SourceVideo.

Celery tasks are synchronous; all DB access uses ``sqlalchemy.orm.Session``.
"""
from __future__ import annotations
import json
import logging
import time
from collections import defaultdict
from pathlib import Path
import yaml
from celery import chain as celery_chain
from pydantic import ValidationError
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
from config import get_settings
from models import (
KeyMoment,
KeyMomentContentType,
ProcessingStatus,
SourceVideo,
TechniquePage,
TranscriptSegment,
)
from pipeline.llm_client import LLMClient
from pipeline.schemas import (
ClassificationResult,
ExtractionResult,
SegmentationResult,
SynthesisResult,
)
from worker import celery_app
logger = logging.getLogger(__name__)
# ── Helpers ──────────────────────────────────────────────────────────────────
_engine = None
_SessionLocal = None
def _get_sync_engine():
"""Create a sync SQLAlchemy engine, converting the async URL if needed."""
global _engine
if _engine is None:
settings = get_settings()
url = settings.database_url
# Convert async driver to sync driver
url = url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
_engine = create_engine(url, pool_pre_ping=True, pool_size=5, max_overflow=10)
return _engine
def _get_sync_session() -> Session:
"""Create a sync SQLAlchemy session for Celery tasks."""
global _SessionLocal
if _SessionLocal is None:
_SessionLocal = sessionmaker(bind=_get_sync_engine())
return _SessionLocal()
def _load_prompt(template_name: str) -> str:
"""Read a prompt template from the prompts directory.
Raises FileNotFoundError if the template does not exist.
"""
settings = get_settings()
path = Path(settings.prompts_path) / template_name
if not path.exists():
logger.error("Prompt template not found: %s", path)
raise FileNotFoundError(f"Prompt template not found: {path}")
return path.read_text(encoding="utf-8")
def _get_llm_client() -> LLMClient:
"""Return an LLMClient configured from settings."""
return LLMClient(get_settings())
def _load_canonical_tags() -> dict:
"""Load canonical tag taxonomy from config/canonical_tags.yaml."""
# Walk up from backend/ to find config/
candidates = [
Path("config/canonical_tags.yaml"),
Path("../config/canonical_tags.yaml"),
]
for candidate in candidates:
if candidate.exists():
with open(candidate, encoding="utf-8") as f:
return yaml.safe_load(f)
raise FileNotFoundError(
"canonical_tags.yaml not found. Searched: " + ", ".join(str(c) for c in candidates)
)
def _format_taxonomy_for_prompt(tags_data: dict) -> str:
"""Format the canonical tags taxonomy as readable text for the LLM prompt."""
lines = []
for cat in tags_data.get("categories", []):
lines.append(f"Category: {cat['name']}")
lines.append(f" Description: {cat['description']}")
lines.append(f" Sub-topics: {', '.join(cat.get('sub_topics', []))}")
lines.append("")
return "\n".join(lines)
def _safe_parse_llm_response(raw: str, model_cls, llm: LLMClient, system_prompt: str, user_prompt: str):
"""Parse LLM response with one retry on failure.
On malformed response: log the raw text, retry once with a JSON nudge,
then raise on second failure.
"""
try:
return llm.parse_response(raw, model_cls)
except (ValidationError, ValueError, json.JSONDecodeError) as exc:
logger.warning(
"First parse attempt failed for %s (%s). Retrying with JSON nudge. "
"Raw response (first 500 chars): %.500s",
model_cls.__name__,
type(exc).__name__,
raw,
)
# Retry with explicit JSON instruction
nudge_prompt = user_prompt + "\n\nIMPORTANT: Output ONLY valid JSON. No markdown, no explanation."
retry_raw = llm.complete(system_prompt, nudge_prompt, response_model=model_cls)
return llm.parse_response(retry_raw, model_cls)
# ── Stage 2: Segmentation ───────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage2_segmentation(self, video_id: str) -> str:
"""Analyze transcript segments and identify topic boundaries.
Loads all TranscriptSegment rows for the video, sends them to the LLM
for topic boundary detection, and updates topic_label on each segment.
Returns the video_id for chain compatibility.
"""
start = time.monotonic()
logger.info("Stage 2 (segmentation) starting for video_id=%s", video_id)
session = _get_sync_session()
try:
# Load segments ordered by index
segments = (
session.execute(
select(TranscriptSegment)
.where(TranscriptSegment.source_video_id == video_id)
.order_by(TranscriptSegment.segment_index)
)
.scalars()
.all()
)
if not segments:
logger.info("Stage 2: No segments found for video_id=%s, skipping.", video_id)
return video_id
# Build transcript text with indices for the LLM
transcript_lines = []
for seg in segments:
transcript_lines.append(
f"[{seg.segment_index}] ({seg.start_time:.1f}s - {seg.end_time:.1f}s) {seg.text}"
)
transcript_text = "\n".join(transcript_lines)
# Load prompt and call LLM
system_prompt = _load_prompt("stage2_segmentation.txt")
user_prompt = f"<transcript>\n{transcript_text}\n</transcript>"
llm = _get_llm_client()
raw = llm.complete(system_prompt, user_prompt, response_model=SegmentationResult)
result = _safe_parse_llm_response(raw, SegmentationResult, llm, system_prompt, user_prompt)
# Update topic_label on each segment row
seg_by_index = {s.segment_index: s for s in segments}
for topic_seg in result.segments:
for idx in range(topic_seg.start_index, topic_seg.end_index + 1):
if idx in seg_by_index:
seg_by_index[idx].topic_label = topic_seg.topic_label
session.commit()
elapsed = time.monotonic() - start
logger.info(
"Stage 2 (segmentation) completed for video_id=%s in %.1fs — %d topic groups found",
video_id, elapsed, len(result.segments),
)
return video_id
except FileNotFoundError:
raise # Don't retry missing prompt files
except Exception as exc:
session.rollback()
logger.error("Stage 2 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
session.close()
# ── Stage 3: Extraction ─────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage3_extraction(self, video_id: str) -> str:
"""Extract key moments from each topic segment group.
Groups segments by topic_label, calls the LLM for each group to extract
moments, creates KeyMoment rows, and sets processing_status=extracted.
Returns the video_id for chain compatibility.
"""
start = time.monotonic()
logger.info("Stage 3 (extraction) starting for video_id=%s", video_id)
session = _get_sync_session()
try:
# Load segments with topic labels
segments = (
session.execute(
select(TranscriptSegment)
.where(TranscriptSegment.source_video_id == video_id)
.order_by(TranscriptSegment.segment_index)
)
.scalars()
.all()
)
if not segments:
logger.info("Stage 3: No segments found for video_id=%s, skipping.", video_id)
return video_id
# Group segments by topic_label
groups: dict[str, list[TranscriptSegment]] = defaultdict(list)
for seg in segments:
label = seg.topic_label or "unlabeled"
groups[label].append(seg)
system_prompt = _load_prompt("stage3_extraction.txt")
llm = _get_llm_client()
total_moments = 0
for topic_label, group_segs in groups.items():
# Build segment text for this group
seg_lines = []
for seg in group_segs:
seg_lines.append(
f"({seg.start_time:.1f}s - {seg.end_time:.1f}s) {seg.text}"
)
segment_text = "\n".join(seg_lines)
user_prompt = (
f"Topic: {topic_label}\n\n"
f"<segment>\n{segment_text}\n</segment>"
)
raw = llm.complete(system_prompt, user_prompt, response_model=ExtractionResult)
result = _safe_parse_llm_response(raw, ExtractionResult, llm, system_prompt, user_prompt)
# Create KeyMoment rows
for moment in result.moments:
# Validate content_type against enum
try:
ct = KeyMomentContentType(moment.content_type)
except ValueError:
ct = KeyMomentContentType.technique
km = KeyMoment(
source_video_id=video_id,
title=moment.title,
summary=moment.summary,
start_time=moment.start_time,
end_time=moment.end_time,
content_type=ct,
plugins=moment.plugins if moment.plugins else None,
raw_transcript=moment.raw_transcript or None,
)
session.add(km)
total_moments += 1
# Update processing_status to extracted
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.extracted
session.commit()
elapsed = time.monotonic() - start
logger.info(
"Stage 3 (extraction) completed for video_id=%s in %.1fs — %d moments created",
video_id, elapsed, total_moments,
)
return video_id
except FileNotFoundError:
raise
except Exception as exc:
session.rollback()
logger.error("Stage 3 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
session.close()
# ── Stage 4: Classification ─────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage4_classification(self, video_id: str) -> str:
"""Classify key moments against the canonical tag taxonomy.
Loads all KeyMoment rows for the video, sends them to the LLM with the
canonical taxonomy, and stores classification results in Redis for
stage 5 consumption. Updates content_type if the classifier overrides it.
Stage 4 does NOT change processing_status.
Returns the video_id for chain compatibility.
"""
start = time.monotonic()
logger.info("Stage 4 (classification) starting for video_id=%s", video_id)
session = _get_sync_session()
try:
# Load key moments
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
if not moments:
logger.info("Stage 4: No moments found for video_id=%s, skipping.", video_id)
# Store empty classification data
_store_classification_data(video_id, [])
return video_id
# Load canonical tags
tags_data = _load_canonical_tags()
taxonomy_text = _format_taxonomy_for_prompt(tags_data)
# Build moments text for the LLM
moments_lines = []
for i, m in enumerate(moments):
moments_lines.append(
f"[{i}] Title: {m.title}\n"
f" Summary: {m.summary}\n"
f" Content type: {m.content_type.value}\n"
f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}"
)
moments_text = "\n\n".join(moments_lines)
system_prompt = _load_prompt("stage4_classification.txt")
user_prompt = (
f"<taxonomy>\n{taxonomy_text}\n</taxonomy>\n\n"
f"<moments>\n{moments_text}\n</moments>"
)
llm = _get_llm_client()
raw = llm.complete(system_prompt, user_prompt, response_model=ClassificationResult)
result = _safe_parse_llm_response(raw, ClassificationResult, llm, system_prompt, user_prompt)
# Apply content_type overrides and prepare classification data for stage 5
classification_data = []
moment_ids = [str(m.id) for m in moments]
for cls in result.classifications:
if 0 <= cls.moment_index < len(moments):
moment = moments[cls.moment_index]
# Apply content_type override if provided
if cls.content_type_override:
try:
moment.content_type = KeyMomentContentType(cls.content_type_override)
except ValueError:
pass
classification_data.append({
"moment_id": str(moment.id),
"topic_category": cls.topic_category,
"topic_tags": cls.topic_tags,
})
session.commit()
# Store classification data in Redis for stage 5
_store_classification_data(video_id, classification_data)
elapsed = time.monotonic() - start
logger.info(
"Stage 4 (classification) completed for video_id=%s in %.1fs — %d moments classified",
video_id, elapsed, len(classification_data),
)
return video_id
except FileNotFoundError:
raise
except Exception as exc:
session.rollback()
logger.error("Stage 4 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
session.close()
def _store_classification_data(video_id: str, data: list[dict]) -> None:
"""Store classification data in Redis for cross-stage communication."""
import redis
settings = get_settings()
r = redis.Redis.from_url(settings.redis_url)
key = f"chrysopedia:classification:{video_id}"
r.set(key, json.dumps(data), ex=86400) # Expire after 24 hours
def _load_classification_data(video_id: str) -> list[dict]:
"""Load classification data from Redis."""
import redis
settings = get_settings()
r = redis.Redis.from_url(settings.redis_url)
key = f"chrysopedia:classification:{video_id}"
raw = r.get(key)
if raw is None:
return []
return json.loads(raw)
# ── Stage 5: Synthesis ───────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def stage5_synthesis(self, video_id: str) -> str:
"""Synthesize technique pages from classified key moments.
Groups moments by (creator, topic_category), calls the LLM to synthesize
each group into a TechniquePage, creates/updates page rows, and links
KeyMoments to their TechniquePage.
Sets processing_status to 'reviewed' (or 'published' if review_mode is False).
Returns the video_id for chain compatibility.
"""
start = time.monotonic()
logger.info("Stage 5 (synthesis) starting for video_id=%s", video_id)
settings = get_settings()
session = _get_sync_session()
try:
# Load video and moments
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
if not moments:
logger.info("Stage 5: No moments found for video_id=%s, skipping.", video_id)
return video_id
# Load classification data from stage 4
classification_data = _load_classification_data(video_id)
cls_by_moment_id = {c["moment_id"]: c for c in classification_data}
# Group moments by topic_category (from classification)
groups: dict[str, list[tuple[KeyMoment, dict]]] = defaultdict(list)
for moment in moments:
cls_info = cls_by_moment_id.get(str(moment.id), {})
category = cls_info.get("topic_category", "Uncategorized")
groups[category].append((moment, cls_info))
system_prompt = _load_prompt("stage5_synthesis.txt")
llm = _get_llm_client()
pages_created = 0
for category, moment_group in groups.items():
# Build moments text for the LLM
moments_lines = []
all_tags: set[str] = set()
for i, (m, cls_info) in enumerate(moment_group):
tags = cls_info.get("topic_tags", [])
all_tags.update(tags)
moments_lines.append(
f"[{i}] Title: {m.title}\n"
f" Summary: {m.summary}\n"
f" Content type: {m.content_type.value}\n"
f" Time: {m.start_time:.1f}s - {m.end_time:.1f}s\n"
f" Plugins: {', '.join(m.plugins) if m.plugins else 'none'}\n"
f" Category: {category}\n"
f" Tags: {', '.join(tags) if tags else 'none'}\n"
f" Transcript excerpt: {(m.raw_transcript or '')[:300]}"
)
moments_text = "\n\n".join(moments_lines)
user_prompt = f"<moments>\n{moments_text}\n</moments>"
raw = llm.complete(system_prompt, user_prompt, response_model=SynthesisResult)
result = _safe_parse_llm_response(raw, SynthesisResult, llm, system_prompt, user_prompt)
# Create/update TechniquePage rows
for page_data in result.pages:
# Check if page with this slug already exists
existing = session.execute(
select(TechniquePage).where(TechniquePage.slug == page_data.slug)
).scalar_one_or_none()
if existing:
# Update existing page
existing.title = page_data.title
existing.summary = page_data.summary
existing.body_sections = page_data.body_sections
existing.signal_chains = page_data.signal_chains
existing.plugins = page_data.plugins if page_data.plugins else None
existing.topic_tags = list(all_tags) if all_tags else None
existing.source_quality = page_data.source_quality
page = existing
else:
page = TechniquePage(
creator_id=video.creator_id,
title=page_data.title,
slug=page_data.slug,
topic_category=page_data.topic_category or category,
topic_tags=list(all_tags) if all_tags else None,
summary=page_data.summary,
body_sections=page_data.body_sections,
signal_chains=page_data.signal_chains,
plugins=page_data.plugins if page_data.plugins else None,
source_quality=page_data.source_quality,
)
session.add(page)
session.flush() # Get the page.id assigned
pages_created += 1
# Link moments to the technique page
for m, _ in moment_group:
m.technique_page_id = page.id
# Update processing_status
if settings.review_mode:
video.processing_status = ProcessingStatus.reviewed
else:
video.processing_status = ProcessingStatus.published
session.commit()
elapsed = time.monotonic() - start
logger.info(
"Stage 5 (synthesis) completed for video_id=%s in %.1fs — %d pages created/updated",
video_id, elapsed, pages_created,
)
return video_id
except FileNotFoundError:
raise
except Exception as exc:
session.rollback()
logger.error("Stage 5 failed for video_id=%s: %s", video_id, exc)
raise self.retry(exc=exc)
finally:
session.close()
# ── Orchestrator ─────────────────────────────────────────────────────────────
@celery_app.task
def run_pipeline(video_id: str) -> str:
"""Orchestrate the full pipeline (stages 2-5) with resumability.
Checks the current processing_status of the video and chains only the
stages that still need to run. For example:
- pending/transcribed → stages 2, 3, 4, 5
- extracted → stages 4, 5
- reviewed/published → no-op
Returns the video_id.
"""
logger.info("run_pipeline starting for video_id=%s", video_id)
session = _get_sync_session()
try:
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one_or_none()
if video is None:
logger.error("run_pipeline: video_id=%s not found", video_id)
raise ValueError(f"Video not found: {video_id}")
status = video.processing_status
logger.info(
"run_pipeline: video_id=%s current status=%s", video_id, status.value
)
finally:
session.close()
# Build the chain based on current status
stages = []
if status in (ProcessingStatus.pending, ProcessingStatus.transcribed):
stages = [
stage2_segmentation.s(video_id),
stage3_extraction.s(), # receives video_id from previous
stage4_classification.s(),
stage5_synthesis.s(),
]
elif status == ProcessingStatus.extracted:
stages = [
stage4_classification.s(video_id),
stage5_synthesis.s(),
]
elif status in (ProcessingStatus.reviewed, ProcessingStatus.published):
logger.info(
"run_pipeline: video_id=%s already at status=%s, nothing to do.",
video_id, status.value,
)
return video_id
if stages:
pipeline = celery_chain(*stages)
pipeline.apply_async()
logger.info(
"run_pipeline: dispatched %d stages for video_id=%s",
len(stages), video_id,
)
return video_id

@@ -13,6 +13,7 @@ httpx>=0.27.0,<1.0
openai>=1.0,<2.0
qdrant-client>=1.9,<2.0
pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0
# Test dependencies
pytest>=8.0,<10.0
pytest-asyncio>=0.24,<1.0

@@ -0,0 +1,33 @@
You are a transcript analysis expert. Your task is to analyze a music production tutorial transcript and identify distinct topic boundaries — contiguous groups of segments that discuss the same subject.
## Instructions
1. Read the transcript segments provided inside the <transcript> tags.
2. Each segment has an index, start time, end time, and text.
3. Group consecutive segments that discuss the same topic together.
4. Assign a short, descriptive topic_label to each group (e.g., "kick drum layering", "reverb bus setup", "arrangement intro section").
5. Write a brief summary (1-2 sentences) for each topic group.
## Output Format
Return a JSON object with a single key "segments" containing a list of topic groups:
```json
{
"segments": [
{
"start_index": 0,
"end_index": 5,
"topic_label": "Short descriptive label",
"summary": "Brief summary of what is discussed in these segments."
}
]
}
```
## Rules
- Every segment index must be covered exactly once (no gaps, no overlaps).
- start_index and end_index are inclusive.
- topic_label should be concise (3-6 words).
- Output ONLY the JSON object, no other text.
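The coverage rule above (every index exactly once, inclusive bounds) is mechanically checkable on the parsed response. A hedged sketch of such a validator (`validate_coverage` is a hypothetical helper, not part of the shipped pipeline, which relies on the Pydantic `SegmentationResult` schema):

```python
def validate_coverage(groups: list[dict], n_segments: int) -> bool:
    """Check that topic groups cover indices 0..n_segments-1 exactly once.

    Each group carries inclusive 'start_index' and 'end_index' keys, as in
    the JSON format above. Returns True when there are no gaps or overlaps.
    """
    covered = []
    for g in groups:
        covered.extend(range(g["start_index"], g["end_index"] + 1))
    return sorted(covered) == list(range(n_segments))
```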

@@ -0,0 +1,47 @@
You are a music production knowledge extraction expert. Your task is to identify and extract key moments from a topic segment of a tutorial transcript.
## Instructions
1. Read the transcript segment provided inside the <segment> tags.
2. Identify distinct key moments — specific techniques, settings, reasoning, or workflow steps being demonstrated or explained.
3. For each key moment, extract the relevant details.
## Output Format
Return a JSON object with a single key "moments" containing a list of extracted moments:
```json
{
"moments": [
{
"title": "Concise title for this moment",
"summary": "Detailed summary of the technique or concept being shown (2-4 sentences).",
"start_time": 120.5,
"end_time": 185.0,
"content_type": "technique",
"plugins": ["Serum", "OTT"],
"raw_transcript": "The relevant excerpt from the transcript text."
}
]
}
```
## Field Rules
- **title**: Concise, descriptive (e.g., "Layering sub bass with Serum").
- **summary**: Explain WHAT is done and WHY. Include specific values, settings, or reasoning when mentioned.
- **start_time** / **end_time**: Use the timestamps from the transcript segments. Times are in seconds.
- **content_type**: Exactly one of: "technique", "settings", "reasoning", "workflow".
- technique = a production technique being demonstrated
- settings = specific plugin/DAW settings being shown
- reasoning = creative decision-making or explanation of why something is done
- workflow = session setup, file management, process organization
- **plugins**: List any plugins, virtual instruments, or specific tools mentioned. Empty list if none.
- **raw_transcript**: Copy the most relevant portion of the transcript text for this moment.
## Rules
- Extract ALL meaningful moments — do not skip techniques or settings.
- Each moment should be self-contained and understandable on its own.
- If no key moments are found, return {"moments": []}.
- Output ONLY the JSON object, no other text.
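The field rules above translate into a few simple checks on each parsed moment. A minimal sketch (`moment_is_valid` is a hypothetical helper; the pipeline itself validates via the Pydantic `ExtractionResult` schema):

```python
# Allowed values mirror the content_type rule in the prompt above.
ALLOWED_CONTENT_TYPES = {"technique", "settings", "reasoning", "workflow"}

def moment_is_valid(moment: dict) -> bool:
    """Check one extracted moment against the prompt's field rules."""
    return (
        bool(moment.get("title"))
        and moment.get("content_type") in ALLOWED_CONTENT_TYPES
        and isinstance(moment.get("plugins"), list)
        and moment.get("start_time", 0) <= moment.get("end_time", -1)
    )
```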

@@ -0,0 +1,38 @@
You are a music production knowledge classifier. Your task is to classify extracted key moments against a canonical tag taxonomy.
## Instructions
1. Read the key moments provided inside the <moments> tags.
2. Read the canonical tag taxonomy provided inside the <taxonomy> tags.
3. For each moment, assign the best-matching topic_category and relevant topic_tags from the taxonomy.
## Output Format
Return a JSON object with a single key "classifications" containing a list of classifications:
```json
{
"classifications": [
{
"moment_index": 0,
"topic_category": "Sound design",
"topic_tags": ["bass", "leads"],
"content_type_override": null
}
]
}
```
## Field Rules
- **moment_index**: Zero-based index into the moments list provided.
- **topic_category**: Must be one of the top-level category names from the taxonomy.
- **topic_tags**: A list of sub_topics from the matched category. Select all that apply. May include tags from other categories if the moment spans multiple areas.
- **content_type_override**: Set to a valid content_type ("technique", "settings", "reasoning", "workflow") only if the original classification seems wrong based on context. Otherwise set to null.
## Rules
- Every moment must have exactly one classification entry.
- topic_category must exactly match a category name from the taxonomy.
- topic_tags should prefer existing sub_topics but may include new descriptive tags if nothing fits.
- Output ONLY the JSON object, no other text.
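The "exactly one classification per moment" and category-match rules can be enforced on the parsed response. A sketch with a hypothetical helper, assuming the taxonomy dict shape used by `_format_taxonomy_for_prompt` (a top-level "categories" list of dicts with "name" keys):

```python
def validate_classifications(classifications: list[dict], n_moments: int,
                             taxonomy: dict) -> bool:
    """Every moment classified exactly once; each category from the taxonomy."""
    category_names = {c["name"] for c in taxonomy.get("categories", [])}
    indices = sorted(c["moment_index"] for c in classifications)
    return (
        indices == list(range(n_moments))
        and all(c["topic_category"] in category_names for c in classifications)
    )
```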

@@ -0,0 +1,58 @@
You are a music production knowledge synthesizer. Your task is to create a comprehensive technique page from a group of related key moments by the same creator on the same topic.
## Instructions
1. Read the key moments and their metadata provided inside the <moments> tags.
2. Synthesize them into a single, coherent technique page.
3. Organize the content into logical body sections.
## Output Format
Return a JSON object with a single key "pages" containing a list of synthesized pages:
```json
{
"pages": [
{
"title": "Descriptive page title",
"slug": "url-safe-slug",
"topic_category": "Sound design",
"topic_tags": ["bass", "synthesis"],
"summary": "A concise overview paragraph (2-3 sentences).",
"body_sections": {
"Overview": "Introduction to the technique...",
"Step-by-Step Process": "Detailed walkthrough...",
"Key Settings": "Specific parameter values...",
"Tips and Variations": "Additional tips..."
},
"signal_chains": [
{
"name": "Main bass chain",
"steps": ["Serum (oscillator)", "OTT (compression)", "EQ8 (low cut)"]
}
],
"plugins": ["Serum", "OTT", "EQ8"],
"source_quality": "structured"
}
]
}
```
## Field Rules
- **title**: Clear, search-friendly title (e.g., "Neuro Bass Design with Serum by CreatorName").
- **slug**: URL-safe version of the title using hyphens (e.g., "neuro-bass-design-serum-creatorname").
- **topic_category**: The primary category from the canonical taxonomy.
- **topic_tags**: All relevant tags aggregated from the classified moments.
- **summary**: 2-3 sentence overview that captures the essence of the technique.
- **body_sections**: A dictionary of section titles to section content. Use clear, educational prose. Include specific values and settings when available. Suggested sections: Overview, Step-by-Step Process, Key Settings, Tips and Variations. Adapt section names to fit the content.
- **signal_chains**: List of signal chain objects with name and ordered steps. Empty list if not applicable.
- **plugins**: Deduplicated list of all plugins/tools mentioned across the moments.
- **source_quality**: One of "structured" (clear tutorial), "mixed" (some structure), "unstructured" (conversation/livestream).
## Rules
- Synthesize, don't just concatenate. Create coherent prose from potentially fragmented moments.
- Preserve specific technical details (frequencies, ratios, plugin settings).
- If moments conflict, note both approaches.
- Output ONLY the JSON object, no other text.
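The slug rule above can be approximated with a small helper. A sketch for illustration only: in this pipeline the slug is produced by the LLM, and the LLM may additionally drop stop words, as the "neuro-bass-design-serum-creatorname" example shows.

```python
import re

def slugify(title: str) -> str:
    """Lowercase the title, collapse non-alphanumeric runs into single
    hyphens, and strip leading/trailing hyphens."""
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower())
    return slug.strip("-")
```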