chore: Extended Settings with 12 LLM/embedding/Qdrant config fields, cr…

- "backend/config.py"
- "backend/worker.py"
- "backend/pipeline/schemas.py"
- "backend/pipeline/llm_client.py"
- "backend/requirements.txt"
- "backend/pipeline/__init__.py"
- "backend/pipeline/stages.py"

GSD-Task: S03/T01
This commit is contained in:
jlightner 2026-03-29 22:30:31 +00:00
parent a9de7f97ea
commit b68751f0db
7 changed files with 297 additions and 0 deletions

View file

@ -26,6 +26,28 @@ class Settings(BaseSettings):
# CORS # CORS
cors_origins: list[str] = ["*"] cors_origins: list[str] = ["*"]
# LLM endpoint (OpenAI-compatible)
llm_api_url: str = "http://localhost:11434/v1"
llm_api_key: str = "sk-placeholder"
llm_model: str = "qwen2.5:14b-q8_0"
llm_fallback_url: str = "http://localhost:11434/v1"
llm_fallback_model: str = "qwen2.5:14b-q8_0"
# Embedding endpoint
embedding_api_url: str = "http://localhost:11434/v1"
embedding_model: str = "nomic-embed-text"
embedding_dimensions: int = 768
# Qdrant
qdrant_url: str = "http://localhost:6333"
qdrant_collection: str = "chrysopedia"
# Prompt templates
prompts_path: str = "./prompts"
# Review mode — when True, extracted moments go to review queue before publishing
review_mode: bool = True
# File storage # File storage
transcript_storage_path: str = "/data/transcripts" transcript_storage_path: str = "/data/transcripts"
video_metadata_path: str = "/data/video_meta" video_metadata_path: str = "/data/video_meta"

View file

View file

