"""Integration tests for the /api/v1/search endpoint.
|
|
|
|
Tests run against a real PostgreSQL test database via httpx.AsyncClient.
|
|
SearchService is mocked at the router dependency level so we can test
|
|
endpoint behavior without requiring external embedding API or Qdrant.
|
|
|
|
LightRAG integration tests mock httpx calls at the service-instance level
|
|
to exercise _lightrag_search, result mapping, and fallback behavior with
|
|
real DB lookups.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import httpx
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
from models import (
|
|
ContentType,
|
|
Creator,
|
|
KeyMoment,
|
|
KeyMomentContentType,
|
|
ProcessingStatus,
|
|
SourceVideo,
|
|
TechniquePage,
|
|
)
|
|
from search_service import SearchService
|
|
|
|
# Endpoint under test for the search cases below.
SEARCH_URL = "/api/v1/search"
|
|
|
|
|
|
# ── Seed helpers ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _seed_search_data(db_engine) -> dict:
    """Seed 2 creators, 3 technique pages, and 5 key moments for search tests.

    Creates the object graph Creator -> SourceVideo / TechniquePage ->
    KeyMoment in one committed session. km5 deliberately has no
    technique_page_id so tests can exercise key moments without a parent page.

    Returns a dict with creator/technique IDs and metadata for assertions.
    """
    session_factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with session_factory() as session:
        # Creators
        creator1 = Creator(
            name="Mr. Bill",
            slug="mr-bill",
            genres=["Bass music", "Glitch"],
            folder_name="MrBill",
        )
        creator2 = Creator(
            name="KOAN Sound",
            slug="koan-sound",
            genres=["Drum & bass", "Neuro"],
            folder_name="KOANSound",
        )
        session.add_all([creator1, creator2])
        # Flush so creator PKs are populated for the FK references below.
        await session.flush()

        # Videos (needed for key moments FK)
        video1 = SourceVideo(
            creator_id=creator1.id,
            filename="bass-design-101.mp4",
            file_path="MrBill/bass-design-101.mp4",
            duration_seconds=600,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.complete,
        )
        video2 = SourceVideo(
            creator_id=creator2.id,
            filename="reese-bass-deep-dive.mp4",
            file_path="KOANSound/reese-bass-deep-dive.mp4",
            duration_seconds=900,
            content_type=ContentType.tutorial,
            processing_status=ProcessingStatus.complete,
        )
        session.add_all([video1, video2])
        await session.flush()

        # Technique pages
        tp1 = TechniquePage(
            creator_id=creator1.id,
            title="Reese Bass Design",
            slug="reese-bass-design",
            topic_category="Sound design",
            topic_tags=["bass", "textures"],
            summary="How to create a classic reese bass",
        )
        tp2 = TechniquePage(
            creator_id=creator2.id,
            title="Granular Pad Textures",
            slug="granular-pad-textures",
            topic_category="Synthesis",
            topic_tags=["granular", "pads"],
            summary="Creating pad textures with granular synthesis",
        )
        tp3 = TechniquePage(
            creator_id=creator1.id,
            title="FM Bass Layering",
            slug="fm-bass-layering",
            topic_category="Synthesis",
            topic_tags=["fm", "bass"],
            summary="FM synthesis techniques for bass layering",
        )
        session.add_all([tp1, tp2, tp3])
        await session.flush()

        # Key moments
        km1 = KeyMoment(
            source_video_id=video1.id,
            technique_page_id=tp1.id,
            title="Setting up the Reese oscillator",
            summary="Initial oscillator setup for reese bass",
            start_time=10.0,
            end_time=60.0,
            content_type=KeyMomentContentType.technique,
        )
        km2 = KeyMoment(
            source_video_id=video1.id,
            technique_page_id=tp1.id,
            title="Adding distortion to the Reese",
            summary="Distortion processing chain for reese bass",
            start_time=60.0,
            end_time=120.0,
            content_type=KeyMomentContentType.technique,
        )
        km3 = KeyMoment(
            source_video_id=video2.id,
            technique_page_id=tp2.id,
            title="Granular engine settings",
            summary="Dialing in granular engine parameters",
            start_time=20.0,
            end_time=80.0,
            content_type=KeyMomentContentType.settings,
        )
        km4 = KeyMoment(
            source_video_id=video1.id,
            technique_page_id=tp3.id,
            title="FM ratio selection",
            summary="Choosing FM ratios for bass tones",
            start_time=5.0,
            end_time=45.0,
            content_type=KeyMomentContentType.technique,
        )
        # km5 intentionally omits technique_page_id — used by the
        # "key moment without technique page" test below.
        km5 = KeyMoment(
            source_video_id=video2.id,
            title="Outro and credits",
            summary="End of the video",
            start_time=800.0,
            end_time=900.0,
            content_type=KeyMomentContentType.workflow,
        )
        session.add_all([km1, km2, km3, km4, km5])
        await session.commit()

        return {
            "creator1_id": str(creator1.id),
            "creator1_name": creator1.name,
            "creator1_slug": creator1.slug,
            "creator2_id": str(creator2.id),
            "creator2_name": creator2.name,
            "tp1_slug": tp1.slug,
            "tp1_title": tp1.title,
            "tp2_slug": tp2.slug,
            "tp3_slug": tp3.slug,
        }
|
|
|
|
|
|
# ── Tests ────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_happy_path_with_mocked_service(client, db_engine):
    """Search endpoint returns mocked results with correct response shape."""
    await _seed_search_data(db_engine)

    # Canned payload the patched SearchService.search hands back.
    canned = {
        "items": [
            {
                "type": "technique_page",
                "title": "Reese Bass Design",
                "slug": "reese-bass-design",
                "summary": "How to create a classic reese bass",
                "topic_category": "Sound design",
                "topic_tags": ["bass", "textures"],
                "creator_name": "Mr. Bill",
                "creator_slug": "mr-bill",
                "score": 0.95,
            }
        ],
        "total": 1,
        "query": "reese bass",
        "fallback_used": False,
    }

    with patch("routers.search.SearchService") as service_cls:
        service_cls.return_value.search = AsyncMock(return_value=canned)
        response = await client.get(SEARCH_URL, params={"q": "reese bass"})

    assert response.status_code == 200
    body = response.json()
    assert body["query"] == "reese bass"
    assert body["total"] == 1
    assert body["fallback_used"] is False
    assert len(body["items"]) == 1

    first = body["items"][0]
    assert first["title"] == "Reese Bass Design"
    assert first["slug"] == "reese-bass-design"
    assert "score" in first
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_empty_query_returns_empty(client, db_engine):
    """Empty search query returns empty results without hitting SearchService."""
    await _seed_search_data(db_engine)

    # An empty query short-circuits to an empty result set.
    canned = {
        "items": [],
        "total": 0,
        "query": "",
        "fallback_used": False,
    }

    with patch("routers.search.SearchService") as service_cls:
        service_cls.return_value.search = AsyncMock(return_value=canned)
        response = await client.get(SEARCH_URL, params={"q": ""})

    assert response.status_code == 200
    body = response.json()
    assert body["items"] == []
    assert body["total"] == 0
    assert body["query"] == ""
    assert body["fallback_used"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_keyword_fallback(client, db_engine):
    """When embedding fails, search uses keyword fallback and sets fallback_used=true."""
    await _seed_search_data(db_engine)

    canned = {
        "items": [
            {
                "type": "technique_page",
                "title": "Reese Bass Design",
                "slug": "reese-bass-design",
                "summary": "How to create a classic reese bass",
                "topic_category": "Sound design",
                "topic_tags": ["bass", "textures"],
                "creator_name": "",
                "creator_slug": "",
                "score": 0.0,
            }
        ],
        "total": 1,
        "query": "reese",
        "fallback_used": True,
    }

    with patch("routers.search.SearchService") as service_cls:
        service_cls.return_value.search = AsyncMock(return_value=canned)
        response = await client.get(SEARCH_URL, params={"q": "reese"})

    assert response.status_code == 200
    body = response.json()
    assert body["fallback_used"] is True
    assert body["total"] >= 1
    assert body["items"][0]["title"] == "Reese Bass Design"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_scope_filter(client, db_engine):
    """Search with scope=topics returns only technique_page type results."""
    await _seed_search_data(db_engine)

    mock_result = {
        "items": [
            {
                "type": "technique_page",
                "title": "FM Bass Layering",
                "slug": "fm-bass-layering",
                "summary": "FM synthesis techniques for bass layering",
                "topic_category": "Synthesis",
                "topic_tags": ["fm", "bass"],
                "creator_name": "Mr. Bill",
                "creator_slug": "mr-bill",
                "score": 0.88,
            }
        ],
        "total": 1,
        "query": "bass",
        "fallback_used": False,
    }

    with patch("routers.search.SearchService") as MockSvc:
        instance = MockSvc.return_value
        instance.search = AsyncMock(return_value=mock_result)

        resp = await client.get(SEARCH_URL, params={"q": "bass", "scope": "topics"})

    assert resp.status_code == 200
    data = resp.json()
    # All items should be technique_page type when scope=topics
    for item in data["items"]:
        assert item["type"] == "technique_page"

    # Verify the service was called with scope=topics.
    # NOTE: the original assertion OR-ed `call_args.kwargs.get("scope")` with
    # `call_args[1].get("scope")`, but `call_args[1]` IS `call_args.kwargs`,
    # so the second clause was dead code — a single check is equivalent.
    assert instance.search.call_args.kwargs.get("scope") == "topics"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_no_matching_results(client, db_engine):
    """Search with no matching results returns empty items list."""
    await _seed_search_data(db_engine)

    canned = {
        "items": [],
        "total": 0,
        "query": "zzzznonexistent",
        "fallback_used": True,
    }

    with patch("routers.search.SearchService") as service_cls:
        service_cls.return_value.search = AsyncMock(return_value=canned)
        response = await client.get(SEARCH_URL, params={"q": "zzzznonexistent"})

    assert response.status_code == 200
    body = response.json()
    assert body["items"] == []
    assert body["total"] == 0
|
|
|
|
|
|
# ── SearchService.keyword_search integration tests ──────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_technique_page_has_technique_page_slug(db_engine):
    """Keyword search for technique pages includes technique_page_slug matching its own slug."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())
        payload = await service.keyword_search("Reese Bass", "topics", 10, session)
        hits = payload["items"]

        assert len(hits) >= 1
        page_hit = next(h for h in hits if h["type"] == "technique_page")
        assert page_hit["technique_page_slug"] == "reese-bass-design"
        assert page_hit["slug"] == "reese-bass-design"
        # A technique page is its own parent, so both slugs coincide.
        assert page_hit["technique_page_slug"] == page_hit["slug"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_key_moment_has_parent_technique_page_slug(db_engine):
    """Keyword search for key moments returns the parent technique page slug."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())
        payload = await service.keyword_search("Reese", "all", 20, session)

        moment_hits = [h for h in payload["items"] if h["type"] == "key_moment"]
        assert len(moment_hits) >= 1
        for moment in moment_hits:
            assert "technique_page_slug" in moment
            # Both Reese-related key moments hang off tp1 (reese-bass-design).
            assert moment["technique_page_slug"] == "reese-bass-design"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_key_moment_without_technique_page(db_engine):
    """Key moments without a technique_page_id get empty technique_page_slug."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())
        payload = await service.keyword_search("Outro", "all", 20, session)

        moment_hits = [h for h in payload["items"] if h["type"] == "key_moment"]
        assert len(moment_hits) == 1
        # km5 has no parent page, so the slug is blank rather than missing.
        assert moment_hits[0]["technique_page_slug"] == ""
|
|
|
|
|
|
# ── Multi-token AND keyword search tests ─────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_multi_token_and_logic(db_engine):
    """Multi-token query requires all tokens to match across fields."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # "Reese Bass" — both tokens appear in tp1 title "Reese Bass Design".
        both_match = await service.keyword_search("Reese Bass", "topics", 10, session)
        hits = both_match["items"]
        assert len(hits) >= 1
        assert all(
            "reese" in h["title"].lower() or "bass" in h["title"].lower()
            for h in hits
            if h["type"] == "technique_page"
        )

        # "Granular bass" — 'granular' matches tp2, but 'bass' appears nowhere
        # in tp2's fields, so AND semantics must exclude it.
        one_match = await service.keyword_search("Granular bass", "topics", 10, session)
        excluded = [h for h in one_match["items"] if h["slug"] == "granular-pad-textures"]
        assert len(excluded) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_cross_field_token_matching(db_engine):
    """Tokens can match across different fields (e.g., one in title, one in creator name)."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # "Bill Reese" — "Bill" hits Creator.name "Mr. Bill", "Reese" the title.
        payload = await service.keyword_search("Bill Reese", "topics", 10, session)
        hits = payload["items"]
        assert len(hits) >= 1
        # tp1 "Reese Bass Design" by "Mr. Bill" should match.
        assert "reese-bass-design" in [h["slug"] for h in hits]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_partial_matches_on_zero_and(db_engine):
    """When AND yields no results, partial_matches returns rows scored by token coverage."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # "xyznonexistent Reese" — no row matches both, but "Reese" matches several.
        payload = await service.keyword_search("xyznonexistent Reese", "all", 20, session)
        assert payload["items"] == []
        assert len(payload["partial_matches"]) >= 1
        # Partial matches are scored by token coverage, in (0, 1].
        for partial in payload["partial_matches"]:
            assert 0 < partial["score"] <= 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_single_token_no_partial(db_engine):
    """Single-token search that fails returns no partial_matches (only multi-token triggers partial)."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())
        payload = await service.keyword_search("xyznonexistent", "all", 20, session)

        assert payload["items"] == []
        assert payload["partial_matches"] == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_topic_tags_matching(db_engine):
    """Tokens that appear in topic_tags array are matched via array_to_string."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # "textures" is a topic_tag on tp1, "Bill" is the creator.
        payload = await service.keyword_search("textures Bill", "topics", 10, session)
        hits = payload["items"]
        assert len(hits) >= 1
        assert "reese-bass-design" in [h["slug"] for h in hits]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_creator_genres_matching(db_engine):
    """Creator search matches against genres array via array_to_string."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # "Glitch" is a genre on creator1 "Mr. Bill".
        payload = await service.keyword_search("Bill Glitch", "creators", 10, session)
        hits = payload["items"]
        assert len(hits) >= 1
        assert any(h["title"] == "Mr. Bill" for h in hits)
|
|
|
|
|
|
# ── Suggestions endpoint tests ───────────────────────────────────────────────
|
|
|
|
# Endpoint under test for the suggestions cases below.
SUGGESTIONS_URL = "/api/v1/search/suggestions"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_suggestions_returns_correct_shape(client, db_engine):
    """Suggestions endpoint returns items with text and type fields."""
    await _seed_search_data(db_engine)

    response = await client.get(SUGGESTIONS_URL)
    assert response.status_code == 200

    body = response.json()
    assert "suggestions" in body
    suggestions = body["suggestions"]
    assert isinstance(suggestions, list)
    assert len(suggestions) > 0

    for entry in suggestions:
        assert "text" in entry
        assert "type" in entry
        assert entry["type"] in ("topic", "technique", "creator")
        assert len(entry["text"]) > 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_suggestions_includes_all_types(client, db_engine):
    """Suggestions should include technique, topic, and creator types."""
    await _seed_search_data(db_engine)

    response = await client.get(SUGGESTIONS_URL)
    assert response.status_code == 200

    seen_types = {entry["type"] for entry in response.json()["suggestions"]}
    assert "technique" in seen_types, "Expected technique suggestions"
    assert "topic" in seen_types, "Expected topic suggestions"
    assert "creator" in seen_types, "Expected creator suggestions"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_suggestions_no_duplicates(client, db_engine):
    """Suggestions should not contain duplicate texts (case-insensitive)."""
    await _seed_search_data(db_engine)

    response = await client.get(SUGGESTIONS_URL)
    assert response.status_code == 200

    lowered = [entry["text"].lower() for entry in response.json()["suggestions"]]
    assert len(lowered) == len(set(lowered)), "Duplicate suggestions found"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_suggestions_empty_db(client, db_engine):
    """Suggestions endpoint returns empty list on empty database."""
    # No seeding here on purpose — the database is empty.
    response = await client.get(SUGGESTIONS_URL)
    assert response.status_code == 200
    assert response.json()["suggestions"] == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_suggestions_respects_view_count_ordering(client, db_engine):
    """Higher view_count technique pages should appear first among techniques."""
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        author = Creator(
            name="Test Creator",
            slug="test-creator",
            genres=["Electronic"],
            folder_name="TestCreator",
            view_count=10,
        )
        session.add(author)
        await session.flush()

        rarely_viewed = TechniquePage(
            creator_id=author.id,
            title="Low Views Page",
            slug="low-views-page",
            topic_category="Sound design",
            topic_tags=["bass"],
            view_count=5,
        )
        often_viewed = TechniquePage(
            creator_id=author.id,
            title="High Views Page",
            slug="high-views-page",
            topic_category="Synthesis",
            topic_tags=["pads"],
            view_count=100,
        )
        session.add_all([rarely_viewed, often_viewed])
        await session.commit()

    response = await client.get(SUGGESTIONS_URL)
    assert response.status_code == 200

    technique_texts = [
        entry["text"]
        for entry in response.json()["suggestions"]
        if entry["type"] == "technique"
    ]
    assert len(technique_texts) >= 2
    # The heavily-viewed page must rank ahead of the rarely-viewed one.
    assert technique_texts.index("High Views Page") < technique_texts.index("Low Views Page")
|
|
|
|
|
|
# ── Match context tests ──────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_match_context_creator(db_engine):
    """Match context includes creator name when query matches creator."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())
        payload = await service.keyword_search("Bill", "topics", 10, session)
        hits = payload["items"]
        assert len(hits) >= 1

        # At least one result should name the creator in its match context.
        contexts = [h["match_context"] for h in hits]
        assert any("Creator: Mr. Bill" in c for c in contexts), f"Expected creator context, got: {contexts}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_match_context_tag(db_engine):
    """Match context includes tag name when query matches a topic tag."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())
        payload = await service.keyword_search("granular", "topics", 10, session)
        hits = payload["items"]
        assert len(hits) >= 1

        contexts = [h["match_context"] for h in hits]
        assert any("Tag: granular" in c for c in contexts), f"Expected tag context, got: {contexts}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyword_search_match_context_multi_token(db_engine):
    """Multi-token match context shows multiple match reasons."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # "Bill bass" — "Bill" matches the creator, "bass" a tag/title.
        payload = await service.keyword_search("Bill bass", "topics", 10, session)
        hits = payload["items"]
        assert len(hits) >= 1

        # The match_context should mention the creator among its reasons.
        contexts = [h["match_context"] for h in hits]
        assert any("Creator: Mr. Bill" in c for c in contexts)
|
|
|
|
|
|
# ── LightRAG integration tests ──────────────────────────────────────────────
|
|
|
|
|
|
def _make_lightrag_response(seed: dict) -> dict:
|
|
"""Build a realistic LightRAG /query/data response body.
|
|
|
|
Uses seed data to construct file_source paths that match seeded technique
|
|
pages (slug + creator_id format).
|
|
"""
|
|
return {
|
|
"data": {
|
|
"chunks": [
|
|
{
|
|
"content": "Layering multiple snare samples for punch and body",
|
|
"file_path": f"technique:reese-bass-design:creator:{seed['creator1_id']}",
|
|
},
|
|
{
|
|
"content": "Granular techniques for pad textures",
|
|
"file_path": f"technique:granular-pad-textures:creator:{seed['creator2_id']}",
|
|
},
|
|
],
|
|
"entities": [
|
|
{"entity_name": "Reese Bass Design"},
|
|
{"entity_name": "Granular Pad Textures"},
|
|
],
|
|
"relationships": [
|
|
{"source": "Reese Bass Design", "target": "FM Bass Layering", "relationship": "related_to"},
|
|
],
|
|
}
|
|
}
|
|
|
|
|
|
def _mock_httpx_response(body: dict, status_code: int = 200) -> httpx.Response:
    """Build a mock httpx.Response with JSON body."""
    fake_request = httpx.Request("POST", "http://mock/query/data")
    return httpx.Response(
        status_code=status_code,
        json=body,
        request=fake_request,
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_primary_path(db_engine):
    """LightRAG primary path returns mapped technique pages, fallback_used=False."""
    seed = await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # Stub the service's httpx client so no network call happens.
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(
            return_value=_mock_httpx_response(_make_lightrag_response(seed))
        )
        # Keep the embedding path inert as well.
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass design", "all", 10, session)

        assert result["fallback_used"] is False
        assert result["total"] >= 1
        assert "reese-bass-design" in [hit["slug"] for hit in result["items"]]

        # Every item carries the SearchResponse schema fields.
        for hit in result["items"]:
            for field in ("type", "title", "slug", "score", "creator_name", "match_context"):
                assert field in hit

        graph_hits = [
            h for h in result["items"] if h.get("match_context") == "LightRAG graph match"
        ]
        assert len(graph_hits) >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_timeout(db_engine):
    """When LightRAG times out, search falls back to keyword engine."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # LightRAG call blows up with a timeout.
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(side_effect=httpx.TimeoutException("read timed out"))
        # Embedding path stays inert (Qdrant fallback path).
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass design", "all", 10, session)

        # Keyword fallback picks up "Reese Bass Design" from the seed data.
        assert result["fallback_used"] is True
        assert result["total"] >= 1
        assert "reese-bass-design" in [hit["slug"] for hit in result["items"]]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_connection_error(db_engine):
    """When LightRAG connection fails, search falls back to keyword engine."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # LightRAG call fails at the transport level.
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(
            side_effect=httpx.ConnectError("connection refused")
        )
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass design", "all", 10, session)

        assert result["fallback_used"] is True
        assert result["total"] >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_empty_response(db_engine):
    """When LightRAG returns empty data, search falls back to keyword engine."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # LightRAG answers, but with nothing usable in it.
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=_mock_httpx_response({"data": {}}))
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass design", "all", 10, session)

        assert result["fallback_used"] is True
        # Keyword fallback should still find results from seed data.
        assert result["total"] >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_skipped_for_short_query(db_engine):
    """Queries shorter than lightrag_min_query_length skip LightRAG entirely."""
    await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # Stubbed client — must never be hit for short queries.
        stub_client = AsyncMock()
        stub_client.post = AsyncMock()
        service._httpx = stub_client
        service.embed_query = AsyncMock(return_value=None)

        # "ab" is 2 chars, below the default min_query_length of 3
        result = await service.search("ab", "all", 10, session)

        # LightRAG should not have been called.
        stub_client.post.assert_not_called()
        # fallback_used should be True since LightRAG was skipped.
        assert result["fallback_used"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_result_ordering_preserved(db_engine):
    """LightRAG results maintain retrieval-rank ordering with decreasing scores."""
    seed = await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(
            return_value=_mock_httpx_response(_make_lightrag_response(seed))
        )
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass design", "all", 10, session)

        # LightRAG items should have scores in descending order.
        graph_hits = [
            h for h in result["items"] if h.get("match_context") == "LightRAG graph match"
        ]
        if len(graph_hits) >= 2:
            scores = [hit["score"] for hit in graph_hits]
            assert scores == sorted(scores, reverse=True), f"Scores not descending: {scores}"
            # Rank-1 result gets the top score of 1.0.
            assert scores[0] == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_lightrag_fallback_on_http_error(db_engine):
    """A 500 from LightRAG must not bubble up — search degrades to fallback."""
    seed = await _seed_search_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # A 500 response: raise_for_status() inside the service will raise.
        failing = httpx.Response(
            status_code=500,
            text="Internal Server Error",
            request=httpx.Request("POST", "http://mock/query/data"),
        )
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=failing)
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass design", "all", 10, session)

        assert result["fallback_used"] is True
        assert result["total"] >= 1
