MAESTRO: Implement ResponseCache layer with SHA-256 config hashing and hit-rate tracking

This commit is contained in:
John Lightner 2026-04-07 02:37:58 -05:00
parent bf1e9d1c84
commit f60128604f
3 changed files with 319 additions and 1 deletions

View file

@ -6,7 +6,7 @@ Implement the core experiment execution engine: LLM adapters, response caching,
- [x] Implement backend/engine/adapters/openai_compat.py as the primary adapter. It should work with any OpenAI-compatible API (OpenWebUI, vLLM, Ollama, OpenAI, Anthropic via proxy). Use httpx for async HTTP calls. Support chat completions format with system + user messages. Parse token usage from the response. Handle errors gracefully with retries (3 attempts, exponential backoff). Support both streaming and non-streaming modes. - [x] Implement backend/engine/adapters/openai_compat.py as the primary adapter. It should work with any OpenAI-compatible API (OpenWebUI, vLLM, Ollama, OpenAI, Anthropic via proxy). Use httpx for async HTTP calls. Support chat completions format with system + user messages. Parse token usage from the response. Handle errors gracefully with retries (3 attempts, exponential backoff). Support both streaming and non-streaming modes.
- [ ] Implement backend/engine/cache.py with the ResponseCache layer. Key function: compute_config_hash(prompt, model, params, input_data) → SHA-256 hex string. Methods: get(config_hash) → CachedResponse or None, put(config_hash, response, metadata). In SQLite mode, use the ResponseCache table directly. In Postgres mode, same table but with connection pooling. Include a cache_stats() method returning hit rate, total entries, and storage size. - [x] Implement backend/engine/cache.py with the ResponseCache layer. Key function: compute_config_hash(prompt, model, params, input_data) → SHA-256 hex string. Methods: get(config_hash) → CachedResponse or None, put(config_hash, response, metadata). In SQLite mode, use the ResponseCache table directly. In Postgres mode, same table but with connection pooling. Include a cache_stats() method returning hit rate, total entries, and storage size.
- [ ] Implement backend/engine/runner.py for individual run execution. The run_single function should: (1) iterate through pipeline stages, (2) render prompt templates with Jinja2 (allowing previous stage output as context), (3) check cache before calling LLM, (4) call the LLM adapter if cache miss, (5) store response in cache, (6) create StageResult records, (7) run all configured scorers, (8) create Score records, (9) update Run status and timing, (10) publish progress events via Redis pub/sub (or in-process event bus). - [ ] Implement backend/engine/runner.py for individual run execution. The run_single function should: (1) iterate through pipeline stages, (2) render prompt templates with Jinja2 (allowing previous stage output as context), (3) check cache before calling LLM, (4) call the LLM adapter if cache miss, (5) store response in cache, (6) create StageResult records, (7) run all configured scorers, (8) create Score records, (9) update Run status and timing, (10) publish progress events via Redis pub/sub (or in-process event bus).

146
backend/engine/cache.py Normal file
View file

@ -0,0 +1,146 @@
"""Response cache layer for PromptLooper.
Caches LLM responses by a SHA-256 hash of the full configuration
(prompt + model + params + input_data) to avoid redundant API calls.
"""
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import func, text
from sqlalchemy.orm import Session
from models import ResponseCache
@dataclass
class CachedResponse:
"""A cached LLM response retrieved from the database."""
config_hash: str
response: str
model: str
tokens_in: int | None
tokens_out: int | None
latency_ms: int | None
created_at: datetime
@dataclass
class CacheStats:
    """Summary figures describing the cache."""

    # Number of entries in the cache.
    total_entries: int
    # Fraction of lookups that were hits, as a float.
    hit_rate: float
    # Size figure for the stored responses.
    storage_size_bytes: int
def compute_config_hash(
    prompt: str,
    model: str,
    params: dict[str, Any],
    input_data: Any = None,
) -> str:
    """Compute a deterministic SHA-256 hash for a given configuration.

    The hash covers the full config (prompt, model, params and input data)
    so that any parameter change produces a different key. The payload is
    serialized to canonical JSON: dict keys are sorted and non-ASCII text is
    escaped, so the digest does not depend on dict insertion order.

    Values that JSON cannot represent natively are handled as follows:

    * ``set`` / ``frozenset`` are emitted as a tagged, sorted list. Plain
      ``str(a_set)`` is not stable across processes because string hashing
      is randomized, which would silently defeat the cache.
    * anything else falls back to ``str(value)``. Objects whose ``str()``
      embeds a memory address will therefore never produce a stable key.

    Args:
        prompt: Prompt text.
        model: Model identifier.
        params: Generation parameters; key order is irrelevant.
        input_data: Optional extra input that is part of the configuration.

    Returns:
        The 64-character lowercase hexadecimal SHA-256 digest.
    """

    def _encode(value: Any) -> str:
        # Single canonical encoder, also used as the sort key for set members.
        return json.dumps(
            value, sort_keys=True, ensure_ascii=True, default=_fallback
        )

    def _fallback(value: Any) -> Any:
        if isinstance(value, (set, frozenset)):
            # Sort by canonical encoding so mixed-type members never raise
            # and the order is independent of the set's iteration order.
            # The tag keeps a set distinct from an equivalent list.
            return {"__set__": sorted(value, key=_encode)}
        return str(value)

    payload = {
        "prompt": prompt,
        "model": model,
        "params": params,
        "input_data": input_data,
    }
    canonical = _encode(payload)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
