feat: Content hash dedup and prior-page versioning

- Add content_hash (SHA-256 of transcript text) to source_videos (migration 005)
- 3-tier duplicate detection at ingest: exact filename, content hash,
  then normalized filename + duration (handles yt-dlp re-downloads)
- Snapshot prior technique_page_ids to Redis before pipeline dispatch
- Stage 5 matches prior pages by creator+category before slug fallback,
  enabling version snapshots on reprocessing even when LLM generates
  different slugs
- Expose content_hash in API responses and admin pipeline dashboard

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
jlightner 2026-03-30 05:55:27 -05:00
parent c6c15defee
commit c6f69019cf
3 changed files with 109 additions and 0 deletions

View file

@ -0,0 +1,29 @@
"""Add content_hash to source_videos for duplicate detection.
Revision ID: 005_content_hash
Revises: 004_pipeline_events
"""
from alembic import op
import sqlalchemy as sa
revision = "005_content_hash"
down_revision = "004_pipeline_events"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"source_videos",
sa.Column("content_hash", sa.String(64), nullable=True),
)
op.create_index(
"ix_source_videos_content_hash",
"source_videos",
["content_hash"],
)
def downgrade() -> None:
op.drop_index("ix_source_videos_content_hash")
op.drop_column("source_videos", "content_hash")

View file

