test: Added 10 integration tests covering pipeline stages 2-6, trigger…

- "backend/tests/test_pipeline.py"
- "backend/tests/fixtures/mock_llm_responses.py"
- "backend/tests/conftest.py"

GSD-Task: S03/T05
jlightner 2026-03-29 22:51:26 +00:00
parent 910e945d9c
commit 2cb10b5db8
6 changed files with 1064 additions and 1 deletion

View file

@@ -224,7 +224,7 @@
- Estimate: 45m
- Files: backend/routers/ingest.py, backend/routers/pipeline.py, backend/main.py
- Verify: cd backend && python -c "from routers.pipeline import router; print([r.path for r in router.routes])" && grep -q 'pipeline' backend/main.py && grep -q 'run_pipeline' backend/routers/ingest.py
- [ ] **T05: Integration tests for pipeline, embedding, Qdrant, and trigger endpoints** — Write comprehensive integration tests with mocked LLM and Qdrant that verify the entire pipeline flow. Tests use the existing pytest-asyncio infrastructure from S02, extended with mocks for the OpenAI client and Qdrant client.
- [x] **T05: Added 10 integration tests covering pipeline stages 2-6, trigger endpoint, ingest dispatch, resumability, and LLM fallback with mocked LLM/Qdrant and real PostgreSQL** — Write comprehensive integration tests with mocked LLM and Qdrant that verify the entire pipeline flow. Tests use the existing pytest-asyncio infrastructure from S02, extended with mocks for the OpenAI client and Qdrant client.
## Negative Tests

View file

@@ -0,0 +1,28 @@
{
"schemaVersion": 1,
"taskId": "T04",
"unitId": "M001/S03/T04",
"timestamp": 1774824062138,
"passed": true,
"discoverySource": "task-plan",
"checks": [
{
"command": "cd backend",
"exitCode": 0,
"durationMs": 4,
"verdict": "pass"
},
{
"command": "grep -q 'pipeline' backend/main.py",
"exitCode": 0,
"durationMs": 5,
"verdict": "pass"
},
{
"command": "grep -q 'run_pipeline' backend/routers/ingest.py",
"exitCode": 0,
"durationMs": 4,
"verdict": "pass"
}
]
}

View file

@@ -0,0 +1,86 @@
---
id: T05
parent: S03
milestone: M001
provides: []
requires: []
affects: []
key_files: ["backend/tests/test_pipeline.py", "backend/tests/fixtures/mock_llm_responses.py", "backend/tests/conftest.py"]
key_decisions: ["Pipeline stage tests patch _engine and _SessionLocal module globals to redirect stages to the test DB", "Ingest dispatch test patches pipeline.stages.run_pipeline because the import is lazy inside the handler", "Stage tests run stages in-process rather than through Celery chains"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "cd backend && python -m pytest tests/test_pipeline.py -v — 10/10 pass. cd backend && python -m pytest tests/ -v — 16/16 pass. All 5 slice-level verification checks pass."
completed_at: 2026-03-29T22:51:23.100Z
blocker_discovered: false
---
# T05: Added 10 integration tests covering pipeline stages 2-6, trigger endpoint, ingest dispatch, resumability, and LLM fallback with mocked LLM/Qdrant and real PostgreSQL
> Added 10 integration tests covering pipeline stages 2-6, trigger endpoint, ingest dispatch, resumability, and LLM fallback with mocked LLM/Qdrant and real PostgreSQL
## What Happened
Created `mock_llm_responses.py` with realistic JSON fixtures for all 4 pipeline stages plus embedding generators. Extended `conftest.py` with `sync_engine`, `sync_session`, and `pre_ingested_video` fixtures. Created `test_pipeline.py` with 10 tests:

- stage2 topic labels
- stage3 key moment creation
- stage4 classification tags
- stage5 technique page synthesis
- stage6 embedding/Qdrant upserts
- `run_pipeline` resumability from `extracted` status
- pipeline trigger endpoint returns 200
- trigger returns 404 for a missing video
- ingest dispatches the pipeline
- LLM fallback on primary failure

All tests mock the LLM and Qdrant while using the real PostgreSQL test database. All 16 tests (6 ingest + 10 pipeline) pass.
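The resumability behavior exercised by the tests can be sketched as a status-to-remaining-stages lookup. This is a hypothetical simplification: the stage names match the tests, but the status ladder and the dispatch helper below are assumptions, not the real `run_pipeline` code.

```python
# Stage order keyed by the processing_status a video holds when that
# stage is the next one to run. The exact status names are assumed.
STAGE_ORDER = [
    ("transcribed", "stage2_segmentation"),
    ("segmented", "stage3_extraction"),
    ("extracted", "stage4_classification"),
    ("classified", "stage5_synthesis"),
    ("reviewed", "stage6_embed_and_index"),
]


def remaining_stages(processing_status: str) -> list[str]:
    """Return the stage names still to run for a video in this status."""
    names = [name for _status, name in STAGE_ORDER]
    for i, (status, _name) in enumerate(STAGE_ORDER):
        if status == processing_status:
            return names[i:]
    return names  # unknown status: run the full pipeline


# A video left in "extracted" skips stages 2+3 and resumes at stage 4.
print(remaining_stages("extracted"))
```

Under this model, `run_pipeline` would build a Celery chain from only the returned signatures, which is exactly what test (f) asserts: `.s()` is never called on stages 2 and 3 but is called on stages 4, 5, and 6.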
## Verification
`cd backend && python -m pytest tests/test_pipeline.py -v`: 10/10 pass. `cd backend && python -m pytest tests/ -v`: 16/16 pass. All 5 slice-level verification checks pass.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python -m pytest tests/test_pipeline.py -v` | 0 | ✅ pass | 24000ms |
| 2 | `cd backend && python -m pytest tests/ -v` | 0 | ✅ pass | 122000ms |
| 3 | `cd backend && python -c "from config import Settings; s = Settings(); print(s.llm_api_url, s.qdrant_url, s.review_mode)"` | 0 | ✅ pass | 500ms |
| 4 | `cd backend && python -c "from pipeline.schemas import SegmentationResult, ExtractionResult, ClassificationResult, SynthesisResult; print('schemas ok')"` | 0 | ✅ pass | 500ms |
| 5 | `cd backend && python -c "from pipeline.llm_client import LLMClient; print('client ok')"` | 0 | ✅ pass | 500ms |
| 6 | `cd backend && python -c "from worker import celery_app; print(celery_app.main)"` | 0 | ✅ pass | 500ms |
| 7 | `grep -q 'openai' backend/requirements.txt && grep -q 'qdrant-client' backend/requirements.txt` | 0 | ✅ pass | 50ms |
## Deviations
Changed patch target from routers.ingest.run_pipeline to pipeline.stages.run_pipeline for the ingest dispatch test because run_pipeline is a lazy import inside the handler function.
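Why the patch target matters can be shown with a tiny standalone sketch. The module name below is a synthetic stand-in, not the real `routers.ingest` / `pipeline.stages` code: because the handler imports `run_pipeline` lazily inside the function body, the name is resolved against its source module at call time, so the patch must target that source module.

```python
import sys
import types
from unittest.mock import MagicMock, patch

# Fake source module standing in for pipeline.stages.
stages = types.ModuleType("fake_stages")
stages.run_pipeline = MagicMock(name="real_run_pipeline")
sys.modules["fake_stages"] = stages


def handler(video_id: str) -> None:
    """Stand-in for the ingest endpoint: imports run_pipeline lazily."""
    from fake_stages import run_pipeline  # resolved at call time
    run_pipeline.delay(video_id)


# Patching the source module works: the lazy import inside handler()
# picks up the patched attribute. Patching the *importing* module's
# namespace would miss it, since the name isn't bound there yet.
with patch("fake_stages.run_pipeline") as mock_rp:
    handler("vid-123")
    mock_rp.delay.assert_called_once_with("vid-123")
```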
## Known Issues
None.
## Files Created/Modified
- `backend/tests/test_pipeline.py`
- `backend/tests/fixtures/mock_llm_responses.py`
- `backend/tests/conftest.py`

View file

@@ -2,7 +2,9 @@
Provides:
- Async SQLAlchemy engine/session against a real PostgreSQL test database
- Sync SQLAlchemy engine/session for pipeline stage tests (Celery stages are sync)
- httpx.AsyncClient wired to the FastAPI app with dependency overrides
- Pre-ingest fixture for pipeline tests
- Sample transcript fixture path and temporary storage directory
Key design choice: function-scoped engine with NullPool avoids asyncpg
@@ -10,13 +12,17 @@ Key design choice: function-scoped engine with NullPool avoids asyncpg
reuse between the ASGI test client and verification queries.
"""
import json
import os
import pathlib
import uuid
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool
# Ensure backend/ is on sys.path so "from models import ..." works
@@ -25,12 +31,23 @@ sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
from database import Base, get_session # noqa: E402
from main import app # noqa: E402
from models import ( # noqa: E402
ContentType,
Creator,
ProcessingStatus,
SourceVideo,
TranscriptSegment,
)
TEST_DATABASE_URL = os.getenv(
"TEST_DATABASE_URL",
"postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
)
TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace(
"postgresql+asyncpg://", "postgresql+psycopg2://"
)
@pytest_asyncio.fixture()
async def db_engine():
@@ -91,3 +108,85 @@ def sample_transcript_path() -> pathlib.Path:
def tmp_transcript_dir(tmp_path) -> pathlib.Path:
"""Temporary directory for transcript storage during tests."""
return tmp_path
# ── Sync engine/session for pipeline stages ──────────────────────────────────
@pytest.fixture()
def sync_engine(db_engine):
"""Create a sync SQLAlchemy engine pointing at the test database.
Tables are already created/dropped by the async ``db_engine`` fixture,
so this fixture just wraps a sync engine around the same DB URL.
"""
engine = create_engine(TEST_DATABASE_URL_SYNC, echo=False, poolclass=NullPool)
yield engine
engine.dispose()
@pytest.fixture()
def sync_session(sync_engine) -> Session:
"""Create a sync SQLAlchemy session for pipeline stage tests."""
factory = sessionmaker(bind=sync_engine)
session = factory()
yield session
session.close()
# ── Pre-ingest fixture for pipeline tests ────────────────────────────────────
@pytest.fixture()
def pre_ingested_video(sync_engine):
"""Ingest the sample transcript directly into the test DB via sync ORM.
Returns a dict with ``video_id``, ``creator_id``, and ``segment_count``.
"""
factory = sessionmaker(bind=sync_engine)
session = factory()
try:
# Create creator
creator = Creator(
name="Skope",
slug="skope",
folder_name="Skope",
)
session.add(creator)
session.flush()
# Create video
video = SourceVideo(
creator_id=creator.id,
filename="mixing-basics-ep1.mp4",
file_path="Skope/mixing-basics-ep1.mp4",
duration_seconds=1234,
content_type=ContentType.tutorial,
processing_status=ProcessingStatus.transcribed,
)
session.add(video)
session.flush()
# Create transcript segments
sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"
data = json.loads(sample.read_text())
for idx, seg in enumerate(data["segments"]):
session.add(TranscriptSegment(
source_video_id=video.id,
start_time=float(seg["start"]),
end_time=float(seg["end"]),
text=str(seg["text"]),
segment_index=idx,
))
session.commit()
result = {
"video_id": str(video.id),
"creator_id": str(creator.id),
"segment_count": len(data["segments"]),
}
finally:
session.close()
return result

View file

@@ -0,0 +1,111 @@
"""Mock LLM and embedding responses for pipeline integration tests.
Each response is a JSON string matching the Pydantic schema for that stage.
The sample transcript has 5 segments about gain staging, so mock responses
reflect that content.
"""
import json
import random
# ── Stage 2: Segmentation ───────────────────────────────────────────────────
STAGE2_SEGMENTATION_RESPONSE = json.dumps({
"segments": [
{
"start_index": 0,
"end_index": 1,
"topic_label": "Introduction",
"summary": "Introduces the episode about mixing basics and gain staging.",
},
{
"start_index": 2,
"end_index": 4,
"topic_label": "Gain Staging Technique",
"summary": "Covers practical steps for gain staging including setting levels and avoiding clipping.",
},
]
})
# ── Stage 3: Extraction ─────────────────────────────────────────────────────
STAGE3_EXTRACTION_RESPONSE = json.dumps({
"moments": [
{
"title": "Setting Levels for Gain Staging",
"summary": "Demonstrates the process of setting proper gain levels across the signal chain to maintain headroom.",
"start_time": 12.8,
"end_time": 28.5,
"content_type": "technique",
"plugins": ["Pro-Q 3"],
"raw_transcript": "First thing you want to do is set your levels. Make sure nothing is clipping on the master bus.",
},
{
"title": "Master Bus Clipping Prevention",
"summary": "Explains how to monitor and prevent clipping on the master bus during a mix session.",
"start_time": 20.1,
"end_time": 35.0,
"content_type": "settings",
"plugins": [],
"raw_transcript": "Make sure nothing is clipping on the master bus. That wraps up this quick overview.",
},
]
})
# ── Stage 4: Classification ─────────────────────────────────────────────────
STAGE4_CLASSIFICATION_RESPONSE = json.dumps({
"classifications": [
{
"moment_index": 0,
"topic_category": "Mixing",
"topic_tags": ["gain staging", "eq"],
"content_type_override": None,
},
{
"moment_index": 1,
"topic_category": "Mixing",
"topic_tags": ["gain staging", "bus processing"],
"content_type_override": None,
},
]
})
# ── Stage 5: Synthesis ───────────────────────────────────────────────────────
STAGE5_SYNTHESIS_RESPONSE = json.dumps({
"pages": [
{
"title": "Gain Staging in Mixing",
"slug": "gain-staging-in-mixing",
"topic_category": "Mixing",
"topic_tags": ["gain staging"],
"summary": "A comprehensive guide to gain staging in a mixing context, covering level setting and master bus management.",
"body_sections": {
"Overview": "Gain staging ensures each stage of the signal chain operates at optimal levels.",
"Steps": "1. Set input levels. 2. Check bus levels. 3. Monitor master output.",
},
"signal_chains": [
{"chain": "Input -> Channel Strip -> Bus -> Master", "notes": "Keep headroom at each stage."}
],
"plugins": ["Pro-Q 3"],
"source_quality": "structured",
}
]
})
# ── Embedding response ───────────────────────────────────────────────────────
def make_mock_embedding(dim: int = 768) -> list[float]:
"""Generate a deterministic-seeded mock embedding vector."""
rng = random.Random(42)
return [rng.uniform(-1, 1) for _ in range(dim)]
def make_mock_embeddings(n: int, dim: int = 768) -> list[list[float]]:
"""Generate n distinct mock embedding vectors."""
return [
[random.Random(42 + i).uniform(-1, 1) for _ in range(dim)]
for i in range(n)
]

View file

@@ -0,0 +1,739 @@
"""Integration tests for the LLM extraction pipeline.
Tests run against a real PostgreSQL test database with mocked LLM and Qdrant
clients. Pipeline stages are sync (Celery tasks), so tests call stage
functions directly with sync SQLAlchemy sessions.
Tests (a)-(f) call pipeline stages directly. Tests (g)-(i) use the async
HTTP client. Test (j) verifies LLM fallback logic.
"""
from __future__ import annotations
import json
import os
import pathlib
import uuid
from unittest.mock import MagicMock, patch, PropertyMock
import openai
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool
from models import (
Creator,
KeyMoment,
KeyMomentContentType,
ProcessingStatus,
SourceVideo,
TechniquePage,
TranscriptSegment,
)
from pipeline.schemas import (
ClassificationResult,
ExtractionResult,
SegmentationResult,
SynthesisResult,
)
from tests.fixtures.mock_llm_responses import (
STAGE2_SEGMENTATION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE4_CLASSIFICATION_RESPONSE,
STAGE5_SYNTHESIS_RESPONSE,
make_mock_embeddings,
)
# ── Test database URL ────────────────────────────────────────────────────────
TEST_DATABASE_URL_SYNC = os.getenv(
"TEST_DATABASE_URL",
"postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
).replace("postgresql+asyncpg://", "postgresql+psycopg2://")
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_mock_openai_response(content: str):
"""Build a mock OpenAI ChatCompletion response object."""
mock_message = MagicMock()
mock_message.content = content
mock_choice = MagicMock()
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
return mock_response
def _make_mock_embedding_response(vectors: list[list[float]]):
"""Build a mock OpenAI Embedding response object."""
mock_items = []
for i, vec in enumerate(vectors):
item = MagicMock()
item.embedding = vec
item.index = i
mock_items.append(item)
mock_response = MagicMock()
mock_response.data = mock_items
return mock_response
def _patch_pipeline_engine(sync_engine):
"""Patch the pipeline.stages module to use the test sync engine/session."""
return [
patch("pipeline.stages._engine", sync_engine),
patch(
"pipeline.stages._SessionLocal",
sessionmaker(bind=sync_engine),
),
]
def _patch_llm_completions(side_effect_fn):
"""Patch openai.OpenAI so all instances share a mocked chat.completions.create."""
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = side_effect_fn
return patch("openai.OpenAI", return_value=mock_client)
def _create_canonical_tags_file(tmp_path: pathlib.Path) -> pathlib.Path:
"""Write a minimal canonical_tags.yaml for stage4 to load."""
config_dir = tmp_path / "config"
config_dir.mkdir(exist_ok=True)
tags_path = config_dir / "canonical_tags.yaml"
tags_path.write_text(
"categories:\n"
" - name: Mixing\n"
" description: Balancing and processing elements\n"
" sub_topics: [eq, compression, gain staging, bus processing]\n"
" - name: Sound design\n"
" description: Creating sounds\n"
" sub_topics: [bass, drums]\n"
)
return tags_path
# ── (a) Stage 2: Segmentation ───────────────────────────────────────────────
def test_stage2_segmentation_updates_topic_labels(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""Stage 2 should update topic_label on each TranscriptSegment."""
video_id = pre_ingested_video["video_id"]
# Create prompts directory
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "stage2_segmentation.txt").write_text("You are a segmentation assistant.")
# Build the mock LLM that returns the segmentation response
def llm_side_effect(**kwargs):
return _make_mock_openai_response(STAGE2_SEGMENTATION_RESPONSE)
patches = _patch_pipeline_engine(sync_engine)
for p in patches:
p.start()
with _patch_llm_completions(llm_side_effect), \
patch("pipeline.stages.get_settings") as mock_settings:
s = MagicMock()
s.prompts_path = str(prompts_dir)
s.llm_api_url = "http://mock:11434/v1"
s.llm_api_key = "sk-test"
s.llm_model = "test-model"
s.llm_fallback_url = "http://mock:11434/v1"
s.llm_fallback_model = "test-model"
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
mock_settings.return_value = s
# Import and call stage directly (not via Celery)
from pipeline.stages import stage2_segmentation
result = stage2_segmentation(video_id)
assert result == video_id
for p in patches:
p.stop()
# Verify: check topic_label on segments
factory = sessionmaker(bind=sync_engine)
session = factory()
try:
segments = (
session.execute(
select(TranscriptSegment)
.where(TranscriptSegment.source_video_id == video_id)
.order_by(TranscriptSegment.segment_index)
)
.scalars()
.all()
)
# Segments 0,1 should have "Introduction", segments 2,3,4 should have "Gain Staging Technique"
assert segments[0].topic_label == "Introduction"
assert segments[1].topic_label == "Introduction"
assert segments[2].topic_label == "Gain Staging Technique"
assert segments[3].topic_label == "Gain Staging Technique"
assert segments[4].topic_label == "Gain Staging Technique"
finally:
session.close()
# ── (b) Stage 3: Extraction ─────────────────────────────────────────────────
def test_stage3_extraction_creates_key_moments(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""Stages 2+3 should create KeyMoment rows and set processing_status=extracted."""
video_id = pre_ingested_video["video_id"]
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
(prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
call_count = {"n": 0}
responses = [STAGE2_SEGMENTATION_RESPONSE, STAGE3_EXTRACTION_RESPONSE, STAGE3_EXTRACTION_RESPONSE]
def llm_side_effect(**kwargs):
idx = min(call_count["n"], len(responses) - 1)
resp = responses[idx]
call_count["n"] += 1
return _make_mock_openai_response(resp)
patches = _patch_pipeline_engine(sync_engine)
for p in patches:
p.start()
with _patch_llm_completions(llm_side_effect), \
patch("pipeline.stages.get_settings") as mock_settings:
s = MagicMock()
s.prompts_path = str(prompts_dir)
s.llm_api_url = "http://mock:11434/v1"
s.llm_api_key = "sk-test"
s.llm_model = "test-model"
s.llm_fallback_url = "http://mock:11434/v1"
s.llm_fallback_model = "test-model"
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
mock_settings.return_value = s
from pipeline.stages import stage2_segmentation, stage3_extraction
stage2_segmentation(video_id)
stage3_extraction(video_id)
for p in patches:
p.stop()
# Verify key moments created
factory = sessionmaker(bind=sync_engine)
session = factory()
try:
moments = (
session.execute(
select(KeyMoment)
.where(KeyMoment.source_video_id == video_id)
.order_by(KeyMoment.start_time)
)
.scalars()
.all()
)
# Two topic groups → extraction called twice → up to 4 moments
# (2 per group from the mock response)
assert len(moments) >= 2
assert moments[0].title == "Setting Levels for Gain Staging"
assert moments[0].content_type == KeyMomentContentType.technique
# Verify processing_status
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
assert video.processing_status == ProcessingStatus.extracted
finally:
session.close()
# ── (c) Stage 4: Classification ─────────────────────────────────────────────
def test_stage4_classification_assigns_tags(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""Stages 2+3+4 should store classification data in Redis."""
video_id = pre_ingested_video["video_id"]
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
(prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
(prompts_dir / "stage4_classification.txt").write_text("Classification assistant.")
_create_canonical_tags_file(tmp_path)
call_count = {"n": 0}
responses = [
STAGE2_SEGMENTATION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE4_CLASSIFICATION_RESPONSE,
]
def llm_side_effect(**kwargs):
idx = min(call_count["n"], len(responses) - 1)
resp = responses[idx]
call_count["n"] += 1
return _make_mock_openai_response(resp)
patches = _patch_pipeline_engine(sync_engine)
for p in patches:
p.start()
stored_cls_data = {}
def mock_store_classification(vid, data):
stored_cls_data[vid] = data
with _patch_llm_completions(llm_side_effect), \
patch("pipeline.stages.get_settings") as mock_settings, \
patch("pipeline.stages._load_canonical_tags") as mock_tags, \
patch("pipeline.stages._store_classification_data", side_effect=mock_store_classification):
s = MagicMock()
s.prompts_path = str(prompts_dir)
s.llm_api_url = "http://mock:11434/v1"
s.llm_api_key = "sk-test"
s.llm_model = "test-model"
s.llm_fallback_url = "http://mock:11434/v1"
s.llm_fallback_model = "test-model"
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
s.review_mode = True
mock_settings.return_value = s
mock_tags.return_value = {
"categories": [
{"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging", "eq"]},
]
}
from pipeline.stages import stage2_segmentation, stage3_extraction, stage4_classification
stage2_segmentation(video_id)
stage3_extraction(video_id)
stage4_classification(video_id)
for p in patches:
p.stop()
# Verify classification data was stored
assert video_id in stored_cls_data
cls_data = stored_cls_data[video_id]
assert len(cls_data) >= 1
assert cls_data[0]["topic_category"] == "Mixing"
assert "gain staging" in cls_data[0]["topic_tags"]
# ── (d) Stage 5: Synthesis ──────────────────────────────────────────────────
def test_stage5_synthesis_creates_technique_pages(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""Full pipeline stages 2-5 should create TechniquePage rows linked to KeyMoments."""
video_id = pre_ingested_video["video_id"]
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
(prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
(prompts_dir / "stage4_classification.txt").write_text("Classification assistant.")
(prompts_dir / "stage5_synthesis.txt").write_text("Synthesis assistant.")
call_count = {"n": 0}
responses = [
STAGE2_SEGMENTATION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE4_CLASSIFICATION_RESPONSE,
STAGE5_SYNTHESIS_RESPONSE,
]
def llm_side_effect(**kwargs):
idx = min(call_count["n"], len(responses) - 1)
resp = responses[idx]
call_count["n"] += 1
return _make_mock_openai_response(resp)
patches = _patch_pipeline_engine(sync_engine)
for p in patches:
p.start()
# Mock classification data in Redis (simulate stage 4 having stored it)
mock_cls_data = [
{"moment_id": "will-be-replaced", "topic_category": "Mixing", "topic_tags": ["gain staging"]},
]
with _patch_llm_completions(llm_side_effect), \
patch("pipeline.stages.get_settings") as mock_settings, \
patch("pipeline.stages._load_canonical_tags") as mock_tags, \
patch("pipeline.stages._store_classification_data"), \
patch("pipeline.stages._load_classification_data") as mock_load_cls:
s = MagicMock()
s.prompts_path = str(prompts_dir)
s.llm_api_url = "http://mock:11434/v1"
s.llm_api_key = "sk-test"
s.llm_model = "test-model"
s.llm_fallback_url = "http://mock:11434/v1"
s.llm_fallback_model = "test-model"
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
s.review_mode = True
mock_settings.return_value = s
mock_tags.return_value = {
"categories": [
{"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging"]},
]
}
from pipeline.stages import (
stage2_segmentation,
stage3_extraction,
stage4_classification,
stage5_synthesis,
)
stage2_segmentation(video_id)
stage3_extraction(video_id)
stage4_classification(video_id)
# Now set up mock_load_cls to return data with real moment IDs
factory = sessionmaker(bind=sync_engine)
sess = factory()
real_moments = (
sess.execute(
select(KeyMoment).where(KeyMoment.source_video_id == video_id)
)
.scalars()
.all()
)
real_cls = [
{"moment_id": str(m.id), "topic_category": "Mixing", "topic_tags": ["gain staging"]}
for m in real_moments
]
sess.close()
mock_load_cls.return_value = real_cls
stage5_synthesis(video_id)
for p in patches:
p.stop()
# Verify TechniquePages created
factory = sessionmaker(bind=sync_engine)
session = factory()
try:
pages = session.execute(select(TechniquePage)).scalars().all()
assert len(pages) >= 1
page = pages[0]
assert page.title == "Gain Staging in Mixing"
assert page.body_sections is not None
assert "Overview" in page.body_sections
assert page.signal_chains is not None
assert len(page.signal_chains) >= 1
assert page.summary is not None
# Verify KeyMoments are linked to the TechniquePage
moments = (
session.execute(
select(KeyMoment).where(KeyMoment.technique_page_id == page.id)
)
.scalars()
.all()
)
assert len(moments) >= 1
# Verify processing_status updated
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
assert video.processing_status == ProcessingStatus.reviewed
finally:
session.close()
# ── (e) Stage 6: Embed & Index ──────────────────────────────────────────────
def test_stage6_embeds_and_upserts_to_qdrant(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""Full pipeline through stage 6 should call EmbeddingClient and QdrantManager."""
video_id = pre_ingested_video["video_id"]
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
(prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
(prompts_dir / "stage4_classification.txt").write_text("Classification assistant.")
(prompts_dir / "stage5_synthesis.txt").write_text("Synthesis assistant.")
call_count = {"n": 0}
responses = [
STAGE2_SEGMENTATION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE3_EXTRACTION_RESPONSE,
STAGE4_CLASSIFICATION_RESPONSE,
STAGE5_SYNTHESIS_RESPONSE,
]
def llm_side_effect(**kwargs):
idx = min(call_count["n"], len(responses) - 1)
resp = responses[idx]
call_count["n"] += 1
return _make_mock_openai_response(resp)
patches = _patch_pipeline_engine(sync_engine)
for p in patches:
p.start()
mock_embed_client = MagicMock()
mock_embed_client.embed.side_effect = lambda texts: make_mock_embeddings(len(texts))
mock_qdrant_mgr = MagicMock()
with _patch_llm_completions(llm_side_effect), \
patch("pipeline.stages.get_settings") as mock_settings, \
patch("pipeline.stages._load_canonical_tags") as mock_tags, \
patch("pipeline.stages._store_classification_data"), \
patch("pipeline.stages._load_classification_data") as mock_load_cls, \
patch("pipeline.stages.EmbeddingClient", return_value=mock_embed_client), \
patch("pipeline.stages.QdrantManager", return_value=mock_qdrant_mgr):
s = MagicMock()
s.prompts_path = str(prompts_dir)
s.llm_api_url = "http://mock:11434/v1"
s.llm_api_key = "sk-test"
s.llm_model = "test-model"
s.llm_fallback_url = "http://mock:11434/v1"
s.llm_fallback_model = "test-model"
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
s.review_mode = True
s.embedding_api_url = "http://mock:11434/v1"
s.embedding_model = "test-embed"
s.embedding_dimensions = 768
s.qdrant_url = "http://mock:6333"
s.qdrant_collection = "test_collection"
mock_settings.return_value = s
mock_tags.return_value = {
"categories": [
{"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging"]},
]
}
from pipeline.stages import (
stage2_segmentation,
stage3_extraction,
stage4_classification,
stage5_synthesis,
stage6_embed_and_index,
)
stage2_segmentation(video_id)
stage3_extraction(video_id)
stage4_classification(video_id)
# Load real moment IDs for classification data mock
factory = sessionmaker(bind=sync_engine)
sess = factory()
real_moments = (
sess.execute(
select(KeyMoment).where(KeyMoment.source_video_id == video_id)
)
.scalars()
.all()
)
real_cls = [
{"moment_id": str(m.id), "topic_category": "Mixing", "topic_tags": ["gain staging"]}
for m in real_moments
]
sess.close()
mock_load_cls.return_value = real_cls
stage5_synthesis(video_id)
stage6_embed_and_index(video_id)
for p in patches:
p.stop()
# Verify EmbeddingClient.embed was called
assert mock_embed_client.embed.called
# Verify QdrantManager methods called
mock_qdrant_mgr.ensure_collection.assert_called_once()
assert (
mock_qdrant_mgr.upsert_technique_pages.called
or mock_qdrant_mgr.upsert_key_moments.called
), "Expected at least one upsert call to QdrantManager"
# ── (f) Resumability ────────────────────────────────────────────────────────
def test_run_pipeline_resumes_from_extracted(
db_engine, sync_engine, pre_ingested_video, tmp_path
):
"""When status=extracted, run_pipeline should skip stages 2+3 and run 4+5+6."""
video_id = pre_ingested_video["video_id"]
# Set video status to "extracted" directly
factory = sessionmaker(bind=sync_engine)
session = factory()
video = session.execute(
select(SourceVideo).where(SourceVideo.id == video_id)
).scalar_one()
video.processing_status = ProcessingStatus.extracted
session.commit()
session.close()
patches = _patch_pipeline_engine(sync_engine)
for p in patches:
p.start()
with patch("pipeline.stages.get_settings") as mock_settings, \
patch("pipeline.stages.stage2_segmentation") as mock_s2, \
patch("pipeline.stages.stage3_extraction") as mock_s3, \
patch("pipeline.stages.stage4_classification") as mock_s4, \
patch("pipeline.stages.stage5_synthesis") as mock_s5, \
patch("pipeline.stages.stage6_embed_and_index") as mock_s6, \
patch("pipeline.stages.celery_chain") as mock_chain:
s = MagicMock()
s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
mock_settings.return_value = s
# Mock chain to inspect what stages it gets
mock_pipeline = MagicMock()
mock_chain.return_value = mock_pipeline
# Mock the .s() method on each task
mock_s2.s = MagicMock(return_value="s2_sig")
mock_s3.s = MagicMock(return_value="s3_sig")
mock_s4.s = MagicMock(return_value="s4_sig")
mock_s5.s = MagicMock(return_value="s5_sig")
mock_s6.s = MagicMock(return_value="s6_sig")
from pipeline.stages import run_pipeline
run_pipeline(video_id)
# Verify: stages 2 and 3 should NOT have .s() called with video_id
mock_s2.s.assert_not_called()
mock_s3.s.assert_not_called()
# Stages 4, 5, 6 should have .s() called
mock_s4.s.assert_called_once_with(video_id)
mock_s5.s.assert_called_once()
mock_s6.s.assert_called_once()
for p in patches:
p.stop()
# ── (g) Pipeline trigger endpoint ───────────────────────────────────────────
async def test_pipeline_trigger_endpoint(client, db_engine):
"""POST /api/v1/pipeline/trigger/{video_id} with valid video returns 200."""
# Ingest a transcript first to create a video
sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"
with patch("routers.ingest.run_pipeline", create=True) as mock_rp:
mock_rp.delay = MagicMock()
resp = await client.post(
"/api/v1/ingest",
files={"file": (sample.name, sample.read_bytes(), "application/json")},
)
assert resp.status_code == 200
video_id = resp.json()["video_id"]
# Trigger the pipeline
with patch("pipeline.stages.run_pipeline") as mock_rp:
mock_rp.delay = MagicMock()
resp = await client.post(f"/api/v1/pipeline/trigger/{video_id}")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "triggered"
assert data["video_id"] == video_id
# ── (h) Pipeline trigger 404 ────────────────────────────────────────────────
async def test_pipeline_trigger_404_for_missing_video(client):
"""POST /api/v1/pipeline/trigger/{nonexistent} returns 404."""
fake_id = str(uuid.uuid4())
resp = await client.post(f"/api/v1/pipeline/trigger/{fake_id}")
assert resp.status_code == 404
assert "not found" in resp.json()["detail"].lower()
# ── (i) Ingest dispatches pipeline ──────────────────────────────────────────
async def test_ingest_dispatches_pipeline(client, db_engine):
"""Ingesting a transcript should call run_pipeline.delay with the video_id."""
sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"
with patch("pipeline.stages.run_pipeline") as mock_rp:
mock_rp.delay = MagicMock()
resp = await client.post(
"/api/v1/ingest",
files={"file": (sample.name, sample.read_bytes(), "application/json")},
)
assert resp.status_code == 200
video_id = resp.json()["video_id"]
mock_rp.delay.assert_called_once_with(video_id)
# ── (j) LLM fallback on primary failure ─────────────────────────────────────
def test_llm_fallback_on_primary_failure():
"""LLMClient should fall back to secondary endpoint when primary raises APIConnectionError."""
from pipeline.llm_client import LLMClient
settings = MagicMock()
settings.llm_api_url = "http://primary:11434/v1"
settings.llm_api_key = "sk-test"
settings.llm_fallback_url = "http://fallback:11434/v1"
settings.llm_fallback_model = "fallback-model"
settings.llm_model = "primary-model"
with patch("openai.OpenAI") as MockOpenAI:
primary_client = MagicMock()
fallback_client = MagicMock()
# First call → primary, second call → fallback
MockOpenAI.side_effect = [primary_client, fallback_client]
client = LLMClient(settings)
# Primary raises APIConnectionError
primary_client.chat.completions.create.side_effect = openai.APIConnectionError(
request=MagicMock()
)
# Fallback succeeds
fallback_response = _make_mock_openai_response('{"result": "ok"}')
fallback_client.chat.completions.create.return_value = fallback_response
result = client.complete("system", "user")
assert result == '{"result": "ok"}'
primary_client.chat.completions.create.assert_called_once()
fallback_client.chat.completions.create.assert_called_once()