feat: Added TechniquePageVersion model, Alembic migration 002, pipeline…

- "backend/models.py"
- "alembic/versions/002_technique_page_versions.py"
- "backend/pipeline/stages.py"

GSD-Task: S04/T01
This commit is contained in:
jlightner 2026-03-30 07:07:16 +00:00
parent 2a5e4d7156
commit a4d298502c
3 changed files with 151 additions and 1 deletions

View file

@ -0,0 +1,39 @@
"""technique_page_versions table for article versioning
Revision ID: 002_technique_page_versions
Revises: 001_initial
Create Date: 2026-03-30
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID
# revision identifiers, used by Alembic.
revision: str = "002_technique_page_versions"
down_revision: Union[str, None] = "001_initial"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"technique_page_versions",
sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("technique_page_id", UUID(as_uuid=True), sa.ForeignKey("technique_pages.id", ondelete="CASCADE"), nullable=False),
sa.Column("version_number", sa.Integer, nullable=False),
sa.Column("content_snapshot", JSONB, nullable=False),
sa.Column("pipeline_metadata", JSONB, nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.func.now()),
)
op.create_index(
"ix_technique_page_versions_page_version",
"technique_page_versions",
["technique_page_id", "version_number"],
unique=True,
)
def downgrade() -> None:
op.drop_table("technique_page_versions")

View file

@ -253,6 +253,9 @@ class TechniquePage(Base):
key_moments: Mapped[list[KeyMoment]] = sa_relationship( key_moments: Mapped[list[KeyMoment]] = sa_relationship(
back_populates="technique_page", foreign_keys=[KeyMoment.technique_page_id] back_populates="technique_page", foreign_keys=[KeyMoment.technique_page_id]
) )
versions: Mapped[list[TechniquePageVersion]] = sa_relationship(
back_populates="technique_page", order_by="TechniquePageVersion.version_number"
)
outgoing_links: Mapped[list[RelatedTechniqueLink]] = sa_relationship( outgoing_links: Mapped[list[RelatedTechniqueLink]] = sa_relationship(
foreign_keys="RelatedTechniqueLink.source_page_id", back_populates="source_page" foreign_keys="RelatedTechniqueLink.source_page_id", back_populates="source_page"
) )
@ -288,6 +291,27 @@ class RelatedTechniqueLink(Base):
) )
class TechniquePageVersion(Base):
"""Snapshot of a TechniquePage before a pipeline re-synthesis overwrites it."""
__tablename__ = "technique_page_versions"
id: Mapped[uuid.UUID] = _uuid_pk()
technique_page_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("technique_pages.id", ondelete="CASCADE"), nullable=False
)
version_number: Mapped[int] = mapped_column(Integer, nullable=False)
content_snapshot: Mapped[dict] = mapped_column(JSONB, nullable=False)
pipeline_metadata: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[datetime] = mapped_column(
default=_now, server_default=func.now()
)
# relationships
technique_page: Mapped[TechniquePage] = sa_relationship(
back_populates="versions"
)
class Tag(Base): class Tag(Base):
__tablename__ = "tags" __tablename__ = "tags"

View file

@ -9,6 +9,7 @@ Celery tasks are synchronous — all DB access uses ``sqlalchemy.orm.Session``.
from __future__ import annotations from __future__ import annotations
import hashlib
import json import json
import logging import logging
import time import time
@ -18,7 +19,7 @@ from pathlib import Path
import yaml import yaml
from celery import chain as celery_chain from celery import chain as celery_chain
from pydantic import ValidationError from pydantic import ValidationError
from sqlalchemy import create_engine, select from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, sessionmaker
from config import get_settings from config import get_settings
@ -28,6 +29,7 @@ from models import (
ProcessingStatus, ProcessingStatus,
SourceVideo, SourceVideo,
TechniquePage, TechniquePage,
TechniquePageVersion,
TranscriptSegment, TranscriptSegment,
) )
from pipeline.embedding_client import EmbeddingClient from pipeline.embedding_client import EmbeddingClient
@ -474,6 +476,53 @@ def _load_classification_data(video_id: str) -> list[dict]:
return json.loads(raw) return json.loads(raw)
def _capture_pipeline_metadata() -> dict:
"""Capture current pipeline configuration for version metadata.
Returns a dict with model names, prompt file SHA-256 hashes, and stage
modality settings. Handles missing prompt files gracefully.
"""
settings = get_settings()
prompts_path = Path(settings.prompts_path)
# Hash each prompt template file
prompt_hashes: dict[str, str] = {}
prompt_files = [
"stage2_segmentation.txt",
"stage3_extraction.txt",
"stage4_classification.txt",
"stage5_synthesis.txt",
]
for filename in prompt_files:
filepath = prompts_path / filename
try:
content = filepath.read_bytes()
prompt_hashes[filename] = hashlib.sha256(content).hexdigest()
except FileNotFoundError:
logger.warning("Prompt file not found for metadata capture: %s", filepath)
prompt_hashes[filename] = ""
except OSError as exc:
logger.warning("Could not read prompt file %s: %s", filepath, exc)
prompt_hashes[filename] = ""
return {
"models": {
"stage2": settings.llm_stage2_model,
"stage3": settings.llm_stage3_model,
"stage4": settings.llm_stage4_model,
"stage5": settings.llm_stage5_model,
"embedding": settings.embedding_model,
},
"modalities": {
"stage2": settings.llm_stage2_modality,
"stage3": settings.llm_stage3_modality,
"stage4": settings.llm_stage4_modality,
"stage5": settings.llm_stage5_modality,
},
"prompt_hashes": prompt_hashes,
}
# ── Stage 5: Synthesis ─────────────────────────────────────────────────────── # ── Stage 5: Synthesis ───────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) @celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
@ -564,6 +613,44 @@ def stage5_synthesis(self, video_id: str) -> str:
).scalar_one_or_none() ).scalar_one_or_none()
if existing: if existing:
# Snapshot existing content before overwriting
try:
snapshot = {
"title": existing.title,
"slug": existing.slug,
"topic_category": existing.topic_category,
"topic_tags": existing.topic_tags,
"summary": existing.summary,
"body_sections": existing.body_sections,
"signal_chains": existing.signal_chains,
"plugins": existing.plugins,
"source_quality": existing.source_quality.value if existing.source_quality else None,
}
version_count = session.execute(
select(func.count()).where(
TechniquePageVersion.technique_page_id == existing.id
)
).scalar()
version_number = version_count + 1
version = TechniquePageVersion(
technique_page_id=existing.id,
version_number=version_number,
content_snapshot=snapshot,
pipeline_metadata=_capture_pipeline_metadata(),
)
session.add(version)
logger.info(
"Version snapshot v%d created for page slug=%s",
version_number, existing.slug,
)
except Exception as snap_exc:
logger.error(
"Failed to create version snapshot for page slug=%s: %s",
existing.slug, snap_exc,
)
# Best-effort versioning — continue with page update
# Update existing page # Update existing page
existing.title = page_data.title existing.title = page_data.title
existing.summary = page_data.summary existing.summary = page_data.summary