class ResponseCacheLayer:
    """Database-backed response cache.

    Works with both SQLite and PostgreSQL: the caller provides a
    SQLAlchemy session to every method, so this class holds no connection
    of its own.

    Hit and miss counters live in memory on the instance. The hit rate
    reported by ``cache_stats`` therefore only reflects lookups made
    through this particular object since it was created.
    """

    def __init__(self) -> None:
        self._hits: int = 0
        self._misses: int = 0

    def get(self, db: Session, config_hash: str) -> CachedResponse | None:
        """Look up a cached response by config hash.

        Every call is counted as either a hit or a miss.

        Args:
            db: Session used for the primary-key lookup.
            config_hash: Cache key to look up.

        Returns:
            A ``CachedResponse`` copy of the stored row, or None on a
            cache miss.
        """
        row = db.get(ResponseCache, config_hash)
        if row is None:
            self._misses += 1
            return None
        self._hits += 1
        return CachedResponse(
            config_hash=row.config_hash,
            response=row.response,
            model=row.model,
            tokens_in=row.tokens_in,
            tokens_out=row.tokens_out,
            latency_ms=row.latency_ms,
            created_at=row.created_at,
        )

    def put(
        self,
        db: Session,
        config_hash: str,
        response: str,
        model: str,
        tokens_in: int | None = None,
        tokens_out: int | None = None,
        latency_ms: int | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Store a response in the cache and commit the session.

        If the config_hash already exists, the entry is updated (upsert).

        Args:
            db: Session used for the write; it is committed before returning.
            config_hash: Cache key for the entry.
            response: Response text to store.
            model: Model identifier to store with the response.
            tokens_in: Optional input token count.
            tokens_out: Optional output token count.
            latency_ms: Optional latency figure.
            metadata: Accepted for interface compatibility; currently not
                persisted anywhere.

        Raises:
            Exception: Whatever the session raises while writing. The
                session is rolled back first so the caller can keep using it.
        """
        try:
            existing = db.get(ResponseCache, config_hash)
            if existing is not None:
                existing.response = response
                existing.model = model
                existing.tokens_in = tokens_in
                existing.tokens_out = tokens_out
                existing.latency_ms = latency_ms
            else:
                entry = ResponseCache(
                    config_hash=config_hash,
                    response=response,
                    model=model,
                    tokens_in=tokens_in,
                    tokens_out=tokens_out,
                    latency_ms=latency_ms,
                )
                db.add(entry)
            db.commit()
        except Exception:
            # The lookup-then-insert above is not atomic, so a concurrent
            # writer can make the commit fail. Without a rollback the
            # session stays in a failed state and every later call on it
            # would raise as well.
            db.rollback()
            raise

    def cache_stats(self, db: Session) -> CacheStats:
        """Return cache statistics: hit rate, total entries, storage size.

        Args:
            db: Session used for the two aggregate queries.

        Returns:
            A ``CacheStats`` with the row count, the in-memory hit rate
            (0.0 when no lookups have happened yet) and the summed length
            of all stored responses.
        """
        total: int = db.query(func.count(ResponseCache.config_hash)).scalar() or 0
        total_lookups = self._hits + self._misses
        hit_rate = self._hits / total_lookups if total_lookups > 0 else 0.0
        # Approximate storage: sum of response text lengths.
        # length() counts characters rather than bytes, so multi-byte text
        # is under-reported; octet_length would be more accurate on
        # Postgres, but length() works everywhere.
        size: int = (
            db.query(func.sum(func.length(ResponseCache.response))).scalar() or 0
        )
        return CacheStats(
            total_entries=total,
            hit_rate=hit_rate,
            storage_size_bytes=size,
        )

172
backend/tests/test_cache.py Normal file
View file

@ -0,0 +1,172 @@
"""Tests for the response cache layer."""
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from engine.cache import (
CachedResponse,
CacheStats,
ResponseCacheLayer,
compute_config_hash,
)
from models import Base
def _engine():
    """Build an in-memory SQLite engine with all model tables created."""
    memory_engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(bind=memory_engine)
    return memory_engine
def _session(engine):
    """Open a new ORM session bound to the given engine."""
    return Session(bind=engine)
# ---------------------------------------------------------------------------
# compute_config_hash tests
# ---------------------------------------------------------------------------
class TestComputeConfigHash:
    """Checks that the config hash is stable and sensitive to every input."""

    def test_deterministic(self):
        # Same arguments twice must give the same digest.
        first = compute_config_hash("hello", "gpt-4", {"temperature": 0.7})
        second = compute_config_hash("hello", "gpt-4", {"temperature": 0.7})
        assert first == second

    def test_different_prompt_different_hash(self):
        base = compute_config_hash("hello", "gpt-4", {"temperature": 0.7})
        changed = compute_config_hash("world", "gpt-4", {"temperature": 0.7})
        assert base != changed

    def test_different_model_different_hash(self):
        base = compute_config_hash("hello", "gpt-4", {"temperature": 0.7})
        changed = compute_config_hash("hello", "gpt-3.5", {"temperature": 0.7})
        assert base != changed

    def test_different_params_different_hash(self):
        base = compute_config_hash("hello", "gpt-4", {"temperature": 0.7})
        changed = compute_config_hash("hello", "gpt-4", {"temperature": 0.9})
        assert base != changed

    def test_different_input_data_different_hash(self):
        base = compute_config_hash("hello", "gpt-4", {}, input_data="data1")
        changed = compute_config_hash("hello", "gpt-4", {}, input_data="data2")
        assert base != changed

    def test_param_order_irrelevant(self):
        # Dict insertion order must not leak into the digest.
        forward = compute_config_hash("p", "m", {"a": 1, "b": 2})
        backward = compute_config_hash("p", "m", {"b": 2, "a": 1})
        assert forward == backward

    def test_returns_hex_string_64_chars(self):
        digest = compute_config_hash("test", "model", {})
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")

    def test_none_input_data_is_default(self):
        implicit = compute_config_hash("p", "m", {})
        explicit = compute_config_hash("p", "m", {}, input_data=None)
        assert implicit == explicit
# ---------------------------------------------------------------------------
# ResponseCacheLayer tests
# ---------------------------------------------------------------------------
class TestResponseCacheLayer:
    """Round-trip and statistics checks against an in-memory SQLite database."""

    def test_get_miss_returns_none(self):
        with _session(_engine()) as db:
            layer = ResponseCacheLayer()
            assert layer.get(db, "nonexistent_hash") is None

    def test_put_and_get(self):
        with _session(_engine()) as db:
            layer = ResponseCacheLayer()
            key = compute_config_hash("hello", "gpt-4", {"temp": 0.5})
            layer.put(
                db,
                config_hash=key,
                response="Hello world!",
                model="gpt-4",
                tokens_in=10,
                tokens_out=5,
                latency_ms=150,
            )
            hit = layer.get(db, key)
            assert hit is not None
            assert isinstance(hit, CachedResponse)
            # Every stored field must come back unchanged.
            assert hit.response == "Hello world!"
            assert hit.model == "gpt-4"
            assert (hit.tokens_in, hit.tokens_out, hit.latency_ms) == (10, 5, 150)
            assert hit.config_hash == key

    def test_put_upsert(self):
        with _session(_engine()) as db:
            layer = ResponseCacheLayer()
            key = "a" * 64
            # Second write to the same key must replace the first.
            layer.put(db, key, response="first", model="m1")
            layer.put(db, key, response="second", model="m2")
            hit = layer.get(db, key)
            assert hit is not None
            assert hit.response == "second"
            assert hit.model == "m2"

    def test_hit_rate_tracking(self):
        with _session(_engine()) as db:
            layer = ResponseCacheLayer()
            key = compute_config_hash("p", "m", {})
            layer.put(db, key, response="r", model="m")
            # 1 hit, 1 miss
            layer.get(db, key)
            layer.get(db, "missing")
            summary = layer.cache_stats(db)
            assert summary.hit_rate == 0.5
            assert summary.total_entries == 1

    def test_cache_stats_empty(self):
        with _session(_engine()) as db:
            summary = ResponseCacheLayer().cache_stats(db)
            assert isinstance(summary, CacheStats)
            assert summary.total_entries == 0
            assert summary.hit_rate == 0.0
            assert summary.storage_size_bytes == 0

    def test_cache_stats_storage_size(self):
        with _session(_engine()) as db:
            layer = ResponseCacheLayer()
            layer.put(db, "a" * 64, response="hello", model="m")
            layer.put(db, "b" * 64, response="world!", model="m")
            summary = layer.cache_stats(db)
            assert summary.total_entries == 2
            # "hello" = 5 chars, "world!" = 6 chars
            assert summary.storage_size_bytes == 11

    def test_multiple_entries(self):
        with _session(_engine()) as db:
            layer = ResponseCacheLayer()
            for index in range(5):
                key = compute_config_hash(f"prompt_{index}", "model", {})
                layer.put(db, key, response=f"resp_{index}", model="model")
            summary = layer.cache_stats(db)
            assert summary.total_entries == 5