|
|
|
|
|
|
# ── Creator-scoped cascade integration tests ─────────────────────────────────
|
|
|
|
|
|
async def _seed_cascade_data(db_engine) -> dict:
    """Populate creators and technique pages for the cascade-tier tests.

    'keota' gets 3 Sound Design pages (≥2 → dominant domain 'Sound Design');
    'virtual-riot' gets a single Synthesis page (< 2 → no dominant domain).
    Returns IDs/slugs needed by the test assertions.
    """
    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        keota = Creator(
            name="Keota",
            slug="keota",
            genres=["Bass music"],
            folder_name="Keota",
        )
        vr = Creator(
            name="Virtual Riot",
            slug="virtual-riot",
            genres=["Dubstep"],
            folder_name="VirtualRiot",
        )
        session.add_all([keota, vr])
        await session.flush()  # assign creator IDs before referencing them

        # (creator, title, slug, topic_category, topic_tags, summary)
        page_specs = [
            (keota, "Reese Bass Fundamentals", "reese-bass-fundamentals",
             "Sound Design", ["bass", "reese"], "Fundamentals of reese bass"),
            (keota, "FM Sound Design", "fm-sound-design",
             "Sound Design", ["fm", "design"], "FM sound design techniques"),
            (keota, "Granular Textures", "granular-textures",
             "Sound Design", ["granular"], "Granular texture design"),
            (vr, "Serum Wavetable Tricks", "serum-wavetable-tricks",
             "Synthesis", ["serum", "wavetable"], "Advanced Serum wavetable tricks"),
        ]
        pages = [
            TechniquePage(
                creator_id=owner.id,
                title=title,
                slug=slug,
                topic_category=category,
                topic_tags=tags,
                summary=summary,
            )
            for owner, title, slug, category, tags, summary in page_specs
        ]
        session.add_all(pages)
        await session.commit()

        tp1, tp2, tp3, tp4 = pages
        return {
            "keota_id": str(keota.id),
            "keota_name": keota.name,
            "keota_slug": keota.slug,
            "vr_id": str(vr.id),
            "vr_name": vr.name,
            "tp1_slug": tp1.slug,
            "tp2_slug": tp2.slug,
            "tp3_slug": tp3.slug,
            "tp4_slug": tp4.slug,
        }
