- Add LLMEndpoint model to models.py with encrypted api_key field - Create encryption.py with Fernet symmetric encryption (key derived from JWT_SECRET via PBKDF2) - Implement full endpoints router: list, get, create, update, delete + test_connection - Test endpoint calls adapter.test_connection() and list_models() - API keys never exposed in responses; has_api_key boolean flag added - 25 tests in test_endpoints.py, all 444 tests passing
417 lines
14 KiB
Python
417 lines
14 KiB
Python
"""Tests for backend/routers/endpoints.py — LLM endpoint CRUD + test_connection."""
|
|
|
|
import os
|
|
import uuid
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
JWT_SECRET = "test-secret-key-for-jwt-signing"
|
|
API_KEY = "test-api-key-12345"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _isolate_settings(tmp_path):
    """Point every test at a throwaway SQLite database with Redis disabled.

    The process environment is overridden, a fresh ``Settings`` object is built
    from it, and that object is installed on each module that keeps its own
    ``settings`` reference. The database and Redis are then re-initialised
    and the ORM tables are created before control passes to the test.
    """
    overrides = dict(
        DATABASE_URL=f"sqlite:///{tmp_path / 'test.db'}",
        REDIS_URL="",
        DATA_DIR=str(tmp_path),
        JWT_SECRET=JWT_SECRET,
        API_KEY=API_KEY,
    )
    with patch.dict(os.environ, overrides, clear=False):
        import config

        test_settings = config.Settings(_env_file=None)
        config.settings = test_settings

        import main

        # Settings must be swapped in before the DB/Redis initialisers run.
        main.settings = test_settings
        main._init_db()
        main._init_redis()

        from models import Base

        Base.metadata.create_all(bind=main.engine)

        import auth

        auth.settings = test_settings

        # The encryption module holds its own settings reference as well.
        import encryption

        encryption.settings = test_settings

        yield
|
|
|
|
|
|
@pytest.fixture
def db_session():
    """Yield a session obtained from ``main.get_db`` and finalise it afterwards."""
    from main import get_db

    session_gen = get_db()
    yield next(session_gen)
    # Resume the generator once more so whatever follows its ``yield`` runs;
    # the default value absorbs the StopIteration raised on exhaustion.
    next(session_gen, None)
|
|
|
|
|
|
@pytest.fixture
def admin_user(db_session):
    """Persist a user named ``admin`` with ``is_admin=True`` and return it."""
    from auth import hash_password
    from models import User

    admin = User(
        username="admin",
        password_hash=hash_password("adminpass"),
        is_admin=True,
    )
    db_session.add(admin)
    db_session.commit()
    # Reload the instance from the database after the commit.
    db_session.refresh(admin)
    return admin
|
|
|
|
|
|
@pytest.fixture
def auth_headers():
    """Return request headers carrying the test API key."""
    headers = {"X-Api-Key": API_KEY}
    return headers
