# chrysopedia/backend/tests/test_search.py
# jlightner 195ba6e0a7 test: Added 6 integration tests covering creator-scoped cascade tiers (…
# - "backend/tests/test_search.py"
#
# GSD-Task: S02/T02
# 2026-04-04 05:07:24 +00:00
#
# 1249 lines
# 45 KiB
# Python

"""Integration tests for the /api/v1/search endpoint.
Tests run against a real PostgreSQL test database via httpx.AsyncClient.
SearchService is mocked at the router dependency level so we can test
endpoint behavior without requiring external embedding API or Qdrant.
LightRAG integration tests mock httpx calls at the service-instance level
to exercise _lightrag_search, result mapping, and fallback behavior with
real DB lookups.
"""
from __future__ import annotations
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from models import (
ContentType,
Creator,
KeyMoment,
KeyMomentContentType,
ProcessingStatus,
SourceVideo,
TechniquePage,
)
from search_service import SearchService
SEARCH_URL = "/api/v1/search"
# ── Seed helpers ─────────────────────────────────────────────────────────────
async def _seed_search_data(db_engine) -> dict:
    """Seed 2 creators, 3 technique pages, and 5 key moments for search tests.

    Returns a dict with creator/technique IDs and metadata for assertions.
    IDs are stringified so tests can compare directly against JSON payloads.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        # Creators
        creator1 = Creator(
            name="Mr. Bill",
            slug="mr-bill",
            genres=["Bass music", "Glitch"],
            folder_name="MrBill",
        )
        creator2 = Creator(
            name="KOAN Sound",
            slug="koan-sound",
            genres=["Drum & bass", "Neuro"],
            folder_name="KOANSound",
        )
        session.add_all([creator1, creator2])
        # Flush so autogenerated PKs are available for the FK references below.
        await session.flush()
        # Videos (needed for key moments FK)
        video1 = SourceVideo(
            creator_id=creator1.id,
            filename="bass-design-101.mp4",
            file_path="MrBill/bass-design-101.mp4",
            duration_seconds=600,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.complete,
        )
        video2 = SourceVideo(
            creator_id=creator2.id,
            filename="reese-bass-deep-dive.mp4",
            file_path="KOANSound/reese-bass-deep-dive.mp4",
            duration_seconds=900,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.complete,
        )
        session.add_all([video1, video2])
        await session.flush()
        # Technique pages
        tp1 = TechniquePage(
            creator_id=creator1.id,
            title="Reese Bass Design",
            slug="reese-bass-design",
            topic_category="Sound design",
            topic_tags=["bass", "textures"],
            summary="How to create a classic reese bass",
        )
        tp2 = TechniquePage(
            creator_id=creator2.id,
            title="Granular Pad Textures",
            slug="granular-pad-textures",
            topic_category="Synthesis",
            topic_tags=["granular", "pads"],
            summary="Creating pad textures with granular synthesis",
        )
        tp3 = TechniquePage(
            creator_id=creator1.id,
            title="FM Bass Layering",
            slug="fm-bass-layering",
            topic_category="Synthesis",
            topic_tags=["fm", "bass"],
            summary="FM synthesis techniques for bass layering",
        )
        session.add_all([tp1, tp2, tp3])
        await session.flush()
        # Key moments
        km1 = KeyMoment(
            source_video_id=video1.id,
            technique_page_id=tp1.id,
            title="Setting up the Reese oscillator",
            summary="Initial oscillator setup for reese bass",
            start_time=10.0,
            end_time=60.0,
            content_type=KeyMomentContentType.technique,
        )
        km2 = KeyMoment(
            source_video_id=video1.id,
            technique_page_id=tp1.id,
            title="Adding distortion to the Reese",
            summary="Distortion processing chain for reese bass",
            start_time=60.0,
            end_time=120.0,
            content_type=KeyMomentContentType.technique,
        )
        km3 = KeyMoment(
            source_video_id=video2.id,
            technique_page_id=tp2.id,
            title="Granular engine settings",
            summary="Dialing in granular engine parameters",
            start_time=20.0,
            end_time=80.0,
            content_type=KeyMomentContentType.settings,
        )
        km4 = KeyMoment(
            source_video_id=video1.id,
            technique_page_id=tp3.id,
            title="FM ratio selection",
            summary="Choosing FM ratios for bass tones",
            start_time=5.0,
            end_time=45.0,
            content_type=KeyMomentContentType.technique,
        )
        # km5 intentionally omits technique_page_id — it exercises the
        # "key moment without a parent technique page" code path.
        km5 = KeyMoment(
            source_video_id=video2.id,
            title="Outro and credits",
            summary="End of the video",
            start_time=800.0,
            end_time=900.0,
            content_type=KeyMomentContentType.workflow,
        )
        session.add_all([km1, km2, km3, km4, km5])
        await session.commit()
        # expire_on_commit=False keeps these attributes readable post-commit.
        return {
            "creator1_id": str(creator1.id),
            "creator1_name": creator1.name,
            "creator1_slug": creator1.slug,
            "creator2_id": str(creator2.id),
            "creator2_name": creator2.name,
            "tp1_slug": tp1.slug,
            "tp1_title": tp1.title,
            "tp2_slug": tp2.slug,
            "tp3_slug": tp3.slug,
        }
# ── Tests ────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_search_happy_path_with_mocked_service(client, db_engine):
    """Search endpoint returns mocked results with correct response shape."""
    # Seeding is still required so the endpoint's DB dependencies resolve,
    # but the returned IDs are unused here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    # Canned result the mocked SearchService.search will return.
    mock_result = {
        "items": [
            {
                "type": "technique_page",
                "title": "Reese Bass Design",
                "slug": "reese-bass-design",
                "summary": "How to create a classic reese bass",
                "topic_category": "Sound design",
                "topic_tags": ["bass", "textures"],
                "creator_name": "Mr. Bill",
                "creator_slug": "mr-bill",
                "score": 0.95,
            }
        ],
        "total": 1,
        "query": "reese bass",
        "fallback_used": False,
    }
    with patch("routers.search.SearchService") as MockSvc:
        instance = MockSvc.return_value
        instance.search = AsyncMock(return_value=mock_result)
        resp = await client.get(SEARCH_URL, params={"q": "reese bass"})
    assert resp.status_code == 200
    data = resp.json()
    assert data["query"] == "reese bass"
    assert data["total"] == 1
    assert data["fallback_used"] is False
    assert len(data["items"]) == 1
    item = data["items"][0]
    assert item["title"] == "Reese Bass Design"
    assert item["slug"] == "reese-bass-design"
    assert "score" in item
@pytest.mark.asyncio
async def test_search_empty_query_returns_empty(client, db_engine):
    """An empty query yields an empty, non-fallback result payload."""
    await _seed_search_data(db_engine)
    # The service short-circuits an empty query and returns an empty payload.
    canned = {"items": [], "total": 0, "query": "", "fallback_used": False}
    with patch("routers.search.SearchService") as service_cls:
        service_cls.return_value.search = AsyncMock(return_value=canned)
        response = await client.get(SEARCH_URL, params={"q": ""})
    assert response.status_code == 200
    payload = response.json()
    assert payload["items"] == []
    assert payload["total"] == 0
    assert payload["query"] == ""
    assert payload["fallback_used"] is False
@pytest.mark.asyncio
async def test_search_keyword_fallback(client, db_engine):
    """When embedding fails, search uses keyword fallback and sets fallback_used=true."""
    # Seed so the endpoint's DB dependencies resolve; the returned IDs
    # are not referenced by this test (was an unused local `seed`).
    await _seed_search_data(db_engine)
    mock_result = {
        "items": [
            {
                "type": "technique_page",
                "title": "Reese Bass Design",
                "slug": "reese-bass-design",
                "summary": "How to create a classic reese bass",
                "topic_category": "Sound design",
                "topic_tags": ["bass", "textures"],
                "creator_name": "",
                "creator_slug": "",
                "score": 0.0,
            }
        ],
        "total": 1,
        "query": "reese",
        "fallback_used": True,
    }
    with patch("routers.search.SearchService") as MockSvc:
        instance = MockSvc.return_value
        instance.search = AsyncMock(return_value=mock_result)
        resp = await client.get(SEARCH_URL, params={"q": "reese"})
    assert resp.status_code == 200
    data = resp.json()
    assert data["fallback_used"] is True
    assert data["total"] >= 1
    assert data["items"][0]["title"] == "Reese Bass Design"
@pytest.mark.asyncio
async def test_search_scope_filter(client, db_engine):
    """Search with scope=topics returns only technique_page type results."""
    await _seed_search_data(db_engine)
    mock_result = {
        "items": [
            {
                "type": "technique_page",
                "title": "FM Bass Layering",
                "slug": "fm-bass-layering",
                "summary": "FM synthesis techniques for bass layering",
                "topic_category": "Synthesis",
                "topic_tags": ["fm", "bass"],
                "creator_name": "Mr. Bill",
                "creator_slug": "mr-bill",
                "score": 0.88,
            }
        ],
        "total": 1,
        "query": "bass",
        "fallback_used": False,
    }
    with patch("routers.search.SearchService") as MockSvc:
        instance = MockSvc.return_value
        instance.search = AsyncMock(return_value=mock_result)
        resp = await client.get(SEARCH_URL, params={"q": "bass", "scope": "topics"})
    assert resp.status_code == 200
    data = resp.json()
    # All items should be technique_page type when scope=topics
    for item in data["items"]:
        assert item["type"] == "technique_page"
    # Verify the service received scope=topics, whether passed by keyword or
    # positionally.  (The original check OR-ed `call_args.kwargs` with
    # `call_args[1]`, which are the same object — a redundant comparison that
    # also missed positional scope arguments.)
    call = instance.search.call_args
    assert call.kwargs.get("scope") == "topics" or "topics" in call.args
@pytest.mark.asyncio
async def test_search_no_matching_results(client, db_engine):
    """A query that matches nothing yields an empty items list and total=0."""
    await _seed_search_data(db_engine)
    canned = {
        "items": [],
        "total": 0,
        "query": "zzzznonexistent",
        "fallback_used": True,
    }
    with patch("routers.search.SearchService") as service_cls:
        service_cls.return_value.search = AsyncMock(return_value=canned)
        response = await client.get(SEARCH_URL, params={"q": "zzzznonexistent"})
    assert response.status_code == 200
    payload = response.json()
    assert payload["items"] == []
    assert payload["total"] == 0
# ── SearchService.keyword_search integration tests ──────────────────────────
@pytest.mark.asyncio
async def test_keyword_search_technique_page_has_technique_page_slug(db_engine):
    """Keyword search for technique pages includes technique_page_slug matching its own slug."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        kw_result = await svc.keyword_search("Reese Bass", "topics", 10, session)
        results = kw_result["items"]
        assert len(results) >= 1
        tp_result = next(r for r in results if r["type"] == "technique_page")
        assert tp_result["technique_page_slug"] == "reese-bass-design"
        assert tp_result["slug"] == "reese-bass-design"
        # technique_page_slug == slug for technique pages (they ARE the parent)
        assert tp_result["technique_page_slug"] == tp_result["slug"]