@ -0,0 +1,136 @@
"""Synchronous LLM client with primary/fallback endpoint logic.
Uses the OpenAI-compatible API (works with Ollama, vLLM, OpenWebUI, etc.).
Celery tasks run synchronously, so this uses ``openai.OpenAI`` (not Async).
"""
from __future__ import annotations
import logging
from typing import TypeVar
import openai
from pydantic import BaseModel
from config import Settings
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=BaseModel)
class LLMClient:
"""Sync LLM client that tries a primary endpoint and falls back on failure."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._primary = openai.OpenAI(
base_url=settings.llm_api_url,
api_key=settings.llm_api_key,
)
self._fallback = openai.OpenAI(
base_url=settings.llm_fallback_url,
api_key=settings.llm_api_key,
)
# ── Core completion ──────────────────────────────────────────────────
def complete(
self,
system_prompt: str,
user_prompt: str,
response_model: type[BaseModel] | None = None,
) -> str:
"""Send a chat completion request, falling back on connection/timeout errors.
Parameters
----------
system_prompt:
System message content.
user_prompt:
User message content.
response_model:
If provided, ``response_format`` is set to ``{"type": "json_object"}``
so the LLM returns parseable JSON.
Returns
-------
str
Raw completion text from the model.
"""
kwargs: dict = {}
if response_model is not None:
kwargs["response_format"] = {"type": "json_object"}
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
# --- Try primary endpoint ---
try:
response = self._primary.chat.completions.create(
model=self.settings.llm_model,
messages=messages,
**kwargs,
)
return response.choices[0].message.content or ""
except (openai.APIConnectionError, openai.APITimeoutError) as exc:
logger.warning(
"Primary LLM endpoint failed (%s: %s), trying fallback at %s",
type(exc).__name__,
exc,
self.settings.llm_fallback_url,
)
# --- Try fallback endpoint ---
try:
response = self._fallback.chat.completions.create(
model=self.settings.llm_fallback_model,
messages=messages,
**kwargs,
)
return response.choices[0].message.content or ""
except (openai.APIConnectionError, openai.APITimeoutError, openai.APIError) as exc:
logger.error(
"Fallback LLM endpoint also failed (%s: %s). Giving up.",
type(exc).__name__,
exc,
)
raise
# ── Response parsing ─────────────────────────────────────────────────
def parse_response(self, text: str, model: type[T]) -> T:
"""Parse raw LLM output as JSON and validate against a Pydantic model.
Parameters
----------
text:
Raw JSON string from the LLM.
model:
Pydantic model class to validate against.
Returns
-------
T
Validated Pydantic model instance.
Raises
------
pydantic.ValidationError
If the JSON doesn't match the schema.
ValueError
If the text is not valid JSON.
"""
try:
return model.model_validate_json(text)
except Exception:
logger.error(
"Failed to parse LLM response as %s. Response text: %.500s",
model.__name__,
text,
)
raise

View file

@ -0,0 +1,99 @@
"""Pydantic schemas for pipeline stage inputs and outputs.
Stage 2 Segmentation: groups transcript segments by topic.
Stage 3 Extraction: extracts key moments from segments.
Stage 4 Classification: classifies moments by category/tags.
Stage 5 Synthesis: generates technique pages from classified moments.
"""
from __future__ import annotations
from pydantic import BaseModel, Field
# ── Stage 2: Segmentation ───────────────────────────────────────────────────
class TopicSegment(BaseModel):
"""A contiguous group of transcript segments sharing a topic."""
start_index: int = Field(description="First transcript segment index in this group")
end_index: int = Field(description="Last transcript segment index in this group (inclusive)")
topic_label: str = Field(description="Short label describing the topic")
summary: str = Field(description="Brief summary of what is discussed")
class SegmentationResult(BaseModel):
"""Full output of stage 2 (segmentation)."""
segments: list[TopicSegment]
# ── Stage 3: Extraction ─────────────────────────────────────────────────────
class ExtractedMoment(BaseModel):
"""A single key moment extracted from a topic segment group."""
title: str = Field(description="Concise title for the moment")
summary: str = Field(description="Detailed summary of the technique/concept")
start_time: float = Field(description="Start time in seconds")
end_time: float = Field(description="End time in seconds")
content_type: str = Field(description="One of: technique, settings, reasoning, workflow")
plugins: list[str] = Field(default_factory=list, description="Plugins/tools mentioned")
raw_transcript: str = Field(default="", description="Raw transcript text for this moment")
class ExtractionResult(BaseModel):
"""Full output of stage 3 (extraction)."""
moments: list[ExtractedMoment]
# ── Stage 4: Classification ─────────────────────────────────────────────────
class ClassifiedMoment(BaseModel):
"""Classification metadata for a single extracted moment."""
moment_index: int = Field(description="Index into ExtractionResult.moments")
topic_category: str = Field(description="High-level topic category")
topic_tags: list[str] = Field(default_factory=list, description="Specific topic tags")
content_type_override: str | None = Field(
default=None,
description="Override for content_type if classification disagrees with extraction",
)
class ClassificationResult(BaseModel):
"""Full output of stage 4 (classification)."""
classifications: list[ClassifiedMoment]
# ── Stage 5: Synthesis ───────────────────────────────────────────────────────
class SynthesizedPage(BaseModel):
"""A technique page synthesized from classified moments."""
title: str = Field(description="Page title")
slug: str = Field(description="URL-safe slug")
topic_category: str = Field(description="Primary topic category")
topic_tags: list[str] = Field(default_factory=list, description="Associated tags")
summary: str = Field(description="Page summary / overview paragraph")
body_sections: dict = Field(
default_factory=dict,
description="Structured body content as section_name -> content mapping",
)
signal_chains: list[dict] = Field(
default_factory=list,
description="Signal chain descriptions (for audio/music production contexts)",
)
plugins: list[str] = Field(default_factory=list, description="Plugins/tools referenced")
source_quality: str = Field(
default="mixed",
description="One of: structured, mixed, unstructured",
)
class SynthesisResult(BaseModel):
"""Full output of stage 5 (synthesis)."""
pages: list[SynthesizedPage]

View file

@ -0,0 +1,5 @@
"""Pipeline stage tasks (stages 2-5).
Task implementations will be added in T02. This module must be importable
so that ``worker.py`` can register Celery tasks.
"""

View file

@ -10,6 +10,9 @@ redis>=5.0,<6.0
python-dotenv>=1.0,<2.0 python-dotenv>=1.0,<2.0
python-multipart>=0.0.9,<1.0 python-multipart>=0.0.9,<1.0
httpx>=0.27.0,<1.0 httpx>=0.27.0,<1.0
openai>=1.0,<2.0
qdrant-client>=1.9,<2.0
pyyaml>=6.0,<7.0
# Test dependencies # Test dependencies
pytest>=8.0,<10.0 pytest>=8.0,<10.0
pytest-asyncio>=0.24,<1.0 pytest-asyncio>=0.24,<1.0

32
backend/worker.py Normal file
View file

@ -0,0 +1,32 @@
"""Celery application instance for the Chrysopedia pipeline.
Usage:
celery -A worker worker --loglevel=info
"""
from celery import Celery
from config import get_settings
settings = get_settings()
celery_app = Celery(
"chrysopedia",
broker=settings.redis_url,
backend=settings.redis_url,
)
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_acks_late=True,
worker_prefetch_multiplier=1,
)
# Import pipeline.stages so that @celery_app.task decorators register tasks.
# This import must come after celery_app is defined.
import pipeline.stages # noqa: E402, F401