diff --git a/alembic/versions/002_technique_page_versions.py b/alembic/versions/002_technique_page_versions.py
new file mode 100644
index 0000000..f679ad1
--- /dev/null
+++ b/alembic/versions/002_technique_page_versions.py
@@ -0,0 +1,43 @@
+"""technique_page_versions table for article versioning
+
+Revision ID: 002_technique_page_versions
+Revises: 001_initial
+Create Date: 2026-03-30
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import JSONB, UUID
+
+# revision identifiers, used by Alembic.
+revision: str = "002_technique_page_versions"
+down_revision: Union[str, None] = "001_initial"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Create ``technique_page_versions`` and its unique (page, version) index."""
+    op.create_table(
+        "technique_page_versions",
+        sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
+        sa.Column("technique_page_id", UUID(as_uuid=True), sa.ForeignKey("technique_pages.id", ondelete="CASCADE"), nullable=False),
+        sa.Column("version_number", sa.Integer, nullable=False),
+        sa.Column("content_snapshot", JSONB, nullable=False),
+        sa.Column("pipeline_metadata", JSONB, nullable=True),
+        sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.func.now()),
+    )
+    # Unique per page: at most one snapshot row for each version number.
+    op.create_index(
+        "ix_technique_page_versions_page_version",
+        "technique_page_versions",
+        ["technique_page_id", "version_number"],
+        unique=True,
+    )
+
+
+def downgrade() -> None:
+    """Undo ``upgrade`` in reverse order: drop the index, then the table."""
+    op.drop_index("ix_technique_page_versions_page_version", table_name="technique_page_versions")
+    op.drop_table("technique_page_versions")
diff --git a/backend/models.py b/backend/models.py
index 242b160..4321417 100644
--- a/backend/models.py
+++ b/backend/models.py
@@ -253,6 +253,14 @@ class TechniquePage(Base):
     key_moments: Mapped[list[KeyMoment]] = sa_relationship(
         back_populates="technique_page", foreign_keys=[KeyMoment.technique_page_id]
     )
+    # Version rows are removed by the FK's ON DELETE CASCADE; passive_deletes
+    # stops the ORM from first trying to NULL the non-nullable technique_page_id.
+    versions: Mapped[list[TechniquePageVersion]] = sa_relationship(
+        back_populates="technique_page",
+        order_by="TechniquePageVersion.version_number",
+        cascade="all, delete-orphan",
+        passive_deletes=True,
+    )
     outgoing_links: Mapped[list[RelatedTechniqueLink]] = sa_relationship(
         foreign_keys="RelatedTechniqueLink.source_page_id", back_populates="source_page"
     )
@@ -288,6 +296,31 @@ class RelatedTechniqueLink(Base):
     )
 
 
+class TechniquePageVersion(Base):
+    """Snapshot of a TechniquePage before a pipeline re-synthesis overwrites it.
+
+    ``version_number`` is unique per page; that is enforced by the unique index
+    created in migration ``002_technique_page_versions``, not declared here.
+    """
+    __tablename__ = "technique_page_versions"
+
+    id: Mapped[uuid.UUID] = _uuid_pk()
+    technique_page_id: Mapped[uuid.UUID] = mapped_column(
+        ForeignKey("technique_pages.id", ondelete="CASCADE"), nullable=False
+    )
+    version_number: Mapped[int] = mapped_column(Integer, nullable=False)
+    content_snapshot: Mapped[dict] = mapped_column(JSONB, nullable=False)
+    pipeline_metadata: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
+    created_at: Mapped[datetime] = mapped_column(
+        default=_now, server_default=func.now()
+    )
+
+    # relationships
+    technique_page: Mapped[TechniquePage] = sa_relationship(
+        back_populates="versions"
+    )
+
+
 class Tag(Base):
     __tablename__ = "tags"
 
diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py
index fd979a6..3a49f86 100644
--- a/backend/pipeline/stages.py
+++ b/backend/pipeline/stages.py
@@ -9,6 +9,7 @@ Celery tasks are synchronous — all DB access uses ``sqlalchemy.orm.Session``.
 
 from __future__ import annotations
 
+import hashlib
 import json
 import logging
 import time
@@ -18,7 +19,7 @@ from pathlib import Path
 import yaml
 from celery import chain as celery_chain
 from pydantic import ValidationError
-from sqlalchemy import create_engine, select
+from sqlalchemy import create_engine, func, select
 from sqlalchemy.orm import Session, sessionmaker
 
 from config import get_settings