|
|
|
|
|
|
@pytest.fixture
def client():
    """Return a ``TestClient`` wrapping the application object from ``main``."""
    import main

    return TestClient(main.app)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Encryption module tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEncryption:
    """Unit tests for the ``encrypt_api_key`` / ``decrypt_api_key`` helpers."""

    def test_encrypt_decrypt_roundtrip(self):
        """Encrypting then decrypting returns the original plaintext."""
        from encryption import encrypt_api_key, decrypt_api_key

        plain = "sk-test-key-12345"
        encrypted = encrypt_api_key(plain)
        assert encrypted != plain
        assert decrypt_api_key(encrypted) == plain

    def test_different_keys_produce_different_ciphertexts(self):
        """Encrypting the *same* plaintext twice yields distinct ciphertexts.

        Despite the test name, both calls use an identical input.
        """
        from encryption import encrypt_api_key

        ct1 = encrypt_api_key("my-key")
        ct2 = encrypt_api_key("my-key")
        # Fernet uses a random IV each time, so ciphertexts differ
        assert ct1 != ct2

    def test_decrypt_bad_data_raises_value_error(self):
        """Input that is not a valid token raises ``ValueError``."""
        from encryption import decrypt_api_key

        with pytest.raises(ValueError, match="Failed to decrypt"):
            decrypt_api_key("not-valid-fernet-token")

    def test_decrypt_with_wrong_secret_fails(self):
        """A ciphertext cannot be decrypted after the JWT secret changes."""
        from encryption import encrypt_api_key, decrypt_api_key

        encrypted = encrypt_api_key("my-key")

        # Change the settings secret
        import config
        import encryption

        old_secret = config.settings.jwt_secret
        config.settings.jwt_secret = "completely-different-secret"
        encryption.settings = config.settings

        try:
            with pytest.raises(ValueError, match="Failed to decrypt"):
                decrypt_api_key(encrypted)
        finally:
            # Restore even when the assertion above fails, so the mutated
            # secret never outlives this test.
            config.settings.jwt_secret = old_secret
            encryption.settings = config.settings
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoint CRUD tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateEndpoint:
    """Tests for ``POST /api/endpoints/``."""

    def test_create_minimal(self, client, admin_user, auth_headers):
        """Only name and URL are required; optional fields come back empty."""
        resp = client.post("/api/endpoints/", json={
            "name": "Local Ollama",
            "url": "http://localhost:11434/v1",
        }, headers=auth_headers)
        assert resp.status_code == 201
        data = resp.json()
        assert data["name"] == "Local Ollama"
        assert data["url"] == "http://localhost:11434/v1"
        assert data["default_model"] is None
        assert data["has_api_key"] is False
        assert "id" in data

    def test_create_with_api_key(self, client, admin_user, auth_headers):
        """Supplying an API key sets ``has_api_key`` without echoing the key."""
        resp = client.post("/api/endpoints/", json={
            "name": "OpenAI",
            "url": "https://api.openai.com/v1",
            "api_key": "sk-test-12345",
            "default_model": "gpt-4",
        }, headers=auth_headers)
        assert resp.status_code == 201
        data = resp.json()
        assert data["name"] == "OpenAI"
        assert data["has_api_key"] is True
        assert data["default_model"] == "gpt-4"
        # API key must NOT appear in response
        assert "api_key" not in data
        assert "api_key_encrypted" not in data

    def test_create_requires_auth(self, client):
        """A request without credentials is rejected with 401."""
        resp = client.post("/api/endpoints/", json={
            "name": "Test",
            "url": "http://localhost:8080/v1",
        })
        assert resp.status_code == 401

    def test_create_validates_name(self, client, admin_user, auth_headers):
        """An empty name fails request validation with 422."""
        resp = client.post("/api/endpoints/", json={
            "name": "",
            "url": "http://localhost/v1",
        }, headers=auth_headers)
        assert resp.status_code == 422

    def test_api_key_is_encrypted_in_db(self, client, admin_user, auth_headers, db_session):
        """The stored key differs from the plaintext and decrypts back to it."""
        resp = client.post("/api/endpoints/", json={
            "name": "Test",
            "url": "http://localhost/v1",
            "api_key": "sk-secret-key",
        }, headers=auth_headers)
        # Fail here with a clear status mismatch rather than a KeyError on
        # "id" if the create request was rejected.
        assert resp.status_code == 201
        endpoint_id = resp.json()["id"]

        from models import LLMEndpoint
        endpoint = db_session.query(LLMEndpoint).filter(
            LLMEndpoint.id == uuid.UUID(endpoint_id)
        ).first()
        assert endpoint is not None
        assert endpoint.api_key_encrypted is not None
        assert endpoint.api_key_encrypted != "sk-secret-key"

        from encryption import decrypt_api_key
        assert decrypt_api_key(endpoint.api_key_encrypted) == "sk-secret-key"
|
|
|
|
|
|
class TestListEndpoints:
    """Tests for ``GET /api/endpoints/``."""

    def test_list_empty(self, client, admin_user, auth_headers):
        """With nothing created, the listing is empty and ``total`` is 0."""
        resp = client.get("/api/endpoints/", headers=auth_headers)
        assert resp.status_code == 200
        data = resp.json()
        assert data["items"] == []
        assert data["total"] == 0

    def test_list_multiple(self, client, admin_user, auth_headers):
        """All created endpoints are listed, ordered by name."""
        for name in ["Alpha", "Beta", "Gamma"]:
            created = client.post("/api/endpoints/", json={
                "name": name, "url": f"http://{name.lower()}/v1",
            }, headers=auth_headers)
            # Surface a failed setup directly instead of as a wrong total.
            assert created.status_code == 201

        resp = client.get("/api/endpoints/", headers=auth_headers)
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 3
        # Should be ordered by name
        names = [ep["name"] for ep in data["items"]]
        assert names == ["Alpha", "Beta", "Gamma"]

    def test_list_requires_auth(self, client):
        """A request without credentials is rejected with 401."""
        resp = client.get("/api/endpoints/")
        assert resp.status_code == 401
