"""Embedding similarity scorer. Uses a configurable OpenAI-compatible embedding endpoint (e.g. Ollama nomic-embed-text, OpenAI text-embedding-3-small) to compute cosine similarity between the LLM output and a reference answer. """ import math from typing import Any import httpx from engine.scorers.base import BaseScorer class EmbeddingScorer(BaseScorer): """Score outputs by cosine similarity to a reference embedding. Args: base_url: Embedding API base URL (e.g. "http://localhost:11434/v1"). model: Embedding model name (e.g. "nomic-embed-text"). api_key: Optional API key for authenticated endpoints. timeout: HTTP request timeout in seconds. """ def __init__( self, base_url: str = "http://localhost:11434/v1", model: str = "nomic-embed-text", api_key: str | None = None, timeout: float = 30.0, ) -> None: self.base_url = base_url.rstrip("/") self.model = model self.api_key = api_key self.timeout = timeout @property def name(self) -> str: return "embedding" def score(self, input_data: Any, output: str, context: dict) -> float: """Synchronous scoring — raises because embeddings require HTTP I/O. Use ``score_async`` instead. """ import asyncio try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): raise RuntimeError( "EmbeddingScorer.score() cannot be called from an async context. " "Use score_async() instead." ) return asyncio.get_event_loop().run_until_complete( self.score_async(input_data, output, context) ) async def score_async( self, input_data: Any, output: str, context: dict ) -> float: """Compute cosine similarity between output and reference answer. The reference answer is read from ``context["reference"]``. If no reference is provided the score defaults to 0.0. """ reference = context.get("reference") if not reference: return 0.0 output_embedding, reference_embedding = await self._get_embeddings( [output, reference] ) similarity = _cosine_similarity(output_embedding, reference_embedding) # Cosine similarity is in [-1, 1]; normalize to [0, 1]. return max(0.0, min(1.0, (similarity + 1.0) / 2.0)) async def _get_embeddings( self, texts: list[str] ) -> tuple[list[float], ...]: """Fetch embeddings for a list of texts in a single API call.""" url = f"{self.base_url}/embeddings" headers: dict[str, str] = {"Content-Type": "application/json"} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" body = {"model": self.model, "input": texts} async with httpx.AsyncClient( timeout=httpx.Timeout(self.timeout), headers=headers ) as client: resp = await client.post(url, json=body) resp.raise_for_status() data = resp.json() embeddings_data = data.get("data", []) # Sort by index to guarantee order matches input order. embeddings_data.sort(key=lambda d: d.get("index", 0)) return tuple(item["embedding"] for item in embeddings_data) def _cosine_similarity(a: list[float], b: list[float]) -> float: """Compute cosine similarity between two vectors.""" dot = sum(x * y for x, y in zip(a, b)) norm_a = math.sqrt(sum(x * x for x in a)) norm_b = math.sqrt(sum(x * x for x in b)) if norm_a == 0.0 or norm_b == 0.0: return 0.0 return dot / (norm_a * norm_b)