@@ -28,6 +29,7 @@ from models import (
     ProcessingStatus,
     SourceVideo,
     TechniquePage,
+    TechniquePageVersion,
     TranscriptSegment,
 )
 from pipeline.embedding_client import EmbeddingClient
@@ -474,6 +476,53 @@ def _load_classification_data(video_id: str) -> list[dict]:
     return json.loads(raw)
 
 
+def _capture_pipeline_metadata() -> dict:
+    """Capture current pipeline configuration for version metadata.
+
+    Returns a dict with model names, prompt file SHA-256 hashes, and stage
+    modality settings. Handles missing prompt files gracefully.
+    """
+    settings = get_settings()
+    prompts_path = Path(settings.prompts_path)
+
+    # Hash each prompt template file
+    prompt_hashes: dict[str, str] = {}
+    prompt_files = [
+        "stage2_segmentation.txt",
+        "stage3_extraction.txt",
+        "stage4_classification.txt",
+        "stage5_synthesis.txt",
+    ]
+    for filename in prompt_files:
+        filepath = prompts_path / filename
+        try:
+            content = filepath.read_bytes()
+            prompt_hashes[filename] = hashlib.sha256(content).hexdigest()
+        except FileNotFoundError:
+            logger.warning("Prompt file not found for metadata capture: %s", filepath)
+            prompt_hashes[filename] = ""
+        except OSError as exc:
+            logger.warning("Could not read prompt file %s: %s", filepath, exc)
+            prompt_hashes[filename] = ""
+
+    return {
+        "models": {
+            "stage2": settings.llm_stage2_model,
+            "stage3": settings.llm_stage3_model,
+            "stage4": settings.llm_stage4_model,
+            "stage5": settings.llm_stage5_model,
+            "embedding": settings.embedding_model,
+        },
+        "modalities": {
+            "stage2": settings.llm_stage2_modality,
+            "stage3": settings.llm_stage3_modality,
+            "stage4": settings.llm_stage4_modality,
+            "stage5": settings.llm_stage5_modality,
+        },
+        "prompt_hashes": prompt_hashes,
+    }
+
+
 # ── Stage 5: Synthesis ───────────────────────────────────────────────────────
 
 @celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
@@ -564,6 +613,58 @@ def stage5_synthesis(self, video_id: str) -> str:
                 ).scalar_one_or_none()
 
                 if existing:
+                    # Snapshot existing content before overwriting.  The SAVEPOINT
+                    # keeps this genuinely best-effort: a failed query or INSERT
+                    # rolls back only the snapshot, so the surrounding transaction
+                    # stays usable for the page update below.
+                    try:
+                        with session.begin_nested():
+                            snapshot = {
+                                "title": existing.title,
+                                "slug": existing.slug,
+                                "topic_category": existing.topic_category,
+                                "topic_tags": existing.topic_tags,
+                                "summary": existing.summary,
+                                "body_sections": existing.body_sections,
+                                "signal_chains": existing.signal_chains,
+                                "plugins": existing.plugins,
+                                "source_quality": existing.source_quality.value if existing.source_quality else None,
+                            }
+                            # MAX()+1, not COUNT()+1: stays monotonic (and clear of
+                            # the unique index) even if earlier versions were deleted.
+                            last_version = session.execute(
+                                select(
+                                    func.coalesce(
+                                        func.max(TechniquePageVersion.version_number), 0
+                                    )
+                                ).where(
+                                    TechniquePageVersion.technique_page_id == existing.id
+                                )
+                            ).scalar_one()
+                            version_number = last_version + 1
+
+                            version = TechniquePageVersion(
+                                technique_page_id=existing.id,
+                                version_number=version_number,
+                                content_snapshot=snapshot,
+                                pipeline_metadata=_capture_pipeline_metadata(),
+                            )
+                            session.add(version)
+                            # Flush inside the savepoint so constraint violations
+                            # surface here instead of at the final commit.
+                            session.flush()
+                        logger.info(
+                            "Version snapshot v%d created for page slug=%s",
+                            version_number, existing.slug,
+                        )
+                    except Exception as snap_exc:
+                        # logger.exception keeps the traceback; versioning is
+                        # best-effort, so continue with the page update.
+                        logger.exception(
+                            "Failed to create version snapshot for page slug=%s: %s",
+                            existing.slug, snap_exc,
+                        )
+
                     # Update existing page
                     existing.title = page_data.title
                     existing.summary = page_data.summary