From b68751f0db9193779e07815aa3e033d094cbb39e Mon Sep 17 00:00:00 2001
From: jlightner
Date: Sun, 29 Mar 2026 22:30:31 +0000
Subject: [PATCH] =?UTF-8?q?chore:=20Extended=20Settings=20with=2012=20LLM/?=
 =?UTF-8?q?embedding/Qdrant=20config=20fields,=20cr=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- "backend/config.py"
- "backend/worker.py"
- "backend/pipeline/schemas.py"
- "backend/pipeline/llm_client.py"
- "backend/requirements.txt"
- "backend/pipeline/__init__.py"
- "backend/pipeline/stages.py"

GSD-Task: S03/T01
---
 backend/config.py              |  22 ++++++
 backend/pipeline/__init__.py   |   0
 backend/pipeline/llm_client.py | 136 +++++++++++++++++++++++++++++++++
 backend/pipeline/schemas.py    |  99 ++++++++++++++++++++++++
 backend/pipeline/stages.py     |   5 ++
 backend/requirements.txt       |   3 +
 backend/worker.py              |  32 ++++++++
 7 files changed, 297 insertions(+)
 create mode 100644 backend/pipeline/__init__.py
 create mode 100644 backend/pipeline/llm_client.py
 create mode 100644 backend/pipeline/schemas.py
 create mode 100644 backend/pipeline/stages.py
 create mode 100644 backend/worker.py

diff --git a/backend/config.py b/backend/config.py
index 43deb36..878e983 100644
--- a/backend/config.py
+++ b/backend/config.py
@@ -26,6 +26,28 @@ class Settings(BaseSettings):
     # CORS
     cors_origins: list[str] = ["*"]
 
+    # LLM endpoint (OpenAI-compatible)
+    llm_api_url: str = "http://localhost:11434/v1"
+    llm_api_key: str = "sk-placeholder"
+    llm_model: str = "qwen2.5:14b-q8_0"
+    llm_fallback_url: str = "http://localhost:11434/v1"
+    llm_fallback_model: str = "qwen2.5:14b-q8_0"
+
+    # Embedding endpoint
+    embedding_api_url: str = "http://localhost:11434/v1"
+    embedding_model: str = "nomic-embed-text"
+    embedding_dimensions: int = 768
+
+    # Qdrant
+    qdrant_url: str = "http://localhost:6333"
+    qdrant_collection: str = "chrysopedia"
+
+    # Prompt templates
+    prompts_path: str = "./prompts"
+
+    # Review mode — when True, extracted moments go to review queue before publishing
+    review_mode: bool = True
+
     # File storage
     transcript_storage_path: str = "/data/transcripts"
     video_metadata_path: str = "/data/video_meta"