|
|
|
|
|
|
def _cascade_lightrag_body(chunks: list[dict]) -> dict:
|
|
"""Build a LightRAG /query/data response with given chunks."""
|
|
return {
|
|
"data": {
|
|
"chunks": chunks,
|
|
"entities": [],
|
|
"relationships": [],
|
|
}
|
|
}
|
|
|
|
|
|
def _chunk(slug: str, creator_id: str, content: str = "chunk content") -> dict:
|
|
return {
|
|
"content": content,
|
|
"file_path": f"technique:{slug}:creator:{creator_id}",
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_cascade_creator_tier(db_engine):
    """Tier 1: hits scoped to the creator → cascade_tier reports 'creator'."""
    seed = await _seed_cascade_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # LightRAG responds with a chunk that maps to one of keota's pages.
        payload = _cascade_lightrag_body([
            _chunk(seed["tp1_slug"], seed["keota_id"], "Reese bass fundamentals"),
        ])
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=_mock_httpx_response(payload))
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("reese bass", "all", 10, session, creator="keota")

        assert result["cascade_tier"] == "creator"
        assert result["fallback_used"] is False
        assert result["total"] >= 1
        # Every cascade hit must belong to keota.
        keota_hits = [i for i in result["items"] if i.get("creator_slug") == "keota"]
        assert len(keota_hits) >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_cascade_domain_tier(db_engine):
    """Tier 2: empty creator tier, populated domain tier → cascade_tier='domain'."""
    seed = await _seed_cascade_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # First (creator-scoped) call yields only another creator's chunk,
        # which the post-filter discards; the second (domain-scoped, "Sound
        # Design") call yields a chunk that maps to Keota.
        creator_body = _cascade_lightrag_body([
            _chunk(seed["tp4_slug"], seed["vr_id"], "VR content not Keota"),
        ])
        domain_body = _cascade_lightrag_body([
            _chunk(seed["tp1_slug"], seed["keota_id"], "Reese bass from domain"),
        ])

        seen: list[int] = []

        async def _serve(*_args, **_kwargs):
            seen.append(1)
            body = creator_body if len(seen) == 1 else domain_body
            return _mock_httpx_response(body)

        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(side_effect=_serve)
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("synthesis techniques", "all", 10, session, creator="keota")

        assert result["cascade_tier"] == "domain"
        assert result["fallback_used"] is False
        assert result["total"] >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_cascade_global_fallback(db_engine):
    """Tier 3: creator and domain tiers empty → global hit → cascade_tier='global'."""
    seed = await _seed_cascade_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # The first two calls (creator- and domain-scoped) return nothing;
        # the third (global _lightrag_search) finally produces a result.
        empty_body = _cascade_lightrag_body([])
        global_body = _cascade_lightrag_body([
            _chunk(seed["tp4_slug"], seed["vr_id"], "Global result"),
        ])

        seen: list[int] = []

        async def _serve(*_args, **_kwargs):
            seen.append(1)
            body = empty_body if len(seen) <= 2 else global_body
            return _mock_httpx_response(body)

        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(side_effect=_serve)
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("mixing tips", "all", 10, session, creator="keota")

        assert result["cascade_tier"] == "global"
        assert result["fallback_used"] is False
        assert result["total"] >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_cascade_graceful_empty(db_engine):
    """Tier 4: nothing at any tier → cascade_tier='none' and fallback flagged."""
    await _seed_cascade_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # Every LightRAG call comes back with zero chunks.
        empty_response = _mock_httpx_response(_cascade_lightrag_body([]))
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=empty_response)
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("nonexistent topic xyz", "all", 10, session, creator="keota")

        assert result["cascade_tier"] == "none"
        assert result["fallback_used"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_cascade_unknown_creator(db_engine):
    """An unknown creator slug skips the cascade → plain search, cascade_tier=''."""
    seed = await _seed_cascade_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        # LightRAG still answers normally on the non-cascade path.
        payload = _cascade_lightrag_body([
            _chunk(seed["tp4_slug"], seed["vr_id"], "Normal search result"),
        ])
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=_mock_httpx_response(payload))
        service.embed_query = AsyncMock(return_value=None)

        result = await service.search("bass design", "all", 10, session, creator="nonexistent-slug")

        # Cascade never engaged — falls through to the normal search path.
        assert result["cascade_tier"] == ""
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_search_no_creator_param_unchanged(db_engine):
    """Without a creator param the normal path runs and cascade_tier stays ''."""
    seed = await _seed_cascade_data(db_engine)

    factory = async_sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )
    async with factory() as session:
        from config import Settings

        service = SearchService(settings=Settings())

        payload = _cascade_lightrag_body([
            _chunk(seed["tp1_slug"], seed["keota_id"], "Normal result"),
        ])
        service._httpx = AsyncMock()
        service._httpx.post = AsyncMock(return_value=_mock_httpx_response(payload))
        service.embed_query = AsyncMock(return_value=None)

        # Deliberately no creator kwarg.
        result = await service.search("reese bass", "all", 10, session)

        assert result["cascade_tier"] == ""
        assert result["total"] >= 1
|