# NOTE(review): the changelog text below was accidentally pasted at the top of
# this test module (it belongs in a commit message / CHANGELOG); kept as a
# comment so the module remains importable — move it out on the next pass.
#
# Deleted files: generate_stage5_variants.py (874 lines) — superseded by
# pipeline.quality toolkit; PROJECT_CONTEXT.md (461 lines) — stale, .gsd/PROJECT.md
# is the living doc; CHRYSOPEDIA-ASSESSMENT.md (654 lines) — M011 triage artifact,
# all findings actioned.
# CSS cleanup (364 lines): 20 orphaned block groups from deleted review queue/old
# components; duplicate .btn base rule, .btn--warning, @keyframes stagePulse.
# Python imports: routers/pipeline.py — uuid, literal_column, over, text;
# tests/test_pipeline.py — 9 unused imports (PropertyMock, create_engine, etc.).
# Build verified: tsc --noEmit clean, npm run build clean (59 modules, 0 warnings).
"""Integration tests for the LLM extraction pipeline.
|
||
|
||
Tests run against a real PostgreSQL test database with mocked LLM and Qdrant
|
||
clients. Pipeline stages are sync (Celery tasks), so tests call stage
|
||
functions directly with sync SQLAlchemy sessions.
|
||
|
||
Tests (a)–(f) call pipeline stages directly. Tests (g)–(i) use the async
|
||
HTTP client. Test (j) verifies LLM fallback logic.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import pathlib
|
||
import uuid
|
||
from unittest.mock import MagicMock, patch
|
||
|
||
import openai
|
||
import pytest
|
||
from sqlalchemy import select
|
||
from sqlalchemy.orm import sessionmaker
|
||
|
||
from models import (
|
||
KeyMoment,
|
||
KeyMomentContentType,
|
||
ProcessingStatus,
|
||
SourceVideo,
|
||
TechniquePage,
|
||
TranscriptSegment,
|
||
)
|
||
|
||
from tests.fixtures.mock_llm_responses import (
|
||
STAGE2_SEGMENTATION_RESPONSE,
|
||
STAGE3_EXTRACTION_RESPONSE,
|
||
STAGE4_CLASSIFICATION_RESPONSE,
|
||
STAGE5_SYNTHESIS_RESPONSE,
|
||
make_mock_embeddings,
|
||
)
|
||
|
||
# ── Test database URL ────────────────────────────────────────────────────────
|
||
|
||
TEST_DATABASE_URL_SYNC = os.getenv(
|
||
"TEST_DATABASE_URL",
|
||
"postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
|
||
).replace("postgresql+asyncpg://", "postgresql+psycopg2://")
|
||
|
||
|
||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _make_mock_openai_response(content: str):
|
||
"""Build a mock OpenAI ChatCompletion response object."""
|
||
mock_message = MagicMock()
|
||
mock_message.content = content
|
||
|
||
mock_choice = MagicMock()
|
||
mock_choice.message = mock_message
|
||
|
||
mock_response = MagicMock()
|
||
mock_response.choices = [mock_choice]
|
||
return mock_response
|
||
|
||
|
||
def _make_mock_embedding_response(vectors: list[list[float]]):
|
||
"""Build a mock OpenAI Embedding response object."""
|
||
mock_items = []
|
||
for i, vec in enumerate(vectors):
|
||
item = MagicMock()
|
||
item.embedding = vec
|
||
item.index = i
|
||
mock_items.append(item)
|
||
|
||
mock_response = MagicMock()
|
||
mock_response.data = mock_items
|
||
return mock_response
|
||
|
||
|
||
def _patch_pipeline_engine(sync_engine):
    """Patch the pipeline.stages module to use the test sync engine/session.

    Returns a list of un-started patchers; callers must start and (in a
    ``finally``) stop each one.
    """
    return [
        patch("pipeline.stages._engine", sync_engine),
        patch(
            "pipeline.stages._SessionLocal",
            sessionmaker(bind=sync_engine),
        ),
    ]


def _patch_llm_completions(side_effect_fn):
|
||
"""Patch openai.OpenAI so all instances share a mocked chat.completions.create."""
|
||
mock_client = MagicMock()
|
||
mock_client.chat.completions.create.side_effect = side_effect_fn
|
||
return patch("openai.OpenAI", return_value=mock_client)
|
||
|
||
|
||
def _create_canonical_tags_file(tmp_path: pathlib.Path) -> pathlib.Path:
|
||
"""Write a minimal canonical_tags.yaml for stage4 to load."""
|
||
config_dir = tmp_path / "config"
|
||
config_dir.mkdir(exist_ok=True)
|
||
tags_path = config_dir / "canonical_tags.yaml"
|
||
tags_path.write_text(
|
||
"categories:\n"
|
||
" - name: Mixing\n"
|
||
" description: Balancing and processing elements\n"
|
||
" sub_topics: [eq, compression, gain staging, bus processing]\n"
|
||
" - name: Sound design\n"
|
||
" description: Creating sounds\n"
|
||
" sub_topics: [bass, drums]\n"
|
||
)
|
||
return tags_path
|
||
|
||
|
||
# ── (a) Stage 2: Segmentation ───────────────────────────────────────────────


def test_stage2_segmentation_updates_topic_labels(
    db_engine, sync_engine, pre_ingested_video, tmp_path
):
    """Stage 2 should update topic_label on each TranscriptSegment."""
    video_id = pre_ingested_video["video_id"]

    # Create prompts directory
    prompts_dir = tmp_path / "prompts"
    prompts_dir.mkdir()
    (prompts_dir / "stage2_segmentation.txt").write_text("You are a segmentation assistant.")

    # Build the mock LLM that returns the segmentation response
    def llm_side_effect(**kwargs):
        return _make_mock_openai_response(STAGE2_SEGMENTATION_RESPONSE)

    patches = _patch_pipeline_engine(sync_engine)
    for p in patches:
        p.start()
    try:
        with _patch_llm_completions(llm_side_effect), \
                patch("pipeline.stages.get_settings") as mock_settings:
            s = MagicMock()
            s.prompts_path = str(prompts_dir)
            s.llm_api_url = "http://mock:11434/v1"
            s.llm_api_key = "sk-test"
            s.llm_model = "test-model"
            s.llm_fallback_url = "http://mock:11434/v1"
            s.llm_fallback_model = "test-model"
            s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
            mock_settings.return_value = s

            # Import and call stage directly (not via Celery)
            from pipeline.stages import stage2_segmentation

            result = stage2_segmentation(video_id)
            assert result == video_id
    finally:
        # Always undo the engine/session patches, even if the stage raises,
        # so a failure here cannot leak state into other tests.
        for p in patches:
            p.stop()

    # Verify: check topic_label on segments
    factory = sessionmaker(bind=sync_engine)
    session = factory()
    try:
        segments = (
            session.execute(
                select(TranscriptSegment)
                .where(TranscriptSegment.source_video_id == video_id)
                .order_by(TranscriptSegment.segment_index)
            )
            .scalars()
            .all()
        )
        # Segments 0,1 should have "Introduction", segments 2,3,4 should have "Gain Staging Technique"
        assert segments[0].topic_label == "Introduction"
        assert segments[1].topic_label == "Introduction"
        assert segments[2].topic_label == "Gain Staging Technique"
        assert segments[3].topic_label == "Gain Staging Technique"
        assert segments[4].topic_label == "Gain Staging Technique"
    finally:
        session.close()


# ── (b) Stage 3: Extraction ─────────────────────────────────────────────────


def test_stage3_extraction_creates_key_moments(
    db_engine, sync_engine, pre_ingested_video, tmp_path
):
    """Stages 2+3 should create KeyMoment rows."""
    video_id = pre_ingested_video["video_id"]

    prompts_dir = tmp_path / "prompts"
    prompts_dir.mkdir()
    (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
    (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")

    # One segmentation call, then one extraction call per topic group.
    call_count = {"n": 0}
    responses = [STAGE2_SEGMENTATION_RESPONSE, STAGE3_EXTRACTION_RESPONSE, STAGE3_EXTRACTION_RESPONSE]

    def llm_side_effect(**kwargs):
        # Clamp to the last canned response so extra calls never IndexError.
        idx = min(call_count["n"], len(responses) - 1)
        resp = responses[idx]
        call_count["n"] += 1
        return _make_mock_openai_response(resp)

    patches = _patch_pipeline_engine(sync_engine)
    for p in patches:
        p.start()
    try:
        with _patch_llm_completions(llm_side_effect), \
                patch("pipeline.stages.get_settings") as mock_settings:
            s = MagicMock()
            s.prompts_path = str(prompts_dir)
            s.llm_api_url = "http://mock:11434/v1"
            s.llm_api_key = "sk-test"
            s.llm_model = "test-model"
            s.llm_fallback_url = "http://mock:11434/v1"
            s.llm_fallback_model = "test-model"
            s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
            mock_settings.return_value = s

            from pipeline.stages import stage2_segmentation, stage3_extraction

            stage2_segmentation(video_id)
            stage3_extraction(video_id)
    finally:
        # Stop patches even on failure so state cannot leak across tests.
        for p in patches:
            p.stop()

    # Verify key moments created
    factory = sessionmaker(bind=sync_engine)
    session = factory()
    try:
        moments = (
            session.execute(
                select(KeyMoment)
                .where(KeyMoment.source_video_id == video_id)
                .order_by(KeyMoment.start_time)
            )
            .scalars()
            .all()
        )
        # Two topic groups → extraction called twice → up to 4 moments
        # (2 per group from the mock response)
        assert len(moments) >= 2
        assert moments[0].title == "Setting Levels for Gain Staging"
        assert moments[0].content_type == KeyMomentContentType.technique
    finally:
        session.close()


# ── (c) Stage 4: Classification ─────────────────────────────────────────────


def test_stage4_classification_assigns_tags(
    db_engine, sync_engine, pre_ingested_video, tmp_path
):
    """Stages 2+3+4 should store classification data in Redis."""
    video_id = pre_ingested_video["video_id"]

    prompts_dir = tmp_path / "prompts"
    prompts_dir.mkdir()
    (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
    (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
    (prompts_dir / "stage4_classification.txt").write_text("Classification assistant.")

    _create_canonical_tags_file(tmp_path)

    # One segmentation call, two extraction calls (one per topic group),
    # then one classification call.
    call_count = {"n": 0}
    responses = [
        STAGE2_SEGMENTATION_RESPONSE,
        STAGE3_EXTRACTION_RESPONSE,
        STAGE3_EXTRACTION_RESPONSE,
        STAGE4_CLASSIFICATION_RESPONSE,
    ]

    def llm_side_effect(**kwargs):
        idx = min(call_count["n"], len(responses) - 1)
        resp = responses[idx]
        call_count["n"] += 1
        return _make_mock_openai_response(resp)

    patches = _patch_pipeline_engine(sync_engine)
    for p in patches:
        p.start()

    # Capture what stage 4 would have written to Redis.
    stored_cls_data = {}

    def mock_store_classification(vid, data):
        stored_cls_data[vid] = data

    try:
        with _patch_llm_completions(llm_side_effect), \
                patch("pipeline.stages.get_settings") as mock_settings, \
                patch("pipeline.stages._load_canonical_tags") as mock_tags, \
                patch("pipeline.stages._store_classification_data", side_effect=mock_store_classification):
            s = MagicMock()
            s.prompts_path = str(prompts_dir)
            s.llm_api_url = "http://mock:11434/v1"
            s.llm_api_key = "sk-test"
            s.llm_model = "test-model"
            s.llm_fallback_url = "http://mock:11434/v1"
            s.llm_fallback_model = "test-model"
            s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
            mock_settings.return_value = s

            mock_tags.return_value = {
                "categories": [
                    {"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging", "eq"]},
                ]
            }

            from pipeline.stages import stage2_segmentation, stage3_extraction, stage4_classification

            stage2_segmentation(video_id)
            stage3_extraction(video_id)
            stage4_classification(video_id)
    finally:
        # Stop patches even on failure so state cannot leak across tests.
        for p in patches:
            p.stop()

    # Verify classification data was stored
    assert video_id in stored_cls_data
    cls_data = stored_cls_data[video_id]
    assert len(cls_data) >= 1
    assert cls_data[0]["topic_category"] == "Mixing"
    assert "gain staging" in cls_data[0]["topic_tags"]


# ── (d) Stage 5: Synthesis ──────────────────────────────────────────────────


def test_stage5_synthesis_creates_technique_pages(
    db_engine, sync_engine, pre_ingested_video, tmp_path
):
    """Full pipeline stages 2-5 should create TechniquePage rows linked to KeyMoments."""
    video_id = pre_ingested_video["video_id"]

    prompts_dir = tmp_path / "prompts"
    prompts_dir.mkdir()
    (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
    (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
    (prompts_dir / "stage4_classification.txt").write_text("Classification assistant.")
    (prompts_dir / "stage5_synthesis.txt").write_text("Synthesis assistant.")

    call_count = {"n": 0}
    responses = [
        STAGE2_SEGMENTATION_RESPONSE,
        STAGE3_EXTRACTION_RESPONSE,
        STAGE3_EXTRACTION_RESPONSE,
        STAGE4_CLASSIFICATION_RESPONSE,
        STAGE5_SYNTHESIS_RESPONSE,
    ]

    def llm_side_effect(**kwargs):
        idx = min(call_count["n"], len(responses) - 1)
        resp = responses[idx]
        call_count["n"] += 1
        return _make_mock_openai_response(resp)

    patches = _patch_pipeline_engine(sync_engine)
    for p in patches:
        p.start()

    try:
        # Classification data is mocked via _load_classification_data below;
        # its return value is filled in with real moment IDs after stage 4.
        with _patch_llm_completions(llm_side_effect), \
                patch("pipeline.stages.get_settings") as mock_settings, \
                patch("pipeline.stages._load_canonical_tags") as mock_tags, \
                patch("pipeline.stages._store_classification_data"), \
                patch("pipeline.stages._load_classification_data") as mock_load_cls:
            s = MagicMock()
            s.prompts_path = str(prompts_dir)
            s.llm_api_url = "http://mock:11434/v1"
            s.llm_api_key = "sk-test"
            s.llm_model = "test-model"
            s.llm_fallback_url = "http://mock:11434/v1"
            s.llm_fallback_model = "test-model"
            s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
            mock_settings.return_value = s

            mock_tags.return_value = {
                "categories": [
                    {"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging"]},
                ]
            }

            from pipeline.stages import (
                stage2_segmentation,
                stage3_extraction,
                stage4_classification,
                stage5_synthesis,
            )

            stage2_segmentation(video_id)
            stage3_extraction(video_id)
            stage4_classification(video_id)

            # Now set up mock_load_cls to return data with real moment IDs
            factory = sessionmaker(bind=sync_engine)
            sess = factory()
            real_moments = (
                sess.execute(
                    select(KeyMoment).where(KeyMoment.source_video_id == video_id)
                )
                .scalars()
                .all()
            )
            real_cls = [
                {"moment_id": str(m.id), "topic_category": "Mixing", "topic_tags": ["gain staging"]}
                for m in real_moments
            ]
            sess.close()
            mock_load_cls.return_value = real_cls

            stage5_synthesis(video_id)
    finally:
        # Stop patches even on failure so state cannot leak across tests.
        for p in patches:
            p.stop()

    # Verify TechniquePages created
    factory = sessionmaker(bind=sync_engine)
    session = factory()
    try:
        pages = session.execute(select(TechniquePage)).scalars().all()
        assert len(pages) >= 1
        page = pages[0]
        assert page.title == "Gain Staging in Mixing"
        assert page.body_sections is not None
        assert "Overview" in page.body_sections
        assert page.signal_chains is not None
        assert len(page.signal_chains) >= 1
        assert page.summary is not None

        # Verify KeyMoments are linked to the TechniquePage
        moments = (
            session.execute(
                select(KeyMoment).where(KeyMoment.technique_page_id == page.id)
            )
            .scalars()
            .all()
        )
        assert len(moments) >= 1

        # Verify processing_status updated
        video = session.execute(
            select(SourceVideo).where(SourceVideo.id == video_id)
        ).scalar_one()
        assert video.processing_status == ProcessingStatus.complete
    finally:
        session.close()


# ── (e) Stage 6: Embed & Index ──────────────────────────────────────────────


def test_stage6_embeds_and_upserts_to_qdrant(
    db_engine, sync_engine, pre_ingested_video, tmp_path
):
    """Full pipeline through stage 6 should call EmbeddingClient and QdrantManager."""
    video_id = pre_ingested_video["video_id"]

    prompts_dir = tmp_path / "prompts"
    prompts_dir.mkdir()
    (prompts_dir / "stage2_segmentation.txt").write_text("Segment assistant.")
    (prompts_dir / "stage3_extraction.txt").write_text("Extraction assistant.")
    (prompts_dir / "stage4_classification.txt").write_text("Classification assistant.")
    (prompts_dir / "stage5_synthesis.txt").write_text("Synthesis assistant.")

    call_count = {"n": 0}
    responses = [
        STAGE2_SEGMENTATION_RESPONSE,
        STAGE3_EXTRACTION_RESPONSE,
        STAGE3_EXTRACTION_RESPONSE,
        STAGE4_CLASSIFICATION_RESPONSE,
        STAGE5_SYNTHESIS_RESPONSE,
    ]

    def llm_side_effect(**kwargs):
        idx = min(call_count["n"], len(responses) - 1)
        resp = responses[idx]
        call_count["n"] += 1
        return _make_mock_openai_response(resp)

    patches = _patch_pipeline_engine(sync_engine)
    for p in patches:
        p.start()

    # Deterministic embedding vectors, one per input text.
    mock_embed_client = MagicMock()
    mock_embed_client.embed.side_effect = lambda texts: make_mock_embeddings(len(texts))

    mock_qdrant_mgr = MagicMock()

    try:
        with _patch_llm_completions(llm_side_effect), \
                patch("pipeline.stages.get_settings") as mock_settings, \
                patch("pipeline.stages._load_canonical_tags") as mock_tags, \
                patch("pipeline.stages._store_classification_data"), \
                patch("pipeline.stages._load_classification_data") as mock_load_cls, \
                patch("pipeline.stages.EmbeddingClient", return_value=mock_embed_client), \
                patch("pipeline.stages.QdrantManager", return_value=mock_qdrant_mgr):
            s = MagicMock()
            s.prompts_path = str(prompts_dir)
            s.llm_api_url = "http://mock:11434/v1"
            s.llm_api_key = "sk-test"
            s.llm_model = "test-model"
            s.llm_fallback_url = "http://mock:11434/v1"
            s.llm_fallback_model = "test-model"
            s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
            s.embedding_api_url = "http://mock:11434/v1"
            s.embedding_model = "test-embed"
            s.embedding_dimensions = 768
            s.qdrant_url = "http://mock:6333"
            s.qdrant_collection = "test_collection"
            mock_settings.return_value = s

            mock_tags.return_value = {
                "categories": [
                    {"name": "Mixing", "description": "Balancing", "sub_topics": ["gain staging"]},
                ]
            }

            from pipeline.stages import (
                stage2_segmentation,
                stage3_extraction,
                stage4_classification,
                stage5_synthesis,
                stage6_embed_and_index,
            )

            stage2_segmentation(video_id)
            stage3_extraction(video_id)
            stage4_classification(video_id)

            # Load real moment IDs for classification data mock
            factory = sessionmaker(bind=sync_engine)
            sess = factory()
            real_moments = (
                sess.execute(
                    select(KeyMoment).where(KeyMoment.source_video_id == video_id)
                )
                .scalars()
                .all()
            )
            real_cls = [
                {"moment_id": str(m.id), "topic_category": "Mixing", "topic_tags": ["gain staging"]}
                for m in real_moments
            ]
            sess.close()
            mock_load_cls.return_value = real_cls

            stage5_synthesis(video_id)
            stage6_embed_and_index(video_id)
    finally:
        # Stop patches even on failure so state cannot leak across tests.
        for p in patches:
            p.stop()

    # Verify EmbeddingClient.embed was called
    assert mock_embed_client.embed.called
    # Verify QdrantManager methods called
    mock_qdrant_mgr.ensure_collection.assert_called_once()
    assert (
        mock_qdrant_mgr.upsert_technique_pages.called
        or mock_qdrant_mgr.upsert_key_moments.called
    ), "Expected at least one upsert call to QdrantManager"


# ── (f) Resumability ────────────────────────────────────────────────────────


def test_run_pipeline_resumes_from_processing(
    db_engine, sync_engine, pre_ingested_video, tmp_path
):
    """When status=processing, run_pipeline should re-run the full pipeline."""
    video_id = pre_ingested_video["video_id"]

    # Set video status to "processing" directly
    factory = sessionmaker(bind=sync_engine)
    session = factory()
    video = session.execute(
        select(SourceVideo).where(SourceVideo.id == video_id)
    ).scalar_one()
    video.processing_status = ProcessingStatus.processing
    session.commit()
    session.close()

    patches = _patch_pipeline_engine(sync_engine)
    for p in patches:
        p.start()

    try:
        with patch("pipeline.stages.get_settings") as mock_settings, \
                patch("pipeline.stages.stage2_segmentation") as mock_s2, \
                patch("pipeline.stages.stage3_extraction") as mock_s3, \
                patch("pipeline.stages.stage4_classification") as mock_s4, \
                patch("pipeline.stages.stage5_synthesis") as mock_s5, \
                patch("pipeline.stages.stage6_embed_and_index") as mock_s6, \
                patch("pipeline.stages.celery_chain") as mock_chain, \
                patch("pipeline.stages.mark_pipeline_error") as mock_err:
            s = MagicMock()
            s.database_url = TEST_DATABASE_URL_SYNC.replace("psycopg2", "asyncpg")
            mock_settings.return_value = s

            mock_pipeline = MagicMock()
            mock_chain.return_value = mock_pipeline

            # Each stage's .s() returns a sentinel signature string.
            mock_s2.s = MagicMock(return_value="s2_sig")
            mock_s3.s = MagicMock(return_value="s3_sig")
            mock_s4.s = MagicMock(return_value="s4_sig")
            mock_s5.s = MagicMock(return_value="s5_sig")
            mock_s6.s = MagicMock(return_value="s6_sig")
            mock_err.s = MagicMock(return_value="err_sig")

            from pipeline.stages import run_pipeline

            run_pipeline(video_id)

            # All stages should be called — full re-run
            mock_s2.s.assert_called_once_with(video_id)
            mock_s3.s.assert_called_once()
            mock_s4.s.assert_called_once()
            mock_s5.s.assert_called_once()
            mock_s6.s.assert_called_once()
    finally:
        # Stop patches even on failure so state cannot leak across tests.
        for p in patches:
            p.stop()


# ── (g) Pipeline trigger endpoint ───────────────────────────────────────────


async def test_pipeline_trigger_endpoint(client, db_engine):
    """POST /api/v1/pipeline/trigger/{video_id} with valid video returns 200."""
    # Ingest a transcript first to create a video
    sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"

    with patch("routers.ingest.run_pipeline", create=True) as mock_rp:
        mock_rp.delay = MagicMock()
        resp = await client.post(
            "/api/v1/ingest",
            files={"file": (sample.name, sample.read_bytes(), "application/json")},
        )
    assert resp.status_code == 200
    video_id = resp.json()["video_id"]

    # Trigger the pipeline
    with patch("pipeline.stages.run_pipeline") as mock_rp:
        mock_rp.delay = MagicMock()
        resp = await client.post(f"/api/v1/pipeline/trigger/{video_id}")

    assert resp.status_code == 200
    data = resp.json()
    assert data["status"] == "triggered"
    assert data["video_id"] == video_id


# ── (h) Pipeline trigger 404 ────────────────────────────────────────────────


async def test_pipeline_trigger_404_for_missing_video(client):
    """POST /api/v1/pipeline/trigger/{nonexistent} returns 404."""
    # A random UUID is (practically) guaranteed not to exist in the test DB.
    fake_id = str(uuid.uuid4())
    resp = await client.post(f"/api/v1/pipeline/trigger/{fake_id}")
    assert resp.status_code == 404
    assert "not found" in resp.json()["detail"].lower()


# ── (i) Ingest dispatches pipeline ──────────────────────────────────────────


async def test_ingest_dispatches_pipeline(client, db_engine):
    """Ingesting a transcript should call run_pipeline.delay with the video_id."""
    sample = pathlib.Path(__file__).parent / "fixtures" / "sample_transcript.json"

    with patch("pipeline.stages.run_pipeline") as mock_rp:
        mock_rp.delay = MagicMock()
        resp = await client.post(
            "/api/v1/ingest",
            files={"file": (sample.name, sample.read_bytes(), "application/json")},
        )

        assert resp.status_code == 200
        video_id = resp.json()["video_id"]
        mock_rp.delay.assert_called_once_with(video_id)


# ── (j) LLM fallback on primary failure ─────────────────────────────────────


def test_llm_fallback_on_primary_failure():
    """LLMClient should fall back to secondary endpoint when primary raises APIConnectionError."""
    from pipeline.llm_client import LLMClient

    settings = MagicMock()
    settings.llm_api_url = "http://primary:11434/v1"
    settings.llm_api_key = "sk-test"
    settings.llm_fallback_url = "http://fallback:11434/v1"
    settings.llm_fallback_model = "fallback-model"
    settings.llm_model = "primary-model"

    with patch("openai.OpenAI") as MockOpenAI:
        primary_client = MagicMock()
        fallback_client = MagicMock()

        # First call → primary, second call → fallback
        MockOpenAI.side_effect = [primary_client, fallback_client]

        client = LLMClient(settings)

        # Primary raises APIConnectionError
        primary_client.chat.completions.create.side_effect = openai.APIConnectionError(
            request=MagicMock()
        )

        # Fallback succeeds
        fallback_response = _make_mock_openai_response('{"result": "ok"}')
        fallback_client.chat.completions.create.return_value = fallback_response

        result = client.complete("system", "user")

        assert result == '{"result": "ok"}'
        primary_client.chat.completions.create.assert_called_once()
        fallback_client.chat.completions.create.assert_called_once()


# ── Think-tag stripping ─────────────────────────────────────────────────────


def test_strip_think_tags():
    """strip_think_tags should handle all edge cases correctly."""
    from pipeline.llm_client import strip_think_tags

    # Single block with JSON after
    assert strip_think_tags('<think>reasoning here</think>{"a": 1}') == '{"a": 1}'

    # Multiline think block
    assert strip_think_tags(
        '<think>\nI need to analyze this.\nLet me think step by step.\n</think>\n{"result": "ok"}'
    ) == '{"result": "ok"}'

    # Multiple think blocks
    result = strip_think_tags('<think>first</think>hello<think>second</think> world')
    assert result == "hello world"

    # No think tags — passthrough
    assert strip_think_tags('{"clean": true}') == '{"clean": true}'

    # Empty string
    assert strip_think_tags("") == ""

    # Think block with special characters
    assert strip_think_tags(
        '<think>analyzing "complex" <data> & stuff</think>{"done": true}'
    ) == '{"done": true}'

    # Only a think block, no actual content
    assert strip_think_tags("<think>just thinking</think>") == ""