|
|
|
|
|
|
class TestGetEndpoint:
    """Tests for ``GET /api/endpoints/{id}``."""

    def test_get_existing(self, client, admin_user, auth_headers):
        """A created endpoint can be fetched back by its id."""
        payload = {"name": "Test EP", "url": "http://test/v1"}
        created = client.post("/api/endpoints/", json=payload, headers=auth_headers)
        endpoint_id = created.json()["id"]

        fetched = client.get(f"/api/endpoints/{endpoint_id}", headers=auth_headers)
        assert fetched.status_code == 200
        assert fetched.json()["name"] == "Test EP"

    def test_get_not_found(self, client, admin_user, auth_headers):
        """A random, never-created id yields 404."""
        missing_id = uuid.uuid4()
        fetched = client.get(f"/api/endpoints/{missing_id}", headers=auth_headers)
        assert fetched.status_code == 404
|
|
|
|
|
|
class TestUpdateEndpoint:
    """Tests for ``PUT /api/endpoints/{id}``."""

    def test_update_name(self, client, admin_user, auth_headers):
        """Updating only the name leaves the URL untouched."""
        create_resp = client.post("/api/endpoints/", json={
            "name": "Old Name",
            "url": "http://test/v1",
        }, headers=auth_headers)
        ep_id = create_resp.json()["id"]

        resp = client.put(f"/api/endpoints/{ep_id}", json={
            "name": "New Name",
        }, headers=auth_headers)
        assert resp.status_code == 200
        assert resp.json()["name"] == "New Name"
        assert resp.json()["url"] == "http://test/v1"  # unchanged

    def test_update_api_key(self, client, admin_user, auth_headers, db_session):
        """Setting a key on a keyless endpoint flips ``has_api_key`` and stores it encrypted."""
        create_resp = client.post("/api/endpoints/", json={
            "name": "Test",
            "url": "http://test/v1",
        }, headers=auth_headers)
        ep_id = create_resp.json()["id"]
        assert create_resp.json()["has_api_key"] is False

        # Set API key
        resp = client.put(f"/api/endpoints/{ep_id}", json={
            "api_key": "sk-new-key",
        }, headers=auth_headers)
        assert resp.status_code == 200
        assert resp.json()["has_api_key"] is True

        # Verify what was persisted, mirroring the create-path DB check:
        # the stored value must not be the plaintext and must decrypt to it.
        from models import LLMEndpoint
        endpoint = db_session.query(LLMEndpoint).filter(
            LLMEndpoint.id == uuid.UUID(ep_id)
        ).first()
        assert endpoint is not None
        assert endpoint.api_key_encrypted is not None
        assert endpoint.api_key_encrypted != "sk-new-key"

        from encryption import decrypt_api_key
        assert decrypt_api_key(endpoint.api_key_encrypted) == "sk-new-key"

    def test_clear_api_key(self, client, admin_user, auth_headers):
        """Sending an empty-string key removes the stored key."""
        create_resp = client.post("/api/endpoints/", json={
            "name": "Test",
            "url": "http://test/v1",
            "api_key": "sk-key",
        }, headers=auth_headers)
        ep_id = create_resp.json()["id"]

        # Clear by sending empty string
        resp = client.put(f"/api/endpoints/{ep_id}", json={
            "api_key": "",
        }, headers=auth_headers)
        assert resp.status_code == 200
        assert resp.json()["has_api_key"] is False

    def test_update_not_found(self, client, admin_user, auth_headers):
        """Updating a never-created id yields 404."""
        fake_id = str(uuid.uuid4())
        resp = client.put(f"/api/endpoints/{fake_id}", json={
            "name": "X",
        }, headers=auth_headers)
        assert resp.status_code == 404