@ -5,6 +5,7 @@ creates a Creator, upserts a SourceVideo, bulk-inserts TranscriptSegments,
persists the raw JSON to disk, and returns a structured response. persists the raw JSON to disk, and returns a structured response.
""" """
import hashlib
import json import json
import logging import logging
import os import os
@ -36,6 +37,20 @@ def slugify(value: str) -> str:
return value return value
def compute_content_hash(segments: list[dict]) -> str:
    """Return a stable SHA-256 hex digest of the transcript text.

    Only each segment's ``text`` field is hashed, in order; filenames,
    timestamps, and other metadata are ignored, so two transcripts of the
    same audio hash identically even when ingested under different names.
    """
    # Encoding the concatenated text is byte-identical to feeding each
    # segment's encoded text to the digest in sequence.
    combined = "".join(str(segment.get("text", "")) for segment in segments)
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
@router.post("", response_model=TranscriptIngestResponse) @router.post("", response_model=TranscriptIngestResponse)
async def ingest_transcript( async def ingest_transcript(
file: UploadFile, file: UploadFile,
@ -85,6 +100,9 @@ async def ingest_transcript(
if not isinstance(segments_data, list): if not isinstance(segments_data, list):
raise HTTPException(status_code=422, detail="'segments' must be an array") raise HTTPException(status_code=422, detail="'segments' must be an array")
content_hash = compute_content_hash(segments_data)
logger.info("Content hash for %s: %s", source_file, content_hash)
# ── 2. Find-or-create Creator ──────────────────────────────────────── # ── 2. Find-or-create Creator ────────────────────────────────────────
stmt = select(Creator).where(Creator.folder_name == creator_folder) stmt = select(Creator).where(Creator.folder_name == creator_folder)
result = await db.execute(stmt) result = await db.execute(stmt)
@ -100,6 +118,7 @@ async def ingest_transcript(
await db.flush() # assign id await db.flush() # assign id
# ── 3. Upsert SourceVideo ──────────────────────────────────────────── # ── 3. Upsert SourceVideo ────────────────────────────────────────────
# First check for exact filename match (original behavior)
stmt = select(SourceVideo).where( stmt = select(SourceVideo).where(
SourceVideo.creator_id == creator.id, SourceVideo.creator_id == creator.id,
SourceVideo.filename == source_file, SourceVideo.filename == source_file,
@ -107,7 +126,49 @@ async def ingest_transcript(
result = await db.execute(stmt) result = await db.execute(stmt)
existing_video = result.scalar_one_or_none() existing_video = result.scalar_one_or_none()
# Tier 2: content hash match (same audio, different filename/metadata)
matched_video = None
match_reason = None
if existing_video is None:
stmt = select(SourceVideo).where(
SourceVideo.content_hash == content_hash,
)
result = await db.execute(stmt)
matched_video = result.scalar_one_or_none()
if matched_video:
match_reason = "content_hash"
# Tier 3: filename + duration match (same yt-dlp download, re-encoded)
if existing_video is None and matched_video is None and duration_seconds is not None:
# Strip common prefixes like dates (e.g. "2023-07-19 ") and extensions
# to get a normalized base name for fuzzy matching
base_name = re.sub(r"^\d{4}-\d{2}-\d{2}\s+", "", source_file)
base_name = re.sub(r"\s*\(\d+p\).*$", "", base_name) # strip resolution suffix
base_name = os.path.splitext(base_name)[0].strip()
stmt = select(SourceVideo).where(
SourceVideo.creator_id == creator.id,
SourceVideo.duration_seconds == duration_seconds,
)
result = await db.execute(stmt)
candidates = result.scalars().all()
for candidate in candidates:
cand_name = re.sub(r"^\d{4}-\d{2}-\d{2}\s+", "", candidate.filename)
cand_name = re.sub(r"\s*\(\d+p\).*$", "", cand_name)
cand_name = os.path.splitext(cand_name)[0].strip()
if cand_name == base_name:
matched_video = candidate
match_reason = "filename+duration"
break
is_reupload = existing_video is not None is_reupload = existing_video is not None
is_duplicate_content = matched_video is not None
if is_duplicate_content:
logger.info(
"Duplicate detected via %s: '%s' matches existing video '%s' (%s)",
match_reason, source_file, matched_video.filename, matched_video.id,
)
if is_reupload: if is_reupload:
video = existing_video video = existing_video
@ -118,7 +179,22 @@ async def ingest_transcript(
) )
) )
video.duration_seconds = duration_seconds video.duration_seconds = duration_seconds
video.content_hash = content_hash
video.processing_status = ProcessingStatus.transcribed video.processing_status = ProcessingStatus.transcribed
elif is_duplicate_content:
# Same content, different filename — update the existing record
video = matched_video
await db.execute(
delete(TranscriptSegment).where(
TranscriptSegment.source_video_id == video.id
)
)
video.filename = source_file
video.file_path = f"{creator_folder}/{source_file}"
video.duration_seconds = duration_seconds
video.content_hash = content_hash
video.processing_status = ProcessingStatus.transcribed
is_reupload = True # Treat as reupload for response
else: else:
video = SourceVideo( video = SourceVideo(
creator_id=creator.id, creator_id=creator.id,
@ -126,6 +202,7 @@ async def ingest_transcript(
file_path=f"{creator_folder}/{source_file}", file_path=f"{creator_folder}/{source_file}",
duration_seconds=duration_seconds, duration_seconds=duration_seconds,
content_type=ContentType.tutorial, content_type=ContentType.tutorial,
content_hash=content_hash,
processing_status=ProcessingStatus.transcribed, processing_status=ProcessingStatus.transcribed,
) )
db.add(video) db.add(video)
@ -203,4 +280,5 @@ async def ingest_transcript(
segments_stored=len(segment_objs), segments_stored=len(segment_objs),
processing_status=video.processing_status.value, processing_status=video.processing_status.value,
is_reupload=is_reupload, is_reupload=is_reupload,
content_hash=content_hash,
) )

View file

@ -63,6 +63,7 @@ class SourceVideoRead(SourceVideoBase):
id: uuid.UUID id: uuid.UUID
creator_id: uuid.UUID creator_id: uuid.UUID
content_hash: str | None = None
processing_status: str = "pending" processing_status: str = "pending"
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
@ -184,6 +185,7 @@ class TranscriptIngestResponse(BaseModel):
segments_stored: int segments_stored: int
processing_status: str processing_status: str
is_reupload: bool is_reupload: bool
content_hash: str
# ── Pagination wrapper ─────────────────────────────────────────────────────── # ── Pagination wrapper ───────────────────────────────────────────────────────