"""Pipeline integration tests — embed → cluster → heat.

Proves the full desire processing pipeline works end-to-end by:

1. Verifying similar texts produce embeddings with cosine similarity above
   the clustering threshold (0.82).
2. Verifying dissimilar texts stay below the clustering threshold.
3. Validating heat calculation logic for clustered desires.
4. Exercising the sync clustering orchestrator (``cluster_desire_sync``)
   with its sub-functions mocked.
5. Checking that the router and worker are wired correctly (static
   assertions on their source files).
"""

import itertools
import uuid
from pathlib import Path
from unittest.mock import MagicMock, patch

import numpy as np
import pytest

from app.services.embedding import embed_text

# Cosine similarity above which two desires are expected to land in the same
# cluster. NOTE(review): duplicated from the clustering service's threshold;
# keep the two in sync (or import it from there if it is exposed).
CLUSTER_SIMILARITY_THRESHOLD = 0.82


def cosine_sim(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity between vectors *a* and *b*.

    For L2-normalized vectors this equals their dot product. Normalizing
    explicitly keeps the check meaningful even if ``embed_text`` ever stops
    returning unit-length vectors. Returns 0.0 when either vector has zero
    norm, since the similarity is undefined in that case.
    """
    va = np.asarray(a, dtype=np.float64)
    vb = np.asarray(b, dtype=np.float64)
    denom = np.linalg.norm(va) * np.linalg.norm(vb)
    if denom == 0.0:
        return 0.0
    return float(np.dot(va, vb) / denom)


# ---------------------------------------------------------------------------
# Embedding pipeline: similar texts cluster, dissimilar texts don't
# ---------------------------------------------------------------------------


class TestSimilarDesiresClustering:
    """Verify that similar desire texts produce clusterable embeddings."""

    # Word-order permutations of the same prompt: should all cluster together.
    SIMILAR_TEXTS = [
        "ragdoll physics dark moody slow motion",
        "dark physics ragdoll slow motion moody",
        "slow motion ragdoll dark physics moody",
    ]
    # Unrelated prompt: should not cluster with any of SIMILAR_TEXTS.
    DISSIMILAR_TEXT = "bright colorful kaleidoscope flowers rainbow"

    def test_similar_desires_produce_clusterable_embeddings(self) -> None:
        """All pairwise cosine similarities among similar texts exceed 0.82."""
        embeddings = [embed_text(t) for t in self.SIMILAR_TEXTS]
        indexed = list(enumerate(embeddings))
        # combinations() yields each unordered pair (i < j) exactly once.
        for (i, emb_i), (j, emb_j) in itertools.combinations(indexed, 2):
            sim = cosine_sim(emb_i, emb_j)
            assert sim > CLUSTER_SIMILARITY_THRESHOLD, (
                f"Texts [{i}] and [{j}] should cluster "
                f"(sim > {CLUSTER_SIMILARITY_THRESHOLD}), "
                f"got {sim:.4f}:\n"
                f"  [{i}] '{self.SIMILAR_TEXTS[i]}'\n"
                f"  [{j}] '{self.SIMILAR_TEXTS[j]}'"
            )

    def test_dissimilar_desire_does_not_cluster(self) -> None:
        """A dissimilar text has cosine similarity < 0.82 with all similar texts."""
        dissimilar = embed_text(self.DISSIMILAR_TEXT)
        similar_embeddings = [embed_text(t) for t in self.SIMILAR_TEXTS]

        for i, emb in enumerate(similar_embeddings):
            sim = cosine_sim(dissimilar, emb)
            assert sim < CLUSTER_SIMILARITY_THRESHOLD, (
                f"Dissimilar text should NOT cluster with text [{i}] "
                f"(sim < {CLUSTER_SIMILARITY_THRESHOLD}), got {sim:.4f}:\n"
                f"  dissimilar: '{self.DISSIMILAR_TEXT}'\n"
                f"  similar[{i}]: '{self.SIMILAR_TEXTS[i]}'"
            )


# ---------------------------------------------------------------------------
# Heat calculation logic
# ---------------------------------------------------------------------------


class TestPipelineHeatCalculation:
    """Verify heat score calculation matches cluster size."""

    @staticmethod
    def _recalculate_with_member_count(member_count: int) -> tuple[float, MagicMock]:
        """Run ``recalculate_heat_sync`` against a mocked session.

        The session's ``execute`` returns, in order, a COUNT(*) result whose
        ``scalar_one()`` yields *member_count*, then a dummy UPDATE result.

        Returns:
            The computed heat and the mocked session, for call assertions.
        """
        from app.services.clustering import recalculate_heat_sync

        session = MagicMock()

        count_result = MagicMock()
        count_result.scalar_one.return_value = member_count
        update_result = MagicMock()  # UPDATE has no meaningful return value
        session.execute = MagicMock(side_effect=[count_result, update_result])

        heat = recalculate_heat_sync(uuid.uuid4(), session)
        return heat, session

    def test_pipeline_heat_calculation_logic(self) -> None:
        """A cluster of 3 desires should produce heat_score = 3.0 for each member.

        This tests the recalculate_heat_sync logic by simulating its DB
        interaction pattern with mocks.
        """
        heat, session = self._recalculate_with_member_count(3)

        assert heat == 3.0
        assert session.execute.call_count == 2
        assert session.flush.call_count >= 1

    def test_single_member_cluster_has_heat_1(self) -> None:
        """A new single-member cluster should have heat_score = 1.0."""
        heat, _session = self._recalculate_with_member_count(1)

        assert heat == 1.0


# ---------------------------------------------------------------------------
# Sync clustering orchestrator
# ---------------------------------------------------------------------------


class TestSyncClusteringOrchestrator:
    """Test cluster_desire_sync orchestration with mocked sub-functions."""

    def test_new_desire_creates_cluster(self) -> None:
        """When no nearby cluster exists, creates a new one."""
        from app.services.clustering import cluster_desire_sync

        desire_id = uuid.uuid4()
        embedding = embed_text("ragdoll physics dark moody slow")
        new_cluster_id = uuid.uuid4()
        session = MagicMock()

        with patch("app.services.clustering.find_nearest_cluster_sync") as mock_find, \
             patch("app.services.clustering.create_cluster_sync") as mock_create:
            # (None, 0.0) == "no cluster within the similarity threshold".
            mock_find.return_value = (None, 0.0)
            mock_create.return_value = new_cluster_id

            result = cluster_desire_sync(desire_id, embedding, session)

        assert result["is_new"] is True
        assert result["cluster_id"] == new_cluster_id
        assert result["heat_score"] == 1.0
        mock_find.assert_called_once_with(embedding, session)
        mock_create.assert_called_once_with(desire_id, session)

    def test_similar_desire_joins_existing_cluster(self) -> None:
        """When a nearby cluster is found, joins it and recalculates heat."""
        from app.services.clustering import cluster_desire_sync

        desire_id = uuid.uuid4()
        embedding = embed_text("ragdoll physics dark moody slow")
        existing_cluster_id = uuid.uuid4()
        session = MagicMock()

        with patch("app.services.clustering.find_nearest_cluster_sync") as mock_find, \
             patch("app.services.clustering.add_to_cluster_sync") as mock_add, \
             patch("app.services.clustering.recalculate_heat_sync") as mock_recalc:
            mock_find.return_value = (existing_cluster_id, 0.91)
            mock_recalc.return_value = 3.0

            result = cluster_desire_sync(desire_id, embedding, session)

        assert result["is_new"] is False
        assert result["cluster_id"] == existing_cluster_id
        assert result["heat_score"] == 3.0
        mock_find.assert_called_once_with(embedding, session)
        mock_add.assert_called_once_with(
            existing_cluster_id, desire_id, 0.91, session
        )
        # Joining must trigger exactly one heat recalculation.
        mock_recalc.assert_called_once()


# ---------------------------------------------------------------------------
# Wiring checks: router + worker are connected
# ---------------------------------------------------------------------------


class TestWiring:
    """Static assertions that the router and worker are properly wired.

    These tests read source files as text instead of importing them, so they
    run even when runtime dependencies such as celery are not installed in
    the test environment.
    """

    # Root of the ``app`` package: the directory above this test file's own
    # directory, joined with "app" (i.e. tests live one level below the
    # project root).
    _APP_DIR = Path(__file__).resolve().parent.parent / "app"

    @classmethod
    def _read_app_source(cls, *parts: str) -> str:
        """Return the text of ``app/<parts...>``, decoded explicitly as UTF-8.

        An explicit encoding avoids depending on the platform's locale
        default, which is not UTF-8 everywhere (e.g. some Windows setups).
        """
        return cls._APP_DIR.joinpath(*parts).read_text(encoding="utf-8")

    def test_router_has_worker_enqueue(self) -> None:
        """desires.py contains process_desire.delay — fire-and-forget enqueue."""
        source = self._read_app_source("routers", "desires.py")
        assert "process_desire.delay" in source, (
            "Router should call process_desire.delay() to enqueue worker task"
        )

    def test_worker_task_is_implemented(self) -> None:
        """process_desire task body is not just 'pass' — has real implementation.

        Reads the worker source file directly to avoid importing celery
        (which may not be installed in the test environment).
        """
        source = self._read_app_source("worker", "__init__.py")

        # Should contain key implementation markers
        assert "embed_text" in source, (
            "Worker should call embed_text to embed desire prompt"
        )
        assert "cluster_desire_sync" in source, (
            "Worker should call cluster_desire_sync to cluster the desire"
        )
        assert "session.commit" in source, (
            "Worker should commit the DB transaction"
        )

    def test_worker_has_structured_logging(self) -> None:
        """process_desire task includes structured logging of key fields."""
        source = self._read_app_source("worker", "__init__.py")

        # NOTE(review): substring checks only prove the names appear somewhere
        # in the module, not that they are passed to a logger call.
        assert "desire_id" in source, "Should log desire_id"
        assert "cluster_id" in source, "Should log cluster_id"
        assert "heat_score" in source, "Should log heat_score"
        assert "elapsed_ms" in source, "Should log elapsed_ms"

    def test_worker_has_error_handling_with_retry(self) -> None:
        """process_desire catches exceptions and retries."""
        source = self._read_app_source("worker", "__init__.py")

        assert "self.retry" in source, (
            "Worker should use self.retry for transient error handling"
        )
        assert "session.rollback" in source, (
            "Worker should rollback on error before retrying"
        )