|
|
|
|
|
|
class TestDeleteEndpoint:
    """Tests for ``DELETE /api/endpoints/{id}``."""

    def test_delete_existing(self, client, admin_user, auth_headers):
        """Deleting returns 204 and the endpoint can no longer be fetched."""
        payload = {"name": "ToDelete", "url": "http://test/v1"}
        created = client.post("/api/endpoints/", json=payload, headers=auth_headers)
        target_url = f"/api/endpoints/{created.json()['id']}"

        deleted = client.delete(target_url, headers=auth_headers)
        assert deleted.status_code == 204

        # Verify gone
        lookup = client.get(target_url, headers=auth_headers)
        assert lookup.status_code == 404

    def test_delete_not_found(self, client, admin_user, auth_headers):
        """Deleting a never-created id yields 404."""
        missing_id = uuid.uuid4()
        deleted = client.delete(f"/api/endpoints/{missing_id}", headers=auth_headers)
        assert deleted.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test connection endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTestEndpoint:
    """Tests for ``POST /api/endpoints/{id}/test`` with the adapter mocked out."""

    # Dotted path of the adapter class as looked up by the router module.
    _ADAPTER_PATH = "routers.endpoints.OpenAICompatAdapter"

    def test_test_connection_success(self, client, admin_user, auth_headers):
        """A reachable endpoint reports connected, its models and its name."""
        payload = {
            "name": "Mock EP",
            "url": "http://mock-llm/v1",
            "api_key": "sk-mock",
        }
        created = client.post("/api/endpoints/", json=payload, headers=auth_headers)
        endpoint_id = created.json()["id"]

        adapter = AsyncMock()
        adapter.test_connection.return_value = True
        adapter.list_models.return_value = ["model-a", "model-b"]

        with patch(self._ADAPTER_PATH, return_value=adapter) as adapter_cls:
            resp = client.post(f"/api/endpoints/{endpoint_id}/test", headers=auth_headers)

        assert resp.status_code == 200
        body = resp.json()
        assert body["connected"] is True
        assert body["models"] == ["model-a", "model-b"]
        assert body["name"] == "Mock EP"

        # Verify adapter was constructed with decrypted key
        ctor_call = adapter_cls.call_args
        assert ctor_call.kwargs["base_url"] == "http://mock-llm/v1"
        assert ctor_call.kwargs["api_key"] == "sk-mock"

    def test_test_connection_failure(self, client, admin_user, auth_headers):
        """An unreachable endpoint still returns 200, with connected false and no models."""
        payload = {"name": "Bad EP", "url": "http://bad-host/v1"}
        created = client.post("/api/endpoints/", json=payload, headers=auth_headers)
        endpoint_id = created.json()["id"]

        adapter = AsyncMock()
        adapter.test_connection.return_value = False

        with patch(self._ADAPTER_PATH, return_value=adapter):
            resp = client.post(f"/api/endpoints/{endpoint_id}/test", headers=auth_headers)

        assert resp.status_code == 200
        body = resp.json()
        assert body["connected"] is False
        assert body["models"] == []

    def test_test_connection_no_api_key(self, client, admin_user, auth_headers):
        """An endpoint without a stored key constructs the adapter with ``api_key=None``."""
        payload = {"name": "No Key EP", "url": "http://local/v1"}
        created = client.post("/api/endpoints/", json=payload, headers=auth_headers)
        endpoint_id = created.json()["id"]

        adapter = AsyncMock()
        adapter.test_connection.return_value = True
        adapter.list_models.return_value = ["llama3"]

        with patch(self._ADAPTER_PATH, return_value=adapter) as adapter_cls:
            resp = client.post(f"/api/endpoints/{endpoint_id}/test", headers=auth_headers)

        assert resp.status_code == 200
        assert resp.json()["connected"] is True
        # Should have been called with api_key=None
        assert adapter_cls.call_args.kwargs["api_key"] is None

    def test_test_connection_not_found(self, client, admin_user, auth_headers):
        """Testing a never-created id yields 404."""
        missing_id = uuid.uuid4()
        resp = client.post(f"/api/endpoints/{missing_id}/test", headers=auth_headers)
        assert resp.status_code == 404

    def test_test_connection_list_models_fails_gracefully(self, client, admin_user, auth_headers):
        """A ``list_models`` error still reports connected, with an empty model list."""
        payload = {"name": "Partial EP", "url": "http://partial/v1"}
        created = client.post("/api/endpoints/", json=payload, headers=auth_headers)
        endpoint_id = created.json()["id"]

        adapter = AsyncMock()
        adapter.test_connection.return_value = True
        adapter.list_models.side_effect = RuntimeError("models endpoint broken")

        with patch(self._ADAPTER_PATH, return_value=adapter):
            resp = client.post(f"/api/endpoints/{endpoint_id}/test", headers=auth_headers)

        assert resp.status_code == 200
        body = resp.json()
        assert body["connected"] is True
        assert body["models"] == []
|