@pytest.mark.asyncio
async def test_keyword_search_key_moment_has_parent_technique_page_slug(db_engine):
    """Keyword search for key moments returns the parent technique page slug."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        kw_result = await svc.keyword_search("Reese", "all", 20, session)
        results = kw_result["items"]
        km_results = [r for r in results if r["type"] == "key_moment"]
        assert len(km_results) >= 1
        for km in km_results:
            assert "technique_page_slug" in km
            # Both Reese-related key moments belong to tp1 (reese-bass-design)
            assert km["technique_page_slug"] == "reese-bass-design"
@pytest.mark.asyncio
async def test_keyword_search_key_moment_without_technique_page(db_engine):
    """Key moments without a technique_page_id get empty technique_page_slug."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "Outro" only matches km5, which has no parent technique page.
        kw_result = await svc.keyword_search("Outro", "all", 20, session)
        results = kw_result["items"]
        km_results = [r for r in results if r["type"] == "key_moment"]
        assert len(km_results) == 1
        assert km_results[0]["technique_page_slug"] == ""
# ── Multi-token AND keyword search tests ─────────────────────────────────────
@pytest.mark.asyncio
async def test_keyword_search_multi_token_and_logic(db_engine):
    """Multi-token query requires all tokens to match across fields."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "Reese Bass" — both tokens appear in tp1 title "Reese Bass Design"
        kw_result = await svc.keyword_search("Reese Bass", "topics", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        assert all(
            "reese" in r["title"].lower() or "bass" in r["title"].lower()
            for r in items
            if r["type"] == "technique_page"
        )
        # "Granular bass" — 'granular' is in tp2, 'bass' is NOT in tp2 title/summary
        # but tp2 summary says "granular synthesis" not "bass" — no AND match expected
        kw_result2 = await svc.keyword_search("Granular bass", "topics", 10, session)
        items2 = kw_result2["items"]
        # Should NOT contain tp2 since "bass" doesn't appear in tp2's fields
        tp2_results = [r for r in items2 if r["slug"] == "granular-pad-textures"]
        assert len(tp2_results) == 0
@pytest.mark.asyncio
async def test_keyword_search_cross_field_token_matching(db_engine):
    """Tokens can match across different fields (e.g., one in title, one in creator name)."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "Bill Reese" — "Bill" matches Creator.name "Mr. Bill", "Reese" matches title
        kw_result = await svc.keyword_search("Bill Reese", "topics", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        # tp1 "Reese Bass Design" by "Mr. Bill" should match
        slugs = [r["slug"] for r in items]
        assert "reese-bass-design" in slugs
@pytest.mark.asyncio
async def test_keyword_search_partial_matches_on_zero_and(db_engine):
    """When AND yields no results, partial_matches returns rows scored by token coverage."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "xyznonexistent Reese" — no row matches both, but "Reese" matches several
        kw_result = await svc.keyword_search("xyznonexistent Reese", "all", 20, session)
        assert kw_result["items"] == []
        assert len(kw_result["partial_matches"]) >= 1
        # Partial matches should have scores between 0 and 1
        for pm in kw_result["partial_matches"]:
            assert 0 < pm["score"] <= 1.0
@pytest.mark.asyncio
async def test_keyword_search_single_token_no_partial(db_engine):
    """Single-token search that fails returns no partial_matches (only multi-token triggers partial)."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        kw_result = await svc.keyword_search("xyznonexistent", "all", 20, session)
        assert kw_result["items"] == []
        assert kw_result["partial_matches"] == []
@pytest.mark.asyncio
async def test_keyword_search_topic_tags_matching(db_engine):
    """Tokens that appear in topic_tags array are matched via array_to_string."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "textures" is a topic_tag on tp1, "Bill" is the creator
        kw_result = await svc.keyword_search("textures Bill", "topics", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        slugs = [r["slug"] for r in items]
        assert "reese-bass-design" in slugs
@pytest.mark.asyncio
async def test_keyword_search_creator_genres_matching(db_engine):
    """Creator search matches against genres array via array_to_string."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "Glitch" is a genre on creator1 "Mr. Bill"
        kw_result = await svc.keyword_search("Bill Glitch", "creators", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        assert any(r["title"] == "Mr. Bill" for r in items)
# ── Suggestions endpoint tests ───────────────────────────────────────────────
SUGGESTIONS_URL = "/api/v1/search/suggestions"
@pytest.mark.asyncio
async def test_suggestions_returns_correct_shape(client, db_engine):
    """Every suggestion entry carries a non-empty text and a known type."""
    await _seed_search_data(db_engine)
    resp = await client.get(SUGGESTIONS_URL)
    assert resp.status_code == 200
    body = resp.json()
    assert "suggestions" in body
    suggestions = body["suggestions"]
    assert isinstance(suggestions, list)
    assert len(suggestions) > 0
    known_types = ("topic", "technique", "creator")
    for entry in suggestions:
        assert "text" in entry
        assert "type" in entry
        assert entry["type"] in known_types
        assert len(entry["text"]) > 0
@pytest.mark.asyncio
async def test_suggestions_includes_all_types(client, db_engine):
    """Suggestions should cover technique, topic, and creator entries."""
    await _seed_search_data(db_engine)
    resp = await client.get(SUGGESTIONS_URL)
    assert resp.status_code == 200
    seen_types = {entry["type"] for entry in resp.json()["suggestions"]}
    for wanted in ("technique", "topic", "creator"):
        assert wanted in seen_types, f"Expected {wanted} suggestions"
@pytest.mark.asyncio
async def test_suggestions_no_duplicates(client, db_engine):
    """Suggestion texts are unique when compared case-insensitively."""
    await _seed_search_data(db_engine)
    resp = await client.get(SUGGESTIONS_URL)
    assert resp.status_code == 200
    lowered = [entry["text"].lower() for entry in resp.json()["suggestions"]]
    assert len(set(lowered)) == len(lowered), "Duplicate suggestions found"
@pytest.mark.asyncio
async def test_suggestions_empty_db(client, db_engine):
    """With nothing seeded, the suggestions list is empty."""
    resp = await client.get(SUGGESTIONS_URL)
    assert resp.status_code == 200
    assert resp.json()["suggestions"] == []
@pytest.mark.asyncio
async def test_suggestions_respects_view_count_ordering(client, db_engine):
    """Higher view_count technique pages should appear first among techniques."""
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        author = Creator(
            name="Test Creator",
            slug="test-creator",
            genres=["Electronic"],
            folder_name="TestCreator",
            view_count=10,
        )
        session.add(author)
        await session.flush()
        rarely_viewed = TechniquePage(
            creator_id=author.id,
            title="Low Views Page",
            slug="low-views-page",
            topic_category="Sound design",
            topic_tags=["bass"],
            view_count=5,
        )
        often_viewed = TechniquePage(
            creator_id=author.id,
            title="High Views Page",
            slug="high-views-page",
            topic_category="Synthesis",
            topic_tags=["pads"],
            view_count=100,
        )
        session.add_all([rarely_viewed, often_viewed])
        await session.commit()
    resp = await client.get(SUGGESTIONS_URL)
    assert resp.status_code == 200
    technique_texts = [
        entry["text"]
        for entry in resp.json()["suggestions"]
        if entry["type"] == "technique"
    ]
    assert len(technique_texts) >= 2
    # The heavily viewed page must be suggested before the rarely viewed one.
    assert technique_texts.index("High Views Page") < technique_texts.index("Low Views Page")
# ── Match context tests ──────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_keyword_search_match_context_creator(db_engine):
    """Match context includes creator name when query matches creator."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        kw_result = await svc.keyword_search("Bill", "topics", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        # At least one result should have match_context mentioning the creator
        contexts = [r["match_context"] for r in items]
        assert any("Creator: Mr. Bill" in c for c in contexts), f"Expected creator context, got: {contexts}"
@pytest.mark.asyncio
async def test_keyword_search_match_context_tag(db_engine):
    """Match context includes tag name when query matches a topic tag."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        kw_result = await svc.keyword_search("granular", "topics", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        contexts = [r["match_context"] for r in items]
        assert any("Tag: granular" in c for c in contexts), f"Expected tag context, got: {contexts}"
@pytest.mark.asyncio
async def test_keyword_search_match_context_multi_token(db_engine):
    """Multi-token match context shows multiple match reasons."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # "Bill bass" — "Bill" matches creator, "bass" matches tag/title
        kw_result = await svc.keyword_search("Bill bass", "topics", 10, session)
        items = kw_result["items"]
        assert len(items) >= 1
        # The match_context should contain both creator and another field
        contexts = [r["match_context"] for r in items]
        assert any("Creator: Mr. Bill" in c for c in contexts)
# ── LightRAG integration tests ──────────────────────────────────────────────
def _make_lightrag_response(seed: dict) -> dict:
"""Build a realistic LightRAG /query/data response body.
Uses seed data to construct file_source paths that match seeded technique
pages (slug + creator_id format).
"""
return {
"data": {
"chunks": [
{
"content": "Layering multiple snare samples for punch and body",
"file_path": f"technique:reese-bass-design:creator:{seed['creator1_id']}",
},
{
"content": "Granular techniques for pad textures",
"file_path": f"technique:granular-pad-textures:creator:{seed['creator2_id']}",
},
],
"entities": [
{"entity_name": "Reese Bass Design"},
{"entity_name": "Granular Pad Textures"},
],
"relationships": [
{"source": "Reese Bass Design", "target": "FM Bass Layering", "relationship": "related_to"},
],
}
}
def _mock_httpx_response(body: dict, status_code: int = 200) -> httpx.Response:
    """Build a canned httpx.Response carrying *body* as its JSON payload."""
    fake_request = httpx.Request("POST", "http://mock/query/data")
    return httpx.Response(status_code=status_code, json=body, request=fake_request)
@pytest.mark.asyncio
async def test_search_lightrag_primary_path(db_engine):
    """LightRAG primary path returns mapped technique pages, fallback_used=False."""
    seed = await _seed_search_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # Serve a canned LightRAG payload through a mocked httpx client.
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(
            return_value=_mock_httpx_response(_make_lightrag_response(seed))
        )
        # Keep the real embedding API out of the picture.
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass design", "all", 10, session)
        assert result["fallback_used"] is False
        assert result["total"] >= 1
        returned_slugs = [entry["slug"] for entry in result["items"]]
        assert "reese-bass-design" in returned_slugs
        # Every item must expose the SearchResponse schema fields.
        required_fields = ("type", "title", "slug", "score", "creator_name", "match_context")
        for entry in result["items"]:
            for field in required_fields:
                assert field in entry
        # LightRAG results should have "LightRAG graph match" context
        graph_hits = [
            e for e in result["items"] if e.get("match_context") == "LightRAG graph match"
        ]
        assert len(graph_hits) >= 1
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_timeout(db_engine):
    """When LightRAG times out, search falls back to keyword engine."""
    # Seed data feeds the keyword fallback; the returned IDs are unused
    # (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # Mock httpx.post to raise TimeoutException
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(side_effect=httpx.TimeoutException("read timed out"))
        # Mock embed_query to avoid hitting real embedding API (Qdrant fallback path)
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass design", "all", 10, session)
        # Should fall back — keyword engine has "Reese Bass Design" from seed data
        assert result["fallback_used"] is True
        assert result["total"] >= 1
        # Results should come from keyword search (seeded data matches "reese bass")
        slugs = [item["slug"] for item in result["items"]]
        assert "reese-bass-design" in slugs
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_connection_error(db_engine):
    """When LightRAG connection fails, search falls back to keyword engine."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # Mock httpx.post to raise ConnectError
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(
            side_effect=httpx.ConnectError("connection refused")
        )
        # Mock embed_query to avoid hitting real embedding API
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass design", "all", 10, session)
        assert result["fallback_used"] is True
        assert result["total"] >= 1
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_empty_response(db_engine):
    """When LightRAG returns empty data, search falls back to keyword engine."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # Mock httpx.post to return empty data
        mock_resp = _mock_httpx_response({"data": {}})
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(return_value=mock_resp)
        # Mock embed_query to avoid hitting real embedding API
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass design", "all", 10, session)
        assert result["fallback_used"] is True
        # Keyword fallback should still find results from seed data
        assert result["total"] >= 1
@pytest.mark.asyncio
async def test_search_lightrag_skipped_for_short_query(db_engine):
    """Queries shorter than lightrag_min_query_length skip LightRAG entirely."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # Mock httpx — should NOT be called for short queries
        mock_httpx = AsyncMock()
        mock_httpx.post = AsyncMock()
        svc._httpx = mock_httpx
        # Mock embed_query to avoid real calls
        svc.embed_query = AsyncMock(return_value=None)
        # "ab" is 2 chars, below the default min_query_length of 3
        result = await svc.search("ab", "all", 10, session)
        # LightRAG should not have been called
        mock_httpx.post.assert_not_called()
        # fallback_used should be True since LightRAG was skipped
        assert result["fallback_used"] is True
@pytest.mark.asyncio
async def test_search_lightrag_result_ordering_preserved(db_engine):
    """LightRAG results maintain retrieval-rank ordering with decreasing scores."""
    seed = await _seed_search_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(
            return_value=_mock_httpx_response(_make_lightrag_response(seed))
        )
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass design", "all", 10, session)
        # Only graph-matched items are subject to the ordering contract.
        graph_hits = [
            e for e in result["items"] if e.get("match_context") == "LightRAG graph match"
        ]
        if len(graph_hits) >= 2:
            scores = [entry["score"] for entry in graph_hits]
            assert scores == sorted(scores, reverse=True), f"Scores not descending: {scores}"
            # The top-ranked LightRAG hit carries the maximum score.
            assert scores[0] == 1.0
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_http_error(db_engine):
    """When LightRAG returns a 500 status, search falls back gracefully."""
    # Seeded IDs are not needed here (was an unused local `seed`).
    await _seed_search_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # Return a 500 response — raise_for_status() will raise
        error_resp = httpx.Response(
            status_code=500,
            text="Internal Server Error",
            request=httpx.Request("POST", "http://mock/query/data"),
        )
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(return_value=error_resp)
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass design", "all", 10, session)
        assert result["fallback_used"] is True
        assert result["total"] >= 1
# ── Creator-scoped cascade integration tests ─────────────────────────────────
async def _seed_cascade_data(db_engine) -> dict:
    """Seed creators and technique pages for cascade tier testing.

    Creator 'keota' has 3 Sound Design pages (≥2 → domain='Sound Design').
    Creator 'virtual-riot' has 1 Synthesis page (< 2 → no dominant domain).
    Returns stringified IDs plus slugs for assertions against JSON payloads.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        keota = Creator(
            name="Keota",
            slug="keota",
            genres=["Bass music"],
            folder_name="Keota",
        )
        vr = Creator(
            name="Virtual Riot",
            slug="virtual-riot",
            genres=["Dubstep"],
            folder_name="VirtualRiot",
        )
        session.add_all([keota, vr])
        # Flush to obtain creator PKs for the technique-page FKs below.
        await session.flush()
        # Three "Sound Design" pages for keota → dominant domain.
        tp1 = TechniquePage(
            creator_id=keota.id,
            title="Reese Bass Fundamentals",
            slug="reese-bass-fundamentals",
            topic_category="Sound Design",
            topic_tags=["bass", "reese"],
            summary="Fundamentals of reese bass",
        )
        tp2 = TechniquePage(
            creator_id=keota.id,
            title="FM Sound Design",
            slug="fm-sound-design",
            topic_category="Sound Design",
            topic_tags=["fm", "design"],
            summary="FM sound design techniques",
        )
        tp3 = TechniquePage(
            creator_id=keota.id,
            title="Granular Textures",
            slug="granular-textures",
            topic_category="Sound Design",
            topic_tags=["granular"],
            summary="Granular texture design",
        )
        # Single "Synthesis" page for virtual-riot → below the domain threshold.
        tp4 = TechniquePage(
            creator_id=vr.id,
            title="Serum Wavetable Tricks",
            slug="serum-wavetable-tricks",
            topic_category="Synthesis",
            topic_tags=["serum", "wavetable"],
            summary="Advanced Serum wavetable tricks",
        )
        session.add_all([tp1, tp2, tp3, tp4])
        await session.commit()
        # expire_on_commit=False keeps these attributes readable post-commit.
        return {
            "keota_id": str(keota.id),
            "keota_name": keota.name,
            "keota_slug": keota.slug,
            "vr_id": str(vr.id),
            "vr_name": vr.name,
            "tp1_slug": tp1.slug,
            "tp2_slug": tp2.slug,
            "tp3_slug": tp3.slug,
            "tp4_slug": tp4.slug,
        }
def _cascade_lightrag_body(chunks: list[dict]) -> dict:
"""Build a LightRAG /query/data response with given chunks."""
return {
"data": {
"chunks": chunks,
"entities": [],
"relationships": [],
}
}
def _chunk(slug: str, creator_id: str, content: str = "chunk content") -> dict:
return {
"content": content,
"file_path": f"technique:{slug}:creator:{creator_id}",
}
@pytest.mark.asyncio
async def test_search_cascade_creator_tier(db_engine):
    """Tier 1: creator-scoped search returns results → cascade_tier='creator'."""
    seed = await _seed_cascade_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        svc = SearchService(settings=Settings())
        # LightRAG answers with a chunk that maps to one of keota's pages.
        payload = _cascade_lightrag_body(
            [_chunk(seed["tp1_slug"], seed["keota_id"], "Reese bass fundamentals")]
        )
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(return_value=_mock_httpx_response(payload))
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("reese bass", "all", 10, session, creator="keota")
        assert result["cascade_tier"] == "creator"
        assert result["fallback_used"] is False
        assert result["total"] >= 1
        # Every surviving cascade item belongs to keota.
        keota_hits = [e for e in result["items"] if e.get("creator_slug") == "keota"]
        assert len(keota_hits) >= 1
@pytest.mark.asyncio
async def test_search_cascade_domain_tier(db_engine):
    """Tier 2: creator-scoped empty → domain-scoped returns results → cascade_tier='domain'."""
    seed = await _seed_cascade_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings
        service = SearchService(settings=Settings())
        # First (creator-scoped) call: only another creator's chunks, which the
        # post-filter discards. Second (domain-scoped) call: a genuine Keota hit.
        first_body = _cascade_lightrag_body(
            [_chunk(seed["tp4_slug"], seed["vr_id"], "VR content not Keota")]
        )
        later_body = _cascade_lightrag_body(
            [_chunk(seed["tp1_slug"], seed["keota_id"], "Reese bass from domain")]
        )
        calls = 0

        async def _respond(*args, **kwargs):
            nonlocal calls
            calls += 1
            body = first_body if calls == 1 else later_body
            return _mock_httpx_response(body)

        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(side_effect=_respond)
        service.embed_query = AsyncMock(return_value=None)
        outcome = await service.search(
            "synthesis techniques", "all", 10, session, creator="keota"
        )
        assert outcome["cascade_tier"] == "domain"
        assert outcome["fallback_used"] is False
        assert outcome["total"] >= 1
@pytest.mark.asyncio
async def test_search_cascade_global_fallback(db_engine):
    """Tier 3: creator + domain empty → global LightRAG returns → cascade_tier='global'."""
    seed = await _seed_cascade_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings
        service = SearchService(settings=Settings())
        # First two calls (creator- and domain-scoped) return no chunks; the
        # third (global _lightrag_search) finally produces a result.
        empty_body = _cascade_lightrag_body([])
        global_body = _cascade_lightrag_body(
            [_chunk(seed["tp4_slug"], seed["vr_id"], "Global result")]
        )
        calls = 0

        async def _respond(*args, **kwargs):
            nonlocal calls
            calls += 1
            body = empty_body if calls <= 2 else global_body
            return _mock_httpx_response(body)

        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(side_effect=_respond)
        service.embed_query = AsyncMock(return_value=None)
        outcome = await service.search("mixing tips", "all", 10, session, creator="keota")
        assert outcome["cascade_tier"] == "global"
        assert outcome["fallback_used"] is False
        assert outcome["total"] >= 1
@pytest.mark.asyncio
async def test_search_cascade_graceful_empty(db_engine):
    """Tier 4: all tiers empty → cascade_tier='none', fallback_used=True.

    Seeding is still required so the 'keota' creator exists (otherwise the
    cascade path would be skipped entirely), but the returned seed metadata
    is not needed by any assertion, so it is deliberately not bound.
    """
    await _seed_cascade_data(db_engine)
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        from config import Settings
        svc = SearchService(settings=Settings())
        # Every LightRAG call — creator, domain, and global — returns no chunks.
        empty_body = _cascade_lightrag_body([])
        svc._httpx = AsyncMock()
        svc._httpx.post = AsyncMock(return_value=_mock_httpx_response(empty_body))
        svc.embed_query = AsyncMock(return_value=None)
        result = await svc.search("nonexistent topic xyz", "all", 10, session, creator="keota")
        assert result["cascade_tier"] == "none"
        assert result["fallback_used"] is True
@pytest.mark.asyncio
async def test_search_cascade_unknown_creator(db_engine):
    """Unknown creator slug → cascade skipped, normal search, cascade_tier=''."""
    seed = await _seed_cascade_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings
        service = SearchService(settings=Settings())
        # A normal (non-cascade) result from LightRAG.
        chunk = _chunk(seed["tp4_slug"], seed["vr_id"], "Normal search result")
        response = _mock_httpx_response(_cascade_lightrag_body([chunk]))
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=response)
        service.embed_query = AsyncMock(return_value=None)
        outcome = await service.search("bass design", "all", 10, session, creator="nonexistent-slug")
        # The slug matches no creator, so the cascade never runs and the
        # ordinary search path leaves cascade_tier empty.
        assert outcome["cascade_tier"] == ""
@pytest.mark.asyncio
async def test_search_no_creator_param_unchanged(db_engine):
    """No creator param → normal search path, cascade_tier='' (empty)."""
    seed = await _seed_cascade_data(db_engine)
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings
        service = SearchService(settings=Settings())
        chunk = _chunk(seed["tp1_slug"], seed["keota_id"], "Normal result")
        response = _mock_httpx_response(_cascade_lightrag_body([chunk]))
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=response)
        service.embed_query = AsyncMock(return_value=None)
        # Omitting the creator param keeps the pre-cascade behavior intact.
        outcome = await service.search("reese bass", "all", 10, session)
        assert outcome["cascade_tier"] == ""
        assert outcome["total"] >= 1