Completed slices:
- S01: Desire Embedding & Clustering
- S02: Fulfillment Flow & Frontend
Branch: milestone/M001
250 lines
9.4 KiB
Python
250 lines
9.4 KiB
Python
"""Pipeline integration tests — embed → cluster → heat.
|
|
|
|
Proves the full desire processing pipeline works end-to-end by:
1. Verifying similar texts produce embeddings with cosine similarity above
   the clustering threshold (0.82)
2. Verifying dissimilar texts stay below the clustering threshold
3. Validating heat calculation logic for clustered desires
4. Checking that the router and worker are wired correctly (static assertions)
|
|
"""
|
|
|
|
import uuid
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from app.services.embedding import embed_text
|
|
|
|
|
|
def cosine_sim(a: list[float], b: list[float]) -> float:
    """Return the dot product of ``a`` and ``b`` as a plain Python float.

    For L2-normalized vectors the dot product equals the cosine similarity.
    No normalization is performed here, so callers must pass vectors that
    are already unit length for the result to be a true cosine similarity.
    """
    dot_product = np.dot(a, b)
    return float(dot_product)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Embedding pipeline: similar texts cluster, dissimilar texts don't
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSimilarDesiresClustering:
    """Verify that similar desire texts produce clusterable embeddings.

    Each test embeds fixed prompts with ``embed_text`` and compares them with
    ``cosine_sim``. Texts count as clusterable when their similarity is
    strictly above ``CLUSTER_THRESHOLD``.
    """

    # Similarity cut-off asserted by these tests.
    # NOTE(review): presumably mirrors the threshold used by the clustering
    # service; keep the two in sync (confirm where the service defines it).
    CLUSTER_THRESHOLD = 0.82

    # Word-order permutations of one prompt; expected to embed close together.
    SIMILAR_TEXTS = [
        "ragdoll physics dark moody slow motion",
        "dark physics ragdoll slow motion moody",
        "slow motion ragdoll dark physics moody",
    ]

    # Unrelated prompt that must stay below the threshold against every
    # entry in SIMILAR_TEXTS.
    DISSIMILAR_TEXT = "bright colorful kaleidoscope flowers rainbow"

    def test_similar_desires_produce_clusterable_embeddings(self) -> None:
        """All pairwise cosine similarities among similar texts exceed the threshold."""
        from itertools import combinations

        embeddings = [embed_text(t) for t in self.SIMILAR_TEXTS]

        # combinations() yields every unordered pair (i < j) exactly once, in
        # the same order as the equivalent nested index loops.
        for (i, emb_i), (j, emb_j) in combinations(enumerate(embeddings), 2):
            sim = cosine_sim(emb_i, emb_j)
            assert sim > self.CLUSTER_THRESHOLD, (
                f"Texts [{i}] and [{j}] should cluster "
                f"(sim > {self.CLUSTER_THRESHOLD}), "
                f"got {sim:.4f}:\n"
                f" [{i}] '{self.SIMILAR_TEXTS[i]}'\n"
                f" [{j}] '{self.SIMILAR_TEXTS[j]}'"
            )

    def test_dissimilar_desire_does_not_cluster(self) -> None:
        """The dissimilar text stays below the threshold against every similar text."""
        dissimilar = embed_text(self.DISSIMILAR_TEXT)
        similar_embeddings = [embed_text(t) for t in self.SIMILAR_TEXTS]

        for i, emb in enumerate(similar_embeddings):
            sim = cosine_sim(dissimilar, emb)
            assert sim < self.CLUSTER_THRESHOLD, (
                f"Dissimilar text should NOT cluster with text [{i}] "
                f"(sim < {self.CLUSTER_THRESHOLD}), got {sim:.4f}:\n"
                f" dissimilar: '{self.DISSIMILAR_TEXT}'\n"
                f" similar[{i}]: '{self.SIMILAR_TEXTS[i]}'"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Heat calculation logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPipelineHeatCalculation:
    """Check that the heat score returned for a cluster equals its member count."""

    @staticmethod
    def _session_reporting(member_count: int) -> MagicMock:
        """Build a mock session scripted for exactly two ``execute`` calls.

        The first call returns a result whose ``scalar_one()`` yields
        ``member_count`` (standing in for the COUNT(*) query); the second
        returns a bare mock (standing in for the UPDATE, whose return value
        is not meaningful).
        """
        count_query = MagicMock()
        count_query.scalar_one.return_value = member_count
        update_query = MagicMock()

        db = MagicMock()
        db.execute = MagicMock(side_effect=[count_query, update_query])
        return db

    def test_pipeline_heat_calculation_logic(self) -> None:
        """A cluster of 3 desires should produce heat_score = 3.0.

        Exercises recalculate_heat_sync against a mocked session that
        simulates its DB interaction pattern: one count query followed by
        one update, with at least one flush.
        """
        from app.services.clustering import recalculate_heat_sync

        db = self._session_reporting(3)

        result = recalculate_heat_sync(uuid.uuid4(), db)

        assert result == 3.0
        assert db.execute.call_count == 2
        assert db.flush.call_count >= 1

    def test_single_member_cluster_has_heat_1(self) -> None:
        """A new single-member cluster should have heat_score = 1.0."""
        from app.services.clustering import recalculate_heat_sync

        db = self._session_reporting(1)

        result = recalculate_heat_sync(uuid.uuid4(), db)

        assert result == 1.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sync clustering orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSyncClusteringOrchestrator:
    """Exercise cluster_desire_sync with its collaborating functions patched out."""

    def test_new_desire_creates_cluster(self) -> None:
        """With no nearby cluster, a new cluster is created with heat 1.0."""
        from unittest.mock import patch
        from app.services.clustering import cluster_desire_sync

        new_desire = uuid.uuid4()
        vector = embed_text("ragdoll physics dark moody slow")
        created_id = uuid.uuid4()
        db = MagicMock()

        with patch("app.services.clustering.find_nearest_cluster_sync") as find_stub:
            with patch("app.services.clustering.create_cluster_sync") as create_stub:
                # (None, 0.0) is the "nothing close enough" answer.
                find_stub.return_value = (None, 0.0)
                create_stub.return_value = created_id

                outcome = cluster_desire_sync(new_desire, vector, db)

        assert outcome["is_new"] is True
        assert outcome["cluster_id"] == created_id
        assert outcome["heat_score"] == 1.0
        find_stub.assert_called_once_with(vector, db)
        create_stub.assert_called_once_with(new_desire, db)

    def test_similar_desire_joins_existing_cluster(self) -> None:
        """With a nearby cluster, the desire joins it and heat is recalculated."""
        from unittest.mock import patch
        from app.services.clustering import cluster_desire_sync

        new_desire = uuid.uuid4()
        vector = embed_text("ragdoll physics dark moody slow")
        matched_id = uuid.uuid4()
        db = MagicMock()

        with patch("app.services.clustering.find_nearest_cluster_sync") as find_stub:
            with patch("app.services.clustering.add_to_cluster_sync") as add_stub:
                with patch("app.services.clustering.recalculate_heat_sync") as recalc_stub:
                    # A match with similarity 0.91 and a stubbed heat of 3.0.
                    find_stub.return_value = (matched_id, 0.91)
                    recalc_stub.return_value = 3.0

                    outcome = cluster_desire_sync(new_desire, vector, db)

        assert outcome["is_new"] is False
        assert outcome["cluster_id"] == matched_id
        assert outcome["heat_score"] == 3.0
        add_stub.assert_called_once_with(matched_id, new_desire, 0.91, db)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Wiring checks: router + worker are connected
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestWiring:
    """Static assertions that the router and worker are properly wired.

    These tests read source files as text and look for marker substrings.
    They deliberately avoid importing the modules, because celery may not
    be installed in the test environment.
    """

    @staticmethod
    def _read_app_source(*relative_parts: str) -> str:
        """Return the text of a file under the ``app`` directory.

        ``app`` is resolved as a sibling of this test file's parent directory.
        The file is decoded as UTF-8 explicitly (the default encoding for
        Python source) so the result does not depend on the platform locale.

        Args:
            relative_parts: Path components below ``app``, e.g.
                ``("worker", "__init__.py")``.

        Raises:
            FileNotFoundError: If the target file does not exist.
        """
        app_dir = Path(__file__).resolve().parent.parent / "app"
        return app_dir.joinpath(*relative_parts).read_text(encoding="utf-8")

    def test_router_has_worker_enqueue(self) -> None:
        """desires.py contains process_desire.delay — fire-and-forget enqueue."""
        source = self._read_app_source("routers", "desires.py")
        assert "process_desire.delay" in source, (
            "Router should call process_desire.delay() to enqueue worker task"
        )

    def test_worker_task_is_implemented(self) -> None:
        """process_desire task body is not just 'pass' — has real implementation.

        Reads the worker source file directly to avoid importing celery
        (which may not be installed in the test environment).
        """
        source = self._read_app_source("worker", "__init__.py")

        # Should contain key implementation markers
        assert "embed_text" in source, (
            "Worker should call embed_text to embed desire prompt"
        )
        assert "cluster_desire_sync" in source, (
            "Worker should call cluster_desire_sync to cluster the desire"
        )
        assert "session.commit" in source, (
            "Worker should commit the DB transaction"
        )

    def test_worker_has_structured_logging(self) -> None:
        """process_desire task includes structured logging of key fields.

        This is a substring check only: it verifies the field names appear
        somewhere in the worker source, not that they are actually logged.
        """
        source = self._read_app_source("worker", "__init__.py")

        assert "desire_id" in source, "Should log desire_id"
        assert "cluster_id" in source, "Should log cluster_id"
        assert "heat_score" in source, "Should log heat_score"
        assert "elapsed_ms" in source, "Should log elapsed_ms"

    def test_worker_has_error_handling_with_retry(self) -> None:
        """process_desire catches exceptions and retries."""
        source = self._read_app_source("worker", "__init__.py")

        assert "self.retry" in source, (
            "Worker should use self.retry for transient error handling"
        )
        assert "session.rollback" in source, (
            "Worker should rollback on error before retrying"
        )
|