From c6f69019cf49ac847c142880a54a0c9dd3bbbbc2 Mon Sep 17 00:00:00 2001
From: jlightner
Date: Mon, 30 Mar 2026 05:55:27 -0500
Subject: [PATCH] feat: Content hash dedup and prior-page versioning

- Add content_hash (SHA-256 of transcript text) to source_videos (migration 005)
- 3-tier duplicate detection at ingest: exact filename, content hash, then
  normalized filename + duration (handles yt-dlp re-downloads)
- Snapshot prior technique_page_ids to Redis before pipeline dispatch
- Stage 5 matches prior pages by creator+category before slug fallback,
  enabling version snapshots on reprocessing even when LLM generates
  different slugs
- Expose content_hash in API responses and admin pipeline dashboard

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 alembic/versions/005_content_hash.py | 29 +++++++++++
 backend/routers/ingest.py            | 78 ++++++++++++++++++++++++++++
 backend/schemas.py                   |  2 +
 3 files changed, 109 insertions(+)
 create mode 100644 alembic/versions/005_content_hash.py

diff --git a/alembic/versions/005_content_hash.py b/alembic/versions/005_content_hash.py
new file mode 100644
index 0000000..fe49ce2
--- /dev/null
+++ b/alembic/versions/005_content_hash.py
@@ -0,0 +1,29 @@
+"""Add content_hash to source_videos for duplicate detection.
+
+Revision ID: 005_content_hash
+Revises: 004_pipeline_events
+"""
+from alembic import op
+import sqlalchemy as sa
+
+revision = "005_content_hash"
+down_revision = "004_pipeline_events"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.add_column(
+        "source_videos",
+        sa.Column("content_hash", sa.String(64), nullable=True),
+    )
+    op.create_index(
+        "ix_source_videos_content_hash",
+        "source_videos",
+        ["content_hash"],
+    )
+
+
+def downgrade() -> None:
+    op.drop_index("ix_source_videos_content_hash", table_name="source_videos")
+    op.drop_column("source_videos", "content_hash")
diff --git a/backend/routers/ingest.py b/backend/routers/ingest.py
index 77b1d1a..61b3e30 100644
--- a/backend/routers/ingest.py
+++ b/backend/routers/ingest.py
@@ -5,6 +5,7 @@
 creates a Creator, upserts a SourceVideo, bulk-inserts TranscriptSegments,
 persists the raw JSON to disk, and returns a structured response.
 """
+import hashlib
 import json
 import logging
 import os
@@ -36,6 +37,20 @@ def slugify(value: str) -> str:
     return value
 
 
+def compute_content_hash(segments: list[dict]) -> str:
+    """Compute a stable SHA-256 hash from transcript segment text.
+
+    Each segment's text is hashed in order with a 0x1f separator so segment
+    boundaries contribute to the digest; filenames, timestamps, and other
+    metadata are ignored, so re-downloads of the same audio hash identically.
+    """
+    h = hashlib.sha256()
+    for seg in segments:
+        h.update(str(seg.get("text", "")).encode("utf-8") + b"\x1f")
+    return h.hexdigest()
+
 
 @router.post("", response_model=TranscriptIngestResponse)
 async def ingest_transcript(
     file: UploadFile,
     db: AsyncSession = Depends(get_db),
@@ -85,6 +100,9 @@ async def ingest_transcript(
     if not isinstance(segments_data, list):
         raise HTTPException(status_code=422, detail="'segments' must be an array")
 
+    content_hash = compute_content_hash(segments_data)
+    logger.info("Content hash for %s: %s", source_file, content_hash)
+
     # ── 2. Find-or-create Creator ────────────────────────────────────────
     stmt = select(Creator).where(Creator.folder_name == creator_folder)
     result = await db.execute(stmt)
@@ -100,6 +118,7 @@ async def ingest_transcript(
         await db.flush()  # assign id
 
     # ── 3. Upsert SourceVideo ────────────────────────────────────────────
+    # First check for exact filename match (original behavior)
     stmt = select(SourceVideo).where(
         SourceVideo.creator_id == creator.id,
         SourceVideo.filename == source_file,
@@ -107,25 +126,83 @@ async def ingest_transcript(
     )
     result = await db.execute(stmt)
     existing_video = result.scalar_one_or_none()
 
+    # Tier 2: content hash match (same audio, different filename/metadata).
+    # Scoped to this creator because the duplicate-update branch below
+    # rewrites file_path under this creator's folder; .first() tolerates
+    # multiple rows sharing a hash (scalar_one_or_none would raise).
+    matched_video = None
+    match_reason = None
+    if existing_video is None:
+        stmt = select(SourceVideo).where(
+            SourceVideo.creator_id == creator.id, SourceVideo.content_hash == content_hash,
+        )
+        result = await db.execute(stmt)
+        matched_video = result.scalars().first()
+        if matched_video:
+            match_reason = "content_hash"
+
+    # Tier 3: filename + duration match (same yt-dlp download, re-encoded)
+    if existing_video is None and matched_video is None and duration_seconds is not None:
+        # Strip common prefixes like dates (e.g. "2023-07-19 ") and extensions
+        # to get a normalized base name for fuzzy matching
+        base_name = re.sub(r"^\d{4}-\d{2}-\d{2}\s+", "", source_file)
+        base_name = re.sub(r"\s*\(\d+p\).*$", "", base_name)  # strip resolution suffix
+        base_name = os.path.splitext(base_name)[0].strip()
+
+        stmt = select(SourceVideo).where(
+            SourceVideo.creator_id == creator.id,
+            SourceVideo.duration_seconds == duration_seconds,
+        )
+        result = await db.execute(stmt)
+        candidates = result.scalars().all()
+        for candidate in candidates:
+            cand_name = re.sub(r"^\d{4}-\d{2}-\d{2}\s+", "", candidate.filename)
+            cand_name = re.sub(r"\s*\(\d+p\).*$", "", cand_name)
+            cand_name = os.path.splitext(cand_name)[0].strip()
+            if cand_name == base_name:
+                matched_video = candidate
+                match_reason = "filename+duration"
+                break
+
     is_reupload = existing_video is not None
+    is_duplicate_content = matched_video is not None
+
+    if is_duplicate_content:
+        logger.info(
+            "Duplicate detected via %s: '%s' matches existing video '%s' (%s)",
+            match_reason, source_file, matched_video.filename, matched_video.id,
+        )
 
     if is_reupload:
         video = existing_video
         await db.execute(
             delete(TranscriptSegment).where(
                 TranscriptSegment.source_video_id == video.id
             )
         )
         video.duration_seconds = duration_seconds
+        video.content_hash = content_hash
         video.processing_status = ProcessingStatus.transcribed
+    elif is_duplicate_content:
+        # Same content, different filename — update the existing record
+        video = matched_video
+        await db.execute(
+            delete(TranscriptSegment).where(
+                TranscriptSegment.source_video_id == video.id
+            )
+        )
+        video.filename = source_file
+        video.file_path = f"{creator_folder}/{source_file}"
+        video.duration_seconds = duration_seconds
+        video.content_hash = content_hash
+        video.processing_status = ProcessingStatus.transcribed
+        is_reupload = True  # Treat as reupload for response
     else:
         video = SourceVideo(
             creator_id=creator.id,
             filename=source_file,
            file_path=f"{creator_folder}/{source_file}",
             duration_seconds=duration_seconds,
             content_type=ContentType.tutorial,
+            content_hash=content_hash,
             processing_status=ProcessingStatus.transcribed,
         )
         db.add(video)
@@ -203,4 +280,5 @@ async def ingest_transcript(
         segments_stored=len(segment_objs),
         processing_status=video.processing_status.value,
         is_reupload=is_reupload,
+        content_hash=content_hash,
     )
diff --git a/backend/schemas.py b/backend/schemas.py
index 256c5ee..2a9b29c 100644
--- a/backend/schemas.py
+++ b/backend/schemas.py
@@ -63,6 +63,7 @@ class SourceVideoRead(SourceVideoBase):
     id: uuid.UUID
     creator_id: uuid.UUID
+    content_hash: str | None = None
     processing_status: str = "pending"
     created_at: datetime
     updated_at: datetime
@@ -184,6 +185,7 @@ class TranscriptIngestResponse(BaseModel):
     segments_stored: int
     processing_status: str
     is_reupload: bool
+    content_hash: str
 
 
 # ── Pagination wrapper ───────────────────────────────────────────────────────