Full webhook system: CRUD endpoints (list/filter/get/create/update/delete), WebhookDelivery model for delivery audit trail, dispatch engine with 3-attempt retry and exponential backoff, Celery task integration with sync fallback, and webhook firing hooks in runner.py and sweep.py event paths.
555 lines
21 KiB
Python
555 lines
21 KiB
Python
"""Tests for backend/routers/webhooks.py — Webhook CRUD + dispatch logic."""
|
|
|
|
import os
|
|
import uuid
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
JWT_SECRET = "test-secret-key-for-jwt-signing"
|
|
API_KEY = "test-api-key-12345"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolate_settings(tmp_path):
|
|
"""Ensure tests use a temp SQLite DB and no Redis."""
|
|
env = {
|
|
"DATABASE_URL": f"sqlite:///{tmp_path / 'test.db'}",
|
|
"REDIS_URL": "",
|
|
"DATA_DIR": str(tmp_path),
|
|
"JWT_SECRET": JWT_SECRET,
|
|
"API_KEY": API_KEY,
|
|
}
|
|
with patch.dict(os.environ, env, clear=False):
|
|
import config
|
|
new_settings = config.Settings(_env_file=None)
|
|
config.settings = new_settings
|
|
|
|
import main
|
|
main.settings = new_settings
|
|
main._init_db()
|
|
main._init_redis()
|
|
|
|
from models import Base
|
|
Base.metadata.create_all(bind=main.engine)
|
|
|
|
import auth
|
|
auth.settings = new_settings
|
|
|
|
yield
|
|
|
|
|
|
@pytest.fixture
|
|
def db_session():
|
|
from main import get_db
|
|
gen = get_db()
|
|
session = next(gen)
|
|
yield session
|
|
try:
|
|
next(gen)
|
|
except StopIteration:
|
|
pass
|
|
|
|
|
|
@pytest.fixture
|
|
def admin_user(db_session):
|
|
from auth import hash_password
|
|
from models import User
|
|
user = User(username="admin", password_hash=hash_password("adminpass"), is_admin=True)
|
|
db_session.add(user)
|
|
db_session.commit()
|
|
db_session.refresh(user)
|
|
return user
|
|
|
|
|
|
@pytest.fixture
|
|
def auth_headers():
|
|
return {"X-Api-Key": API_KEY}
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
from main import app
|
|
return TestClient(app)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CRUD Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestListWebhooks:
|
|
def test_list_empty(self, client, admin_user, auth_headers):
|
|
resp = client.get("/api/webhooks/", headers=auth_headers)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["items"] == []
|
|
assert data["total"] == 0
|
|
|
|
def test_list_returns_created(self, client, admin_user, auth_headers):
|
|
client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed",
|
|
"url": "https://example.com/hook",
|
|
})
|
|
resp = client.get("/api/webhooks/", headers=auth_headers)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["total"] == 1
|
|
|
|
def test_list_filter_by_event_type(self, client, admin_user, auth_headers):
|
|
client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/a",
|
|
})
|
|
client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "sweep.completed", "url": "https://example.com/b",
|
|
})
|
|
resp = client.get("/api/webhooks/?event_type=run.completed", headers=auth_headers)
|
|
assert resp.json()["total"] == 1
|
|
assert resp.json()["items"][0]["event_type"] == "run.completed"
|
|
|
|
def test_list_requires_auth(self, client):
|
|
resp = client.get("/api/webhooks/")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
class TestCreateWebhook:
|
|
def test_create_basic(self, client, admin_user, auth_headers):
|
|
resp = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed",
|
|
"url": "https://example.com/hook",
|
|
})
|
|
assert resp.status_code == 201
|
|
data = resp.json()
|
|
assert data["event_type"] == "run.completed"
|
|
assert data["url"] == "https://example.com/hook"
|
|
assert data["is_active"] is True
|
|
assert data["headers"] is None
|
|
assert "id" in data
|
|
|
|
def test_create_with_headers(self, client, admin_user, auth_headers):
|
|
resp = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "sweep.completed",
|
|
"url": "https://example.com/hook",
|
|
"headers": {"X-Custom": "value"},
|
|
})
|
|
assert resp.status_code == 201
|
|
assert resp.json()["headers"] == {"X-Custom": "value"}
|
|
|
|
def test_create_inactive(self, client, admin_user, auth_headers):
|
|
resp = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.failed",
|
|
"url": "https://example.com/hook",
|
|
"is_active": False,
|
|
})
|
|
assert resp.status_code == 201
|
|
assert resp.json()["is_active"] is False
|
|
|
|
def test_create_requires_auth(self, client):
|
|
resp = client.post("/api/webhooks/", json={
|
|
"event_type": "run.completed",
|
|
"url": "https://example.com/hook",
|
|
})
|
|
assert resp.status_code == 401
|
|
|
|
def test_create_missing_fields(self, client, admin_user, auth_headers):
|
|
resp = client.post("/api/webhooks/", headers=auth_headers, json={})
|
|
assert resp.status_code == 422
|
|
|
|
|
|
class TestGetWebhook:
|
|
def test_get_existing(self, client, admin_user, auth_headers):
|
|
create = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/hook",
|
|
})
|
|
wh_id = create.json()["id"]
|
|
resp = client.get(f"/api/webhooks/{wh_id}", headers=auth_headers)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["id"] == wh_id
|
|
|
|
def test_get_not_found(self, client, admin_user, auth_headers):
|
|
resp = client.get(f"/api/webhooks/{uuid.uuid4()}", headers=auth_headers)
|
|
assert resp.status_code == 404
|
|
|
|
|
|
class TestUpdateWebhook:
|
|
def test_update_event_type(self, client, admin_user, auth_headers):
|
|
create = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/hook",
|
|
})
|
|
wh_id = create.json()["id"]
|
|
resp = client.put(f"/api/webhooks/{wh_id}", headers=auth_headers, json={
|
|
"event_type": "run.failed",
|
|
})
|
|
assert resp.status_code == 200
|
|
assert resp.json()["event_type"] == "run.failed"
|
|
assert resp.json()["url"] == "https://example.com/hook" # unchanged
|
|
|
|
def test_update_url(self, client, admin_user, auth_headers):
|
|
create = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/old",
|
|
})
|
|
wh_id = create.json()["id"]
|
|
resp = client.put(f"/api/webhooks/{wh_id}", headers=auth_headers, json={
|
|
"url": "https://example.com/new",
|
|
})
|
|
assert resp.status_code == 200
|
|
assert resp.json()["url"] == "https://example.com/new"
|
|
|
|
def test_update_deactivate(self, client, admin_user, auth_headers):
|
|
create = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/hook",
|
|
})
|
|
wh_id = create.json()["id"]
|
|
resp = client.put(f"/api/webhooks/{wh_id}", headers=auth_headers, json={
|
|
"is_active": False,
|
|
})
|
|
assert resp.status_code == 200
|
|
assert resp.json()["is_active"] is False
|
|
|
|
def test_update_not_found(self, client, admin_user, auth_headers):
|
|
resp = client.put(f"/api/webhooks/{uuid.uuid4()}", headers=auth_headers, json={
|
|
"url": "https://example.com/new",
|
|
})
|
|
assert resp.status_code == 404
|
|
|
|
|
|
class TestDeleteWebhook:
|
|
def test_delete_existing(self, client, admin_user, auth_headers):
|
|
create = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/hook",
|
|
})
|
|
wh_id = create.json()["id"]
|
|
resp = client.delete(f"/api/webhooks/{wh_id}", headers=auth_headers)
|
|
assert resp.status_code == 204
|
|
|
|
# Verify deleted
|
|
resp = client.get(f"/api/webhooks/{wh_id}", headers=auth_headers)
|
|
assert resp.status_code == 404
|
|
|
|
def test_delete_not_found(self, client, admin_user, auth_headers):
|
|
resp = client.delete(f"/api/webhooks/{uuid.uuid4()}", headers=auth_headers)
|
|
assert resp.status_code == 404
|
|
|
|
def test_delete_cascades_deliveries(self, client, admin_user, auth_headers, db_session):
|
|
"""Deleting a webhook should cascade-delete its delivery records."""
|
|
from models import WebhookConfig, WebhookDelivery
|
|
|
|
create = client.post("/api/webhooks/", headers=auth_headers, json={
|
|
"event_type": "run.completed", "url": "https://example.com/hook",
|
|
})
|
|
wh_id = create.json()["id"]
|
|
|
|
# Manually add a delivery record
|
|
delivery = WebhookDelivery(
|
|
webhook_id=uuid.UUID(wh_id),
|
|
event_type="run.completed",
|
|
payload={"test": True},
|
|
success=True,
|
|
attempts=1,
|
|
)
|
|
db_session.add(delivery)
|
|
db_session.commit()
|
|
|
|
# Delete webhook — should cascade
|
|
resp = client.delete(f"/api/webhooks/{wh_id}", headers=auth_headers)
|
|
assert resp.status_code == 204
|
|
|
|
# Verify delivery is gone too
|
|
assert db_session.query(WebhookDelivery).filter(
|
|
WebhookDelivery.webhook_id == uuid.UUID(wh_id)
|
|
).count() == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Webhook Dispatch Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWebhookDispatch:
|
|
def test_get_active_webhooks(self, db_session, admin_user):
|
|
from engine.webhooks import get_active_webhooks
|
|
from models import WebhookConfig
|
|
|
|
wh1 = WebhookConfig(event_type="run.completed", url="https://a.com/hook", is_active=True)
|
|
wh2 = WebhookConfig(event_type="run.completed", url="https://b.com/hook", is_active=False)
|
|
wh3 = WebhookConfig(event_type="sweep.completed", url="https://c.com/hook", is_active=True)
|
|
db_session.add_all([wh1, wh2, wh3])
|
|
db_session.commit()
|
|
|
|
active = get_active_webhooks(db_session, "run.completed")
|
|
assert len(active) == 1
|
|
assert active[0].url == "https://a.com/hook"
|
|
|
|
def test_deliver_webhook_success(self, db_session, admin_user):
|
|
from engine.webhooks import deliver_webhook
|
|
from models import WebhookConfig, WebhookDelivery
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
import httpx
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.text = "OK"
|
|
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls:
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.return_value = mock_response
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
result = deliver_webhook(db_session, wh, "run.completed", {"test": True})
|
|
|
|
assert result is True
|
|
deliveries = db_session.query(WebhookDelivery).all()
|
|
assert len(deliveries) == 1
|
|
assert deliveries[0].success is True
|
|
assert deliveries[0].status_code == 200
|
|
assert deliveries[0].attempts == 1
|
|
|
|
def test_deliver_webhook_retries_on_failure(self, db_session, admin_user):
|
|
from engine.webhooks import deliver_webhook
|
|
from models import WebhookConfig, WebhookDelivery
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 500
|
|
mock_response.text = "Internal Server Error"
|
|
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls, \
|
|
patch("engine.webhooks.time.sleep"): # skip backoff delays
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.return_value = mock_response
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
result = deliver_webhook(db_session, wh, "run.completed", {"test": True})
|
|
|
|
assert result is False
|
|
# Should have retried 3 times
|
|
assert mock_client.post.call_count == 3
|
|
deliveries = db_session.query(WebhookDelivery).all()
|
|
assert len(deliveries) == 1
|
|
assert deliveries[0].success is False
|
|
assert deliveries[0].attempts == 3
|
|
|
|
def test_deliver_webhook_retries_on_exception(self, db_session, admin_user):
|
|
from engine.webhooks import deliver_webhook
|
|
from models import WebhookConfig, WebhookDelivery
|
|
import httpx as httpx_lib
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls, \
|
|
patch("engine.webhooks.time.sleep"):
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.side_effect = httpx_lib.ConnectError("Connection refused")
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
result = deliver_webhook(db_session, wh, "run.completed", {"test": True})
|
|
|
|
assert result is False
|
|
deliveries = db_session.query(WebhookDelivery).all()
|
|
assert len(deliveries) == 1
|
|
assert deliveries[0].success is False
|
|
assert "ConnectError" in deliveries[0].error_message
|
|
|
|
def test_deliver_webhook_succeeds_on_retry(self, db_session, admin_user):
|
|
"""Test that delivery succeeds if a later retry succeeds."""
|
|
from engine.webhooks import deliver_webhook
|
|
from models import WebhookConfig, WebhookDelivery
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
fail_resp = MagicMock()
|
|
fail_resp.status_code = 503
|
|
fail_resp.text = "Service Unavailable"
|
|
|
|
ok_resp = MagicMock()
|
|
ok_resp.status_code = 200
|
|
ok_resp.text = "OK"
|
|
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls, \
|
|
patch("engine.webhooks.time.sleep"):
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.side_effect = [fail_resp, ok_resp]
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
result = deliver_webhook(db_session, wh, "run.completed", {"test": True})
|
|
|
|
assert result is True
|
|
deliveries = db_session.query(WebhookDelivery).all()
|
|
assert len(deliveries) == 1
|
|
assert deliveries[0].success is True
|
|
assert deliveries[0].attempts == 2
|
|
|
|
def test_dispatch_webhooks(self, db_session, admin_user):
|
|
from engine.webhooks import dispatch_webhooks
|
|
from models import WebhookConfig
|
|
|
|
wh1 = WebhookConfig(event_type="run.completed", url="https://a.com/hook", is_active=True)
|
|
wh2 = WebhookConfig(event_type="run.completed", url="https://b.com/hook", is_active=True)
|
|
db_session.add_all([wh1, wh2])
|
|
db_session.commit()
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.text = "OK"
|
|
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls:
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.return_value = mock_response
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
successes = dispatch_webhooks(db_session, "run.completed", {"run_id": "123"})
|
|
|
|
assert successes == 2
|
|
|
|
def test_dispatch_webhooks_no_matches(self, db_session, admin_user):
|
|
from engine.webhooks import dispatch_webhooks
|
|
successes = dispatch_webhooks(db_session, "nonexistent.event", {"test": True})
|
|
assert successes == 0
|
|
|
|
def test_dispatch_webhooks_skips_inactive(self, db_session, admin_user):
|
|
from engine.webhooks import dispatch_webhooks
|
|
from models import WebhookConfig
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://a.com/hook", is_active=False)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
|
|
successes = dispatch_webhooks(db_session, "run.completed", {"test": True})
|
|
assert successes == 0
|
|
|
|
def test_deliver_webhook_with_custom_headers(self, db_session, admin_user):
|
|
from engine.webhooks import deliver_webhook
|
|
from models import WebhookConfig
|
|
|
|
wh = WebhookConfig(
|
|
event_type="run.completed",
|
|
url="https://example.com/hook",
|
|
headers={"Authorization": "Bearer secret", "X-Custom": "value"},
|
|
is_active=True,
|
|
)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.text = "OK"
|
|
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls:
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.return_value = mock_response
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
deliver_webhook(db_session, wh, "run.completed", {"test": True})
|
|
|
|
# Verify custom headers were sent
|
|
call_kwargs = mock_client.post.call_args
|
|
headers = call_kwargs.kwargs.get("headers") or call_kwargs[1].get("headers")
|
|
assert headers["Authorization"] == "Bearer secret"
|
|
assert headers["X-Custom"] == "value"
|
|
assert headers["Content-Type"] == "application/json"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fire Webhooks (task dispatch) Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFireWebhooks:
|
|
def test_fire_webhooks_sync_fallback(self, db_session, admin_user):
|
|
"""In single-container mode, fire_webhooks runs synchronously."""
|
|
from engine.tasks import fire_webhooks
|
|
from models import WebhookConfig
|
|
import engine.tasks as tasks_mod
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.text = "OK"
|
|
|
|
# Force sync fallback: ensure redis_url is None (use_in_process_queue → True)
|
|
tasks_mod.settings.redis_url = None
|
|
with patch("engine.webhooks.httpx.Client") as mock_client_cls:
|
|
mock_client = MagicMock()
|
|
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
|
mock_client.__exit__ = MagicMock(return_value=False)
|
|
mock_client.post.return_value = mock_response
|
|
mock_client_cls.return_value = mock_client
|
|
|
|
result = fire_webhooks("run.completed", {"run_id": "123"})
|
|
|
|
assert result.successful()
|
|
assert result.result["dispatched"] == 1
|
|
|
|
|
|
class TestWebhookDeliveryModel:
|
|
def test_delivery_model_creation(self, db_session, admin_user):
|
|
from models import WebhookConfig, WebhookDelivery
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
delivery = WebhookDelivery(
|
|
webhook_id=wh.id,
|
|
event_type="run.completed",
|
|
payload={"run_id": "123"},
|
|
status_code=200,
|
|
success=True,
|
|
attempts=1,
|
|
)
|
|
db_session.add(delivery)
|
|
db_session.commit()
|
|
db_session.refresh(delivery)
|
|
|
|
assert delivery.id is not None
|
|
assert delivery.webhook_id == wh.id
|
|
assert delivery.success is True
|
|
assert delivery.created_at is not None
|
|
|
|
def test_delivery_relationship(self, db_session, admin_user):
|
|
from models import WebhookConfig, WebhookDelivery
|
|
|
|
wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True)
|
|
db_session.add(wh)
|
|
db_session.commit()
|
|
db_session.refresh(wh)
|
|
|
|
d1 = WebhookDelivery(webhook_id=wh.id, event_type="run.completed", success=True, attempts=1)
|
|
d2 = WebhookDelivery(webhook_id=wh.id, event_type="run.completed", success=False, attempts=3)
|
|
db_session.add_all([d1, d2])
|
|
db_session.commit()
|
|
|
|
db_session.refresh(wh)
|
|
assert len(wh.deliveries) == 2
|