diff --git a/backend/pipeline/__init__.py b/backend/pipeline/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/backend/pipeline/llm_client.py b/backend/pipeline/llm_client.py
new file mode 100644
index 0000000..1ee095c
--- /dev/null
+++ b/backend/pipeline/llm_client.py
@@ -0,0 +1,136 @@
+"""Synchronous LLM client with primary/fallback endpoint logic.
+
+Uses the OpenAI-compatible API (works with Ollama, vLLM, OpenWebUI, etc.).
+Celery tasks run synchronously, so this uses ``openai.OpenAI`` (not Async).
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TypeVar
+
+import openai
+from pydantic import BaseModel
+
+from config import Settings
+
+logger = logging.getLogger(__name__)
+
+T = TypeVar("T", bound=BaseModel)
+
+
+class LLMClient:
+    """Sync LLM client that tries a primary endpoint and falls back on failure."""
+
+    def __init__(self, settings: Settings) -> None:
+        self.settings = settings
+        self._primary = openai.OpenAI(
+            base_url=settings.llm_api_url,
+            api_key=settings.llm_api_key,
+        )
+        self._fallback = openai.OpenAI(
+            base_url=settings.llm_fallback_url,
+            api_key=settings.llm_api_key,
+        )
+
+    # ── Core completion ──────────────────────────────────────────────────
+
+    def complete(
+        self,
+        system_prompt: str,
+        user_prompt: str,
+        response_model: type[BaseModel] | None = None,
+    ) -> str:
+        """Send a chat completion request, falling back on connection/timeout errors.
+
+        Parameters
+        ----------
+        system_prompt:
+            System message content.
+        user_prompt:
+            User message content.
+        response_model:
+            If provided, ``response_format`` is set to ``{"type": "json_object"}``
+            so the LLM returns parseable JSON.
+
+        Returns
+        -------
+        str
+            Raw completion text from the model.
+        """
+        kwargs: dict = {}
+        if response_model is not None:
+            kwargs["response_format"] = {"type": "json_object"}
+
+        messages = [
+            {"role": "system", "content": system_prompt},
+            {"role": "user", "content": user_prompt},
+        ]
+
+        # --- Try primary endpoint ---
+        try:
+            response = self._primary.chat.completions.create(
+                model=self.settings.llm_model,
+                messages=messages,
+                **kwargs,
+            )
+            return response.choices[0].message.content or ""
+
+        except (openai.APIConnectionError, openai.APITimeoutError) as exc:
+            logger.warning(
+                "Primary LLM endpoint failed (%s: %s), trying fallback at %s",
+                type(exc).__name__,
+                exc,
+                self.settings.llm_fallback_url,
+            )
+
+        # --- Try fallback endpoint ---
+        try:
+            response = self._fallback.chat.completions.create(
+                model=self.settings.llm_fallback_model,
+                messages=messages,
+                **kwargs,
+            )
+            return response.choices[0].message.content or ""
+
+        except (openai.APIConnectionError, openai.APITimeoutError, openai.APIError) as exc:
+            logger.error(
+                "Fallback LLM endpoint also failed (%s: %s). Giving up.",
+                type(exc).__name__,
+                exc,
+            )
+            raise
+
+    # ── Response parsing ─────────────────────────────────────────────────
+
+    def parse_response(self, text: str, model: type[T]) -> T:
+        """Parse raw LLM output as JSON and validate against a Pydantic model.
+
+        Parameters
+        ----------
+        text:
+            Raw JSON string from the LLM.
+        model:
+            Pydantic model class to validate against.
+
+        Returns
+        -------
+        T
+            Validated Pydantic model instance.
+
+        Raises
+        ------
+        pydantic.ValidationError
+            If the JSON doesn't match the schema.
+        ValueError
+            If the text is not valid JSON.
+        """
+        try:
+            return model.model_validate_json(text)
+        except Exception:
+            logger.error(
+                "Failed to parse LLM response as %s. Response text: %.500s",
+                model.__name__,
+                text,
+            )
+            raise
diff --git a/backend/pipeline/schemas.py b/backend/pipeline/schemas.py
new file mode 100644
index 0000000..e96709e
--- /dev/null
+++ b/backend/pipeline/schemas.py
@@ -0,0 +1,99 @@
+"""Pydantic schemas for pipeline stage inputs and outputs.
+
+Stage 2 — Segmentation: groups transcript segments by topic.
+Stage 3 — Extraction: extracts key moments from segments.
+Stage 4 — Classification: classifies moments by category/tags.
+Stage 5 — Synthesis: generates technique pages from classified moments.
+"""
+
+from __future__ import annotations
+
+from pydantic import BaseModel, Field
+
+
+# ── Stage 2: Segmentation ───────────────────────────────────────────────────
+
+class TopicSegment(BaseModel):
+    """A contiguous group of transcript segments sharing a topic."""
+
+    start_index: int = Field(description="First transcript segment index in this group")
+    end_index: int = Field(description="Last transcript segment index in this group (inclusive)")
+    topic_label: str = Field(description="Short label describing the topic")
+    summary: str = Field(description="Brief summary of what is discussed")
+
+
+class SegmentationResult(BaseModel):
+    """Full output of stage 2 (segmentation)."""
+
+    segments: list[TopicSegment]
+
+
+# ── Stage 3: Extraction ─────────────────────────────────────────────────────
+
+class ExtractedMoment(BaseModel):
+    """A single key moment extracted from a topic segment group."""
+
+    title: str = Field(description="Concise title for the moment")
+    summary: str = Field(description="Detailed summary of the technique/concept")
+    start_time: float = Field(description="Start time in seconds")
+    end_time: float = Field(description="End time in seconds")
+    content_type: str = Field(description="One of: technique, settings, reasoning, workflow")
+    plugins: list[str] = Field(default_factory=list, description="Plugins/tools mentioned")
+    raw_transcript: str = Field(default="", description="Raw transcript text for this moment")
+
+
+class ExtractionResult(BaseModel):
+    """Full output of stage 3 (extraction)."""
+
+    moments: list[ExtractedMoment]
+
+
+# ── Stage 4: Classification ─────────────────────────────────────────────────
+
+class ClassifiedMoment(BaseModel):
+    """Classification metadata for a single extracted moment."""
+
+    moment_index: int = Field(description="Index into ExtractionResult.moments")
+    topic_category: str = Field(description="High-level topic category")
+    topic_tags: list[str] = Field(default_factory=list, description="Specific topic tags")
+    content_type_override: str | None = Field(
+        default=None,
+        description="Override for content_type if classification disagrees with extraction",
+    )
+
+
+class ClassificationResult(BaseModel):
+    """Full output of stage 4 (classification)."""
+
+    classifications: list[ClassifiedMoment]
+
+
+# ── Stage 5: Synthesis ───────────────────────────────────────────────────────
+
+class SynthesizedPage(BaseModel):
+    """A technique page synthesized from classified moments."""
+
+    title: str = Field(description="Page title")
+    slug: str = Field(description="URL-safe slug")
+    topic_category: str = Field(description="Primary topic category")
+    topic_tags: list[str] = Field(default_factory=list, description="Associated tags")
+    summary: str = Field(description="Page summary / overview paragraph")
+    body_sections: dict = Field(
+        default_factory=dict,
+        description="Structured body content as section_name -> content mapping",
+    )
+    signal_chains: list[dict] = Field(
+        default_factory=list,
+        description="Signal chain descriptions (for audio/music production contexts)",
+    )
+    plugins: list[str] = Field(default_factory=list, description="Plugins/tools referenced")
+    source_quality: str = Field(
+        default="mixed",
+        description="One of: structured, mixed, unstructured",
+    )
+
+
+class SynthesisResult(BaseModel):
+    """Full output of stage 5 (synthesis)."""
+
+    pages: list[SynthesizedPage]
diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
new file mode 100644
index 0000000..073805e
--- /dev/null
+++ b/backend/pipeline/stages.py
@@ -0,0 +1,5 @@
+"""Pipeline stage tasks (stages 2-5).
+
+Task implementations will be added in T02. This module must be importable
+so that ``worker.py`` can register Celery tasks.
+"""
diff --git a/backend/requirements.txt b/backend/requirements.txt
index dd7aba6..c3f0fc0 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -10,6 +10,9 @@ redis>=5.0,<6.0
 python-dotenv>=1.0,<2.0
 python-multipart>=0.0.9,<1.0
 httpx>=0.27.0,<1.0
+openai>=1.0,<2.0
+qdrant-client>=1.9,<2.0
+pyyaml>=6.0,<7.0
 # Test dependencies
 pytest>=8.0,<10.0
 pytest-asyncio>=0.24,<1.0
diff --git a/backend/worker.py b/backend/worker.py
new file mode 100644
index 0000000..7eea336
--- /dev/null
+++ b/backend/worker.py
@@ -0,0 +1,32 @@
+"""Celery application instance for the Chrysopedia pipeline.
+
+Usage:
+    celery -A worker worker --loglevel=info
+"""
+
+from celery import Celery
+
+from config import get_settings
+
+settings = get_settings()
+
+celery_app = Celery(
+    "chrysopedia",
+    broker=settings.redis_url,
+    backend=settings.redis_url,
+)
+
+celery_app.conf.update(
+    task_serializer="json",
+    result_serializer="json",
+    accept_content=["json"],
+    timezone="UTC",
+    enable_utc=True,
+    task_track_started=True,
+    task_acks_late=True,
+    worker_prefetch_multiplier=1,
+)
+
+# Import pipeline.stages so that @celery_app.task decorators register tasks.
+# This import must come after celery_app is defined.
+import pipeline.stages  # noqa: E402, F401