diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 5219007..9de778f 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -2,7 +2,9 @@ Provides: - Async SQLAlchemy engine/session against a real PostgreSQL test database +- Sync SQLAlchemy engine/session for pipeline stage tests (Celery stages are sync) - httpx.AsyncClient wired to the FastAPI app with dependency overrides +- Pre-ingest fixture for pipeline tests - Sample transcript fixture path and temporary storage directory Key design choice: function-scoped engine with NullPool avoids asyncpg @@ -10,13 +12,17 @@ Key design choice: function-scoped engine with NullPool avoids asyncpg reuse between the ASGI test client and verification queries. """ +import json import os import pathlib +import uuid import pytest import pytest_asyncio from httpx import ASGITransport, AsyncClient +from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import NullPool # Ensure backend/ is on sys.path so "from models import ..." works @@ -25,12 +31,23 @@ sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent)) from database import Base, get_session # noqa: E402 from main import app # noqa: E402 +from models import ( # noqa: E402 + ContentType, + Creator, + ProcessingStatus, + SourceVideo, + TranscriptSegment, +) TEST_DATABASE_URL = os.getenv( "TEST_DATABASE_URL", "postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test", ) +TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace( + "postgresql+asyncpg://", "postgresql+psycopg2://" +) + @pytest_asyncio.fixture() async def db_engine(): @@ -91,3 +108,85 @@ def sample_transcript_path() -> pathlib.Path: def tmp_transcript_dir(tmp_path) -> pathlib.Path: """Temporary directory for transcript storage during tests.""" return tmp_path + + +# ── Sync engine/session for pipeline stages ────────────────────────────────── + + +@pytest.fixture() +def sync_engine(db_engine): + """Create a sync SQLAlchemy engine pointing at the test database. + + Tables are already created/dropped by the async ``db_engine`` fixture, + so this fixture just wraps a sync engine around the same DB URL. + """ + engine = create_engine(TEST_DATABASE_URL_SYNC, echo=False, poolclass=NullPool) + yield engine + engine.dispose() + + +@pytest.fixture() +def sync_session(sync_engine) -> Session: + """Create a sync SQLAlchemy session for pipeline stage tests.""" + factory = sessionmaker(bind=sync_engine) + session = factory() + yield session + session.close() + + +# ── Pre-ingest fixture for pipeline tests ──────────────────────────────────── + + +@pytest.fixture() +def pre_ingested_video(sync_engine): + """Ingest the sample transcript directly into the test DB via sync ORM. + + Returns a dict with ``video_id``, ``creator_id``, and ``segment_count``. + """ + factory = sessionmaker(bind=sync_engine) + session = factory() + try: + # Create creator + creator = Creator( + name="Skope", + slug="skope", + folder_name="Skope", + ) + session.add(creator) + session.flush() + + # Create video + video = SourceVideo( + creator_id=creator.id, + filename="mixing-basics-ep1.mp4", + file_path="Skope/mixing-basics-ep1.mp4", + duration_seconds=1234, + content_type=ContentType.tutorial, + processing_status=ProcessingStatus.transcribed, + ) + session.add(video) + session.flush() + + # Create transcript segments + sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json" + data = json.loads(sample.read_text()) + for idx, seg in enumerate(data["segments"]): + session.add(TranscriptSegment( + source_video_id=video.id, + start_time=float(seg["start"]), + end_time=float(seg["end"]), + text=str(seg["text"]), + segment_index=idx, + )) + + session.commit() + + result = { + "video_id": str(video.id), + "creator_id": str(creator.id), + "segment_count": len(data["segments"]), + } + finally: + session.close() + + return result diff --git a/backend/tests/fixtures/mock_llm_responses.py b/backend/tests/fixtures/mock_llm_responses.py new file mode 100644 index 0000000..1b76be0 --- /dev/null +++ b/backend/tests/fixtures/mock_llm_responses.py @@ -0,0 +1,111 @@ +"""Mock LLM and embedding responses for pipeline integration tests. + +Each response is a JSON string matching the Pydantic schema for that stage. +The sample transcript has 5 segments about gain staging, so mock responses +reflect that content. +""" + +import json +import random + +# ── Stage 2: Segmentation ─────────────────────────────────────────────────── + +STAGE2_SEGMENTATION_RESPONSE = json.dumps({ + "segments": [ + { + "start_index": 0, + "end_index": 1, + "topic_label": "Introduction", + "summary": "Introduces the episode about mixing basics and gain staging.", + }, + { + "start_index": 2, + "end_index": 4, + "topic_label": "Gain Staging Technique", + "summary": "Covers practical steps for gain staging including setting levels and avoiding clipping.", + }, + ] +}) + +# ── Stage 3: Extraction ───────────────────────────────────────────────────── + +STAGE3_EXTRACTION_RESPONSE = json.dumps({ + "moments": [ + { + "title": "Setting Levels for Gain Staging", + "summary": "Demonstrates the process of setting proper gain levels across the signal chain to maintain headroom.", + "start_time": 12.8, + "end_time": 28.5, + "content_type": "technique", + "plugins": ["Pro-Q 3"], + "raw_transcript": "First thing you want to do is set your levels. Make sure nothing is clipping on the master bus.", + }, + { + "title": "Master Bus Clipping Prevention", + "summary": "Explains how to monitor and prevent clipping on the master bus during a mix session.", + "start_time": 20.1, + "end_time": 35.0, + "content_type": "settings", + "plugins": [], + "raw_transcript": "Make sure nothing is clipping on the master bus. That wraps up this quick overview.", + }, + ] +}) + +# ── Stage 4: Classification ───────────────────────────────────────────────── + +STAGE4_CLASSIFICATION_RESPONSE = json.dumps({ + "classifications": [ + { + "moment_index": 0, + "topic_category": "Mixing", + "topic_tags": ["gain staging", "eq"], + "content_type_override": None, + }, + { + "moment_index": 1, + "topic_category": "Mixing", + "topic_tags": ["gain staging", "bus processing"], + "content_type_override": None, + }, + ] +}) + +# ── Stage 5: Synthesis ─────────────────────────────────────────────────────── + +STAGE5_SYNTHESIS_RESPONSE = json.dumps({ + "pages": [ + { + "title": "Gain Staging in Mixing", + "slug": "gain-staging-in-mixing", + "topic_category": "Mixing", + "topic_tags": ["gain staging"], + "summary": "A comprehensive guide to gain staging in a mixing context, covering level setting and master bus management.", + "body_sections": { + "Overview": "Gain staging ensures each stage of the signal chain operates at optimal levels.", + "Steps": "1. Set input levels. 2. Check bus levels. 3. Monitor master output.", + }, + "signal_chains": [ + {"chain": "Input -> Channel Strip -> Bus -> Master", "notes": "Keep headroom at each stage."} + ], + "plugins": ["Pro-Q 3"], + "source_quality": "structured", + } + ] +}) + +# ── Embedding response ─────────────────────────────────────────────────────── + + +def make_mock_embedding(dim: int = 768) -> list[float]: + """Generate a deterministic-seeded mock embedding vector.""" + rng = random.Random(42) + return [rng.uniform(-1, 1) for _ in range(dim)] + + +def make_mock_embeddings(n: int, dim: int = 768) -> list[list[float]]: + """Generate n distinct mock embedding vectors.""" + return [ + [random.Random(42 + i).uniform(-1, 1) for _ in range(dim)] + for i in range(n) + ] diff --git a/backend/tests/test_pipeline.py b/backend/tests/test_pipeline.py new file mode 100644 index 0000000..b0d01a3 --- /dev/null +++ b/backend/tests/test_pipeline.py @@ -0,0 +1,739 @@ +"""Integration tests for the LLM extraction pipeline. + +Tests run against a real PostgreSQL test database with mocked LLM and Qdrant +clients. Pipeline stages are sync (Celery tasks), so tests call stage +functions directly with sync SQLAlchemy sessions. + +Tests (a)–(f) call pipeline stages directly. Tests (g)–(i) use the async +HTTP client. Test (j) verifies LLM fallback logic. +""" + +from __future__ import annotations + +import json +import os +import pathlib +import uuid +from unittest.mock import MagicMock, patch, PropertyMock + +import openai +import pytest +from sqlalchemy import create_engine, select +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.pool import NullPool + +from models import ( + Creator, + KeyMoment, + KeyMomentContentType, + ProcessingStatus, + SourceVideo, + TechniquePage, + TranscriptSegment, +) +from pipeline.schemas import ( + ClassificationResult, + ExtractionResult, + SegmentationResult, + SynthesisResult, +) + +from tests.fixtures.mock_llm_responses import ( + STAGE2_SEGMENTATION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE4_CLASSIFICATION_RESPONSE, + STAGE5_SYNTHESIS_RESPONSE, + make_mock_embeddings, +) + +# ── Test database URL ──────────────────────────────────────────────────────── + +TEST_DATABASE_URL_SYNC = os.getenv( + "TEST_DATABASE_URL", + "postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test", +).replace("postgresql+asyncpg://", "postgresql+psycopg2://") + + +# ── Helpers ────────────────────────────────────────────────────────────────── + + +def _make_mock_openai_response(content: str): + """Build a mock OpenAI ChatCompletion response object.""" + mock_message = MagicMock() + mock_message.content = content + + mock_choice = MagicMock() + mock_choice.message = mock_message + + mock_response = MagicMock() + mock_response.choices = [mock_choice] + return mock_response + + +def _make_mock_embedding_response(vectors: list[list[float]]): + """Build a mock OpenAI Embedding response object.""" + mock_items = [] + for i, vec in enumerate(vectors): + item = MagicMock() + item.embedding = vec + item.index = i + mock_items.append(item) + + mock_response = MagicMock() + mock_response.data = mock_items + return mock_response + + +def _patch_pipeline_engine(sync_engine): + """Patch the pipeline.stages module to use the test sync engine/session.""" + return [ + patch("pipeline.stages._engine", sync_engine), + patch( + "pipeline.stages._SessionLocal", + sessionmaker(bind=sync_engine), + ), + ] + + +def _patch_llm_completions(side_effect_fn): + """Patch openai.OpenAI so all instances share a mocked chat.completions.create.""" + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = side_effect_fn + return patch("openai.OpenAI", return_value=mock_client) + + +def _create_canonical_tags_file(tmp_path: pathlib.Path) -> pathlib.Path: + """Write a minimal canonical_tags.yaml for stage4 to load.""" + config_dir = tmp_path / "config" + config_dir.mkdir(exist_ok=True) + tags_path = config_dir / "canonical_tags.yaml" + tags_path.write_text( + "categories:\n" + " - name: Mixing\n" + " description: Balancing and processing elements\n" + " sub_topics: [eq, compression, gain staging, bus processing]\n" + " - name: Sound design\n" + " description: Creating sounds\n" + " sub_topics: [bass, drums]\n" + ) + return tags_path + + +# ── (a) Stage 2: Segmentation ─────────────────────────────────────────────── + + +def test_stage2_segmentation_updates_topic_labels( + db_engine, sync_engine, pre_ingested_video, tmp_path +): + """Stage 2 should update topic_label on each TranscriptSegment.""" + video_id = pre_ingested_video["video_id"] + + # Create prompts directory + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "stage2_segmentation.txt").write_text("You are a segmentation assistant.") + + # Build the mock LLM that returns the segmentation response + def llm_side_effect(**kwargs): + return _make_mock_openai_response(STAGE2_SEGMENTATION_RESPONSE) + + patches = _patch_pipeline_engine(sync_engine) + for p in patches: + p.start() + + with _patch_llm_completions(llm_side_effect), \ + patch("pipeline.stages.get_settings") as mock_settings: + s = MagicMock() + s.prompts_path = str(prompts_dir) + s.llm_api_url = "http://mock:11434/v1" + s.llm_api_key = "sk-test" + s.llm_model = "test-model" + s.llm_fallback_url = "http://mock:11434/v1" + s.llm_fallback_model = "test-model" + s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") + mock_settings.return_value = s + + # Import and call stage directly (not via Celery) + from pipeline.stages import stage2_segmentation + + result = stage2_segmentation(video_id) + assert result == video_id + + for p in patches: + p.stop() + + # Verify: check topic_label on segments + factory = sessionmaker(bind=sync_engine) + session = factory() + try: + segments = ( + session.execute( + select(TranscriptSegment) + .where(TranscriptSegment.source_video_id == video_id) + .order_by(TranscriptSegment.segment_index) + ) + .scalars() + .all() + ) + # Segments 0,1 should have "Introduction", segments 2,3,4 should have "Gain Staging Technique" + assert segments[0].topic_label == "Introduction" + assert segments[1].topic_label == "Introduction" + assert segments[2].topic_label == "Gain Staging Technique" + assert segments[3].topic_label == "Gain Staging Technique" + assert segments[4].topic_label == "Gain Staging Technique" + finally: + session.close() + + +# ── (b) Stage 3: Extraction ───────────────────────────────────────────────── + + +def test_stage3_extraction_creates_key_moments( + db_engine, sync_engine, pre_ingested_video, tmp_path +): + """Stages 2+3 should create KeyMoment rows and set processing_status=extracted.""" + video_id = pre_ingested_video["video_id"] + + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.") + (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.") + + call_count = {"n": 0} + responses = [STAGE2_SEGMENTATION_RESPONSE, STAGE3_EXTRACTION_RESPONSE, STAGE3_EXTRACTION_RESPONSE] + + def llm_side_effect(**kwargs): + idx = min(call_count["n"], len(responses) - 1) + resp = responses[idx] + call_count["n"] += 1 + return _make_mock_openai_response(resp) + + patches = _patch_pipeline_engine(sync_engine) + for p in patches: + p.start() + + with _patch_llm_completions(llm_side_effect), \ + patch("pipeline.stages.get_settings") as mock_settings: + s = MagicMock() + s.prompts_path = str(prompts_dir) + s.llm_api_url = "http://mock:11434/v1" + s.llm_api_key = "sk-test" + s.llm_model = "test-model" + s.llm_fallback_url = "http://mock:11434/v1" + s.llm_fallback_model = "test-model" + s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") + mock_settings.return_value = s + + from pipeline.stages import stage2_segmentation, stage3_extraction + + stage2_segmentation(video_id) + stage3_extraction(video_id) + + for p in patches: + p.stop() + + # Verify key moments created + factory = sessionmaker(bind=sync_engine) + session = factory() + try: + moments = ( + session.execute( + select(KeyMoment) + .where(KeyMoment.source_video_id == video_id) + .order_by(KeyMoment.start_time) + ) + .scalars() + .all() + ) + # Two topic groups → extraction called twice → up to 4 moments + # (2 per group from the mock response) + assert len(moments) >= 2 + assert moments[0].title == "Setting Levels for Gain Staging" + assert moments[0].content_type == KeyMomentContentType.technique + + # Verify processing_status + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one() + assert video.processing_status == ProcessingStatus.extracted + finally: + session.close() + + +# ── (c) Stage 4: Classification ───────────────────────────────────────────── + + +def test_stage4_classification_assigns_tags( + db_engine, sync_engine, pre_ingested_video, tmp_path +): + """Stages 2+3+4 should store classification data in Redis.""" + video_id = pre_ingested_video["video_id"] + + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.") + (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.") + (prompts_dir / "stage4_classification.txt").write_text("Classification assistant.") + + _create_canonical_tags_file(tmp_path) + + call_count = {"n": 0} + responses = [ + STAGE2_SEGMENTATION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE4_CLASSIFICATION_RESPONSE, + ] + + def llm_side_effect(**kwargs): + idx = min(call_count["n"], len(responses) - 1) + resp = responses[idx] + call_count["n"] += 1 + return _make_mock_openai_response(resp) + + patches = _patch_pipeline_engine(sync_engine) + for p in patches: + p.start() + + stored_cls_data = {} + + def mock_store_classification(vid, data): + stored_cls_data[vid] = data + + with _patch_llm_completions(llm_side_effect), \ + patch("pipeline.stages.get_settings") as mock_settings, \ + patch("pipeline.stages._load_canonical_tags") as mock_tags, \ + patch("pipeline.stages._store_classification_data", side_effect=mock_store_classification): + s = MagicMock() + s.prompts_path = str(prompts_dir) + s.llm_api_url = "http://mock:11434/v1" + s.llm_api_key = "sk-test" + s.llm_model = "test-model" + s.llm_fallback_url = "http://mock:11434/v1" + s.llm_fallback_model = "test-model" + s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") + s.review_mode = True + mock_settings.return_value = s + + mock_tags.return_value = { + "categories": [ + {"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging", "eq"]}, + ] + } + + from pipeline.stages import stage2_segmentation, stage3_extraction, stage4_classification + + stage2_segmentation(video_id) + stage3_extraction(video_id) + stage4_classification(video_id) + + for p in patches: + p.stop() + + # Verify classification data was stored + assert video_id in stored_cls_data + cls_data = stored_cls_data[video_id] + assert len(cls_data) >= 1 + assert cls_data[0]["topic_category"] == "Mixing" + assert "gain staging" in cls_data[0]["topic_tags"] + + +# ── (d) Stage 5: Synthesis ────────────────────────────────────────────────── + + +def test_stage5_synthesis_creates_technique_pages( + db_engine, sync_engine, pre_ingested_video, tmp_path +): + """Full pipeline stages 2-5 should create TechniquePage rows linked to KeyMoments.""" + video_id = pre_ingested_video["video_id"] + + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.") + (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.") + (prompts_dir / "stage4_classification.txt").write_text("Classification assistant.") + (prompts_dir / "stage5_synthesis.txt").write_text("Synthesis assistant.") + + call_count = {"n": 0} + responses = [ + STAGE2_SEGMENTATION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE4_CLASSIFICATION_RESPONSE, + STAGE5_SYNTHESIS_RESPONSE, + ] + + def llm_side_effect(**kwargs): + idx = min(call_count["n"], len(responses) - 1) + resp = responses[idx] + call_count["n"] += 1 + return _make_mock_openai_response(resp) + + patches = _patch_pipeline_engine(sync_engine) + for p in patches: + p.start() + + # Mock classification data in Redis (simulate stage 4 having stored it) + mock_cls_data = [ + {"moment_id": "will-be-replaced", "topic_category": "Mixing", "topic_tags": ["gain staging"]}, + ] + + with _patch_llm_completions(llm_side_effect), \ + patch("pipeline.stages.get_settings") as mock_settings, \ + patch("pipeline.stages._load_canonical_tags") as mock_tags, \ + patch("pipeline.stages._store_classification_data"), \ + patch("pipeline.stages._load_classification_data") as mock_load_cls: + s = MagicMock() + s.prompts_path = str(prompts_dir) + s.llm_api_url = "http://mock:11434/v1" + s.llm_api_key = "sk-test" + s.llm_model = "test-model" + s.llm_fallback_url = "http://mock:11434/v1" + s.llm_fallback_model = "test-model" + s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") + s.review_mode = True + mock_settings.return_value = s + + mock_tags.return_value = { + "categories": [ + {"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging"]}, + ] + } + + from pipeline.stages import ( + stage2_segmentation, + stage3_extraction, + stage4_classification, + stage5_synthesis, + ) + + stage2_segmentation(video_id) + stage3_extraction(video_id) + stage4_classification(video_id) + + # Now set up mock_load_cls to return data with real moment IDs + factory = sessionmaker(bind=sync_engine) + sess = factory() + real_moments = ( + sess.execute( + select(KeyMoment).where(KeyMoment.source_video_id == video_id) + ) + .scalars() + .all() + ) + real_cls = [ + {"moment_id": str(m.id), "topic_category": "Mixing", "topic_tags": ["gain staging"]} + for m in real_moments + ] + sess.close() + mock_load_cls.return_value = real_cls + + stage5_synthesis(video_id) + + for p in patches: + p.stop() + + # Verify TechniquePages created + factory = sessionmaker(bind=sync_engine) + session = factory() + try: + pages = session.execute(select(TechniquePage)).scalars().all() + assert len(pages) >= 1 + page = pages[0] + assert page.title == "Gain Staging in Mixing" + assert page.body_sections is not None + assert "Overview" in page.body_sections + assert page.signal_chains is not None + assert len(page.signal_chains) >= 1 + assert page.summary is not None + + # Verify KeyMoments are linked to the TechniquePage + moments = ( + session.execute( + select(KeyMoment).where(KeyMoment.technique_page_id == page.id) + ) + .scalars() + .all() + ) + assert len(moments) >= 1 + + # Verify processing_status updated + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one() + assert video.processing_status == ProcessingStatus.reviewed + finally: + session.close() + + +# ── (e) Stage 6: Embed & Index ────────────────────────────────────────────── + + +def test_stage6_embeds_and_upserts_to_qdrant( + db_engine, sync_engine, pre_ingested_video, tmp_path +): + """Full pipeline through stage 6 should call EmbeddingClient and QdrantManager.""" + video_id = pre_ingested_video["video_id"] + + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.") + (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.") + (prompts_dir / "stage4_classification.txt").write_text("Classification assistant.") + (prompts_dir / "stage5_synthesis.txt").write_text("Synthesis assistant.") + + call_count = {"n": 0} + responses = [ + STAGE2_SEGMENTATION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE3_EXTRACTION_RESPONSE, + STAGE4_CLASSIFICATION_RESPONSE, + STAGE5_SYNTHESIS_RESPONSE, + ] + + def llm_side_effect(**kwargs): + idx = min(call_count["n"], len(responses) - 1) + resp = responses[idx] + call_count["n"] += 1 + return _make_mock_openai_response(resp) + + patches = _patch_pipeline_engine(sync_engine) + for p in patches: + p.start() + + mock_embed_client = MagicMock() + mock_embed_client.embed.side_effect = lambda texts: make_mock_embeddings(len(texts)) + + mock_qdrant_mgr = MagicMock() + + with _patch_llm_completions(llm_side_effect), \ + patch("pipeline.stages.get_settings") as mock_settings, \ + patch("pipeline.stages._load_canonical_tags") as mock_tags, \ + patch("pipeline.stages._store_classification_data"), \ + patch("pipeline.stages._load_classification_data") as mock_load_cls, \ + patch("pipeline.stages.EmbeddingClient", return_value=mock_embed_client), \ + patch("pipeline.stages.QdrantManager", return_value=mock_qdrant_mgr): + s = MagicMock() + s.prompts_path = str(prompts_dir) + s.llm_api_url = "http://mock:11434/v1" + s.llm_api_key = "sk-test" + s.llm_model = "test-model" + s.llm_fallback_url = "http://mock:11434/v1" + s.llm_fallback_model = "test-model" + s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") + s.review_mode = True + s.embedding_api_url = "http://mock:11434/v1" + s.embedding_model = "test-embed" + s.embedding_dimensions = 768 + s.qdrant_url = "http://mock:6333" + s.qdrant_collection = "test_collection" + mock_settings.return_value = s + + mock_tags.return_value = { + "categories": [ + {"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging"]}, + ] + } + + from pipeline.stages import ( + stage2_segmentation, + stage3_extraction, + stage4_classification, + stage5_synthesis, + stage6_embed_and_index, + ) + + stage2_segmentation(video_id) + stage3_extraction(video_id) + stage4_classification(video_id) + + # Load real moment IDs for classification data mock + factory = sessionmaker(bind=sync_engine) + sess = factory() + real_moments = ( + sess.execute( + select(KeyMoment).where(KeyMoment.source_video_id == video_id) + ) + .scalars() + .all() + ) + real_cls = [ + {"moment_id": str(m.id), "topic_category": "Mixing", "topic_tags": ["gain staging"]} + for m in real_moments + ] + sess.close() + mock_load_cls.return_value = real_cls + + stage5_synthesis(video_id) + stage6_embed_and_index(video_id) + + for p in patches: + p.stop() + + # Verify EmbeddingClient.embed was called + assert mock_embed_client.embed.called + # Verify QdrantManager methods called + mock_qdrant_mgr.ensure_collection.assert_called_once() + assert ( + mock_qdrant_mgr.upsert_technique_pages.called + or mock_qdrant_mgr.upsert_key_moments.called + ), "Expected at least one upsert call to QdrantManager" + + +# ── (f) Resumability ──────────────────────────────────────────────────────── + + +def test_run_pipeline_resumes_from_extracted( + db_engine, sync_engine, pre_ingested_video, tmp_path +): + """When status=extracted, run_pipeline should skip stages 2+3 and run 4+5+6.""" + video_id = pre_ingested_video["video_id"] + + # Set video status to "extracted" directly + factory = sessionmaker(bind=sync_engine) + session = factory() + video = session.execute( + select(SourceVideo).where(SourceVideo.id == video_id) + ).scalar_one() + video.processing_status = ProcessingStatus.extracted + session.commit() + session.close() + + patches = _patch_pipeline_engine(sync_engine) + for p in patches: + p.start() + + with patch("pipeline.stages.get_settings") as mock_settings, \ + patch("pipeline.stages.stage2_segmentation") as mock_s2, \ + patch("pipeline.stages.stage3_extraction") as mock_s3, \ + patch("pipeline.stages.stage4_classification") as mock_s4, \ + patch("pipeline.stages.stage5_synthesis") as mock_s5, \ + patch("pipeline.stages.stage6_embed_and_index") as mock_s6, \ + patch("pipeline.stages.celery_chain") as mock_chain: + s = MagicMock() + s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg") + mock_settings.return_value = s + + # Mock chain to inspect what stages it gets + mock_pipeline = MagicMock() + mock_chain.return_value = mock_pipeline + + # Mock the .s() method on each task + mock_s2.s = MagicMock(return_value="s2_sig") + mock_s3.s = MagicMock(return_value="s3_sig") + mock_s4.s = MagicMock(return_value="s4_sig") + mock_s5.s = MagicMock(return_value="s5_sig") + mock_s6.s = MagicMock(return_value="s6_sig") + + from pipeline.stages import run_pipeline + + run_pipeline(video_id) + + # Verify: stages 2 and 3 should NOT have .s() called with video_id + mock_s2.s.assert_not_called() + mock_s3.s.assert_not_called() + + # Stages 4, 5, 6 should have .s() called + mock_s4.s.assert_called_once_with(video_id) + mock_s5.s.assert_called_once() + mock_s6.s.assert_called_once() + + for p in patches: + p.stop() + + +# ── (g) Pipeline trigger endpoint ─────────────────────────────────────────── + + +async def test_pipeline_trigger_endpoint(client, db_engine): + """POST /api/v1/pipeline/trigger/{video_id} with valid video returns 200.""" + # Ingest a transcript first to create a video + sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json" + + with patch("routers.ingest.run_pipeline", create=True) as mock_rp: + mock_rp.delay = MagicMock() + resp = await client.post( + "/api/v1/ingest", + files={"file": (sample.name, sample.read_bytes(), "application/json")}, + ) + assert resp.status_code == 200 + video_id = resp.json()["video_id"] + + # Trigger the pipeline + with patch("pipeline.stages.run_pipeline") as mock_rp: + mock_rp.delay = MagicMock() + resp = await client.post(f"/api/v1/pipeline/trigger/{video_id}") + + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "triggered" + assert data["video_id"] == video_id + + +# ── (h) Pipeline trigger 404 ──────────────────────────────────────────────── + + +async def test_pipeline_trigger_404_for_missing_video(client): + """POST /api/v1/pipeline/trigger/{nonexistent} returns 404.""" + fake_id = str(uuid.uuid4()) + resp = await client.post(f"/api/v1/pipeline/trigger/{fake_id}") + assert resp.status_code == 404 + assert "not found" in resp.json()["detail"].lower() + + +# ── (i) Ingest dispatches pipeline ────────────────────────────────────────── + + +async def test_ingest_dispatches_pipeline(client, db_engine): + """Ingesting a transcript should call run_pipeline.delay with the video_id.""" + sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json" + + with patch("pipeline.stages.run_pipeline") as mock_rp: + mock_rp.delay = MagicMock() + resp = await client.post( + "/api/v1/ingest", + files={"file": (sample.name, sample.read_bytes(), "application/json")}, + ) + + assert resp.status_code == 200 + video_id = resp.json()["video_id"] + mock_rp.delay.assert_called_once_with(video_id) + + +# ── (j) LLM fallback on primary failure ───────────────────────────────────── + + +def test_llm_fallback_on_primary_failure(): + """LLMClient should fall back to secondary endpoint when primary raises APIConnectionError.""" + from pipeline.llm_client import LLMClient + + settings = MagicMock() + settings.llm_api_url = "http://primary:11434/v1" + settings.llm_api_key = "sk-test" + settings.llm_fallback_url = "http://fallback:11434/v1" + settings.llm_fallback_model = "fallback-model" + settings.llm_model = "primary-model" + + with patch("openai.OpenAI") as MockOpenAI: + primary_client = MagicMock() + fallback_client = MagicMock() + + # First call → primary, second call → fallback + MockOpenAI.side_effect = [primary_client, fallback_client] + + client = LLMClient(settings) + + # Primary raises APIConnectionError + primary_client.chat.completions.create.side_effect = openai.APIConnectionError( + request=MagicMock() + ) + + # Fallback succeeds + fallback_response = _make_mock_openai_response('{"result": "ok"}') + fallback_client.chat.completions.create.return_value = fallback_response + + result = client.complete("system", "user") + + assert result == '{"result": "ok"}' + primary_client.chat.completions.create.assert_called_once() + fallback_client.chat.completions.create.assert_called_once()