diff --git a/Auto Run Docs/02a-backend-engine.md b/Auto Run Docs/02a-backend-engine.md index f8cc8f9..788df79 100644 --- a/Auto Run Docs/02a-backend-engine.md +++ b/Auto Run Docs/02a-backend-engine.md @@ -47,6 +47,7 @@ Implement the core experiment execution engine: LLM adapters, response caching, - [x] Implement backend/websocket/manager.py — WebSocket connection manager that: maintains active connections per experiment and globally, receives Redis pub/sub messages and broadcasts to relevant connections, handles connection/disconnection cleanly, supports reconnection with message replay (last N events). -- [ ] Implement backend/routers/webhooks.py — CRUD for webhook configs. When events occur (in runner.py and sweep.py), dispatch webhook calls asynchronously via Celery. Include retry logic (3 attempts) and log delivery status. +- [x] Implement backend/routers/webhooks.py — CRUD for webhook configs. When events occur (in runner.py and sweep.py), dispatch webhook calls asynchronously via Celery. Include retry logic (3 attempts) and log delivery status. + - [ ] Write tests for the core engine: test cache hash determinism, test adapter mock calls, test scorer implementations with known inputs, test sweep configuration generation (grid should produce correct number of combos, random should respect ranges). Aim for >80% coverage on engine/ directory. diff --git a/backend/engine/runner.py b/backend/engine/runner.py index 1a640b2..67b4913 100644 --- a/backend/engine/runner.py +++ b/backend/engine/runner.py @@ -256,15 +256,23 @@ async def run_single( run.tokens_out = total_tokens_out db.commit() + completed_event = { + "type": "run.completed", + "run_id": str(run.id), + "experiment_id": str(run.experiment_id), + "duration_ms": duration_ms, + "tokens_in": total_tokens_in, + "tokens_out": total_tokens_out, + } if event_bus: - event_bus.publish({ - "type": "run.completed", - "run_id": str(run.id), - "experiment_id": str(run.experiment_id), - "duration_ms": duration_ms, - "tokens_in": total_tokens_in, - "tokens_out": total_tokens_out, - }) + event_bus.publish(completed_event) + + # Fire webhooks asynchronously + try: + from engine.tasks import fire_webhooks + fire_webhooks("run.completed", completed_event) + except Exception: + logger.warning("Failed to dispatch run.completed webhooks", exc_info=True) except Exception as exc: logger.error("Run %s failed: %s", run.id, exc, exc_info=True) @@ -273,13 +281,21 @@ async def run_single( run.duration_ms = int((time.perf_counter() - t_start) * 1000) db.commit() + failed_event = { + "type": "run.failed", + "run_id": str(run.id), + "experiment_id": str(run.experiment_id), + "error": str(exc), + } if event_bus: - event_bus.publish({ - "type": "run.failed", - "run_id": str(run.id), - "experiment_id": str(run.experiment_id), - "error": str(exc), - }) + event_bus.publish(failed_event) + + # Fire webhooks asynchronously + try: + from engine.tasks import fire_webhooks + fire_webhooks("run.failed", failed_event) + except Exception: + logger.warning("Failed to dispatch run.failed webhooks", exc_info=True) raise return run diff --git a/backend/engine/sweep.py b/backend/engine/sweep.py index b6d6786..1573848 100644 --- a/backend/engine/sweep.py +++ b/backend/engine/sweep.py @@ -278,15 +278,23 @@ async def run_sweep( experiment.status = ExperimentStatus.completed db.commit() + sweep_completed_event = { + "type": "sweep.completed", + "experiment_id": str(experiment.id), + "total_runs": result.total_runs, + "completed_runs": result.completed_runs, + "failed_runs": result.failed_runs, + "stopped_reason": result.stopped_reason, + } if event_bus: - event_bus.publish({ - "type": "sweep.completed", - "experiment_id": str(experiment.id), - "total_runs": result.total_runs, - "completed_runs": result.completed_runs, - "failed_runs": result.failed_runs, - "stopped_reason": result.stopped_reason, - }) + event_bus.publish(sweep_completed_event) + + # Fire webhooks asynchronously + try: + from engine.tasks import fire_webhooks + fire_webhooks("sweep.completed", sweep_completed_event) + except Exception: + logger.warning("Failed to dispatch sweep.completed webhooks", exc_info=True) return result diff --git a/backend/engine/tasks.py b/backend/engine/tasks.py index 0e8bb4b..371cba4 100644 --- a/backend/engine/tasks.py +++ b/backend/engine/tasks.py @@ -56,6 +56,18 @@ def _get_event_bus(): return EventBus(redis_client=redis_client) +def _do_dispatch_webhooks(event_type: str, payload: dict[str, Any]) -> dict[str, Any]: + """Core logic for dispatching webhooks (used by both Celery and sync paths).""" + from engine.webhooks import dispatch_webhooks + + db = _get_db_session() + try: + successes = dispatch_webhooks(db, event_type, payload) + return {"event_type": event_type, "dispatched": successes} + finally: + db.close() + + def _get_cache(): """Create a ResponseCacheLayer.""" from engine.cache import ResponseCacheLayer @@ -193,10 +205,17 @@ try: logger.info("Celery task execute_sweep started: experiment_id=%s", experiment_id) return _do_execute_sweep(experiment_id, sweep_config, endpoint_config) + @celery_app.task(name="engine.dispatch_webhooks", bind=True, max_retries=0) + def dispatch_webhooks_task(self, event_type: str, payload: dict[str, Any]) -> dict[str, Any]: + """Celery task: dispatch webhooks for an event.""" + logger.info("Celery task dispatch_webhooks started: event_type=%s", event_type) + return _do_dispatch_webhooks(event_type, payload) + except ImportError: # Celery not available — tasks will only be usable via synchronous fallback execute_run = None execute_sweep = None + dispatch_webhooks_task = None # --------------------------------------------------------------------------- @@ -264,3 +283,22 @@ def dispatch_sweep( except Exception as exc: logger.error("Sync sweep %s failed: %s", experiment_id, exc, exc_info=True) return SyncTaskResult(error=exc) + + +def fire_webhooks(event_type: str, payload: dict[str, Any]) -> Any: + """Dispatch webhook delivery — Celery if available, synchronous otherwise. + + Call this from runner.py / sweep.py whenever an event occurs that + should trigger webhooks. + """ + if not settings.use_in_process_queue and dispatch_webhooks_task is not None: + return dispatch_webhooks_task.delay(event_type, payload) + + # Synchronous fallback + logger.info("Sync fallback: dispatching webhooks for event_type=%s", event_type) + try: + result = _do_dispatch_webhooks(event_type, payload) + return SyncTaskResult(result=result) + except Exception as exc: + logger.error("Sync webhook dispatch failed: %s", exc, exc_info=True) + return SyncTaskResult(error=exc) diff --git a/backend/engine/webhooks.py b/backend/engine/webhooks.py new file mode 100644 index 0000000..76602de --- /dev/null +++ b/backend/engine/webhooks.py @@ -0,0 +1,184 @@ +"""Webhook dispatch — fire HTTP callbacks when events occur. + +Active webhook configs matching the event_type are fetched from the DB, +and each gets an async HTTP POST with the event payload. Retries up to +3 attempts with exponential backoff. Delivery status is logged to the +WebhookDelivery table. +""" + +import json +import logging +import time +import uuid +from datetime import datetime, timezone +from typing import Any + +import httpx +from sqlalchemy.orm import Session + +from models import WebhookConfig, WebhookDelivery + +logger = logging.getLogger(__name__) + +MAX_RETRIES = 3 +BACKOFF_BASE = 2 # seconds +TIMEOUT = 10 # seconds per request + + +def get_active_webhooks(db: Session, event_type: str) -> list[WebhookConfig]: + """Return all active webhooks matching the given event_type.""" + return ( + db.query(WebhookConfig) + .filter(WebhookConfig.event_type == event_type, WebhookConfig.is_active.is_(True)) + .all() + ) + + +def _log_delivery( + db: Session, + webhook_id: uuid.UUID, + event_type: str, + payload: dict[str, Any], + status_code: int | None, + success: bool, + attempts: int, + error_message: str | None = None, +) -> WebhookDelivery: + """Create a WebhookDelivery record.""" + delivery = WebhookDelivery( + webhook_id=webhook_id, + event_type=event_type, + payload=payload, + status_code=status_code, + success=success, + attempts=attempts, + error_message=error_message, + ) + db.add(delivery) + db.commit() + db.refresh(delivery) + return delivery + + +def deliver_webhook( + db: Session, + webhook: WebhookConfig, + event_type: str, + payload: dict[str, Any], +) -> bool: + """Deliver a webhook synchronously with retry logic. + + Returns True if delivery succeeded, False otherwise. + """ + headers = {"Content-Type": "application/json"} + if webhook.headers: + headers.update(webhook.headers) + + last_error: str | None = None + last_status_code: int | None = None + + for attempt in range(1, MAX_RETRIES + 1): + try: + with httpx.Client(timeout=TIMEOUT) as client: + response = client.post( + webhook.url, + content=json.dumps(payload, default=str), + headers=headers, + ) + last_status_code = response.status_code + if 200 <= response.status_code < 300: + _log_delivery(db, webhook.id, event_type, payload, response.status_code, True, attempt) + return True + last_error = f"HTTP {response.status_code}: {response.text[:500]}" + except Exception as exc: + last_error = f"{type(exc).__name__}: {str(exc)[:500]}" + last_status_code = None + + if attempt < MAX_RETRIES: + time.sleep(BACKOFF_BASE ** attempt) + + # All retries exhausted + logger.warning( + "Webhook delivery failed after %d attempts: webhook_id=%s url=%s error=%s", + MAX_RETRIES, webhook.id, webhook.url, last_error, + ) + _log_delivery(db, webhook.id, event_type, payload, last_status_code, False, MAX_RETRIES, last_error) + return False + + +async def deliver_webhook_async( + db: Session, + webhook: WebhookConfig, + event_type: str, + payload: dict[str, Any], +) -> bool: + """Deliver a webhook asynchronously with retry logic. + + Returns True if delivery succeeded, False otherwise. + """ + headers = {"Content-Type": "application/json"} + if webhook.headers: + headers.update(webhook.headers) + + last_error: str | None = None + last_status_code: int | None = None + + for attempt in range(1, MAX_RETRIES + 1): + try: + async with httpx.AsyncClient(timeout=TIMEOUT) as client: + response = await client.post( + webhook.url, + content=json.dumps(payload, default=str), + headers=headers, + ) + last_status_code = response.status_code + if 200 <= response.status_code < 300: + _log_delivery(db, webhook.id, event_type, payload, response.status_code, True, attempt) + return True + last_error = f"HTTP {response.status_code}: {response.text[:500]}" + except Exception as exc: + last_error = f"{type(exc).__name__}: {str(exc)[:500]}" + last_status_code = None + + if attempt < MAX_RETRIES: + import asyncio + await asyncio.sleep(BACKOFF_BASE ** attempt) + + logger.warning( + "Webhook delivery failed after %d attempts: webhook_id=%s url=%s error=%s", + MAX_RETRIES, webhook.id, webhook.url, last_error, + ) + _log_delivery(db, webhook.id, event_type, payload, last_status_code, False, MAX_RETRIES, last_error) + return False + + +def dispatch_webhooks(db: Session, event_type: str, payload: dict[str, Any]) -> int: + """Find active webhooks for event_type and deliver to each. + + Returns the number of successful deliveries. + """ + webhooks = get_active_webhooks(db, event_type) + if not webhooks: + return 0 + + successes = 0 + for webhook in webhooks: + if deliver_webhook(db, webhook, event_type, payload): + successes += 1 + return successes + + +async def dispatch_webhooks_async(db: Session, event_type: str, payload: dict[str, Any]) -> int: + """Async variant — find active webhooks and deliver to each. + + Returns the number of successful deliveries. + """ + webhooks = get_active_webhooks(db, event_type) + if not webhooks: + return 0 + + successes = 0 + for webhook in webhooks: + if await deliver_webhook_async(db, webhook, event_type, payload): + successes += 1 + return successes diff --git a/backend/models.py b/backend/models.py index 00191ff..54212eb 100644 --- a/backend/models.py +++ b/backend/models.py @@ -289,6 +289,38 @@ class WebhookConfig(Base): headers: Mapped[dict | None] = mapped_column(JSON, nullable=True) is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False) + # Relationships + deliveries: Mapped[list["WebhookDelivery"]] = relationship( + back_populates="webhook", cascade="all, delete-orphan" + ) + __table_args__ = ( Index("ix_webhook_configs_event_type", "event_type"), ) + + +class WebhookDelivery(Base): + __tablename__ = "webhook_deliveries" + + id: Mapped[uuid.UUID] = mapped_column( + primary_key=True, default=_new_uuid + ) + webhook_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("webhook_configs.id", ondelete="CASCADE"), nullable=False + ) + event_type: Mapped[str] = mapped_column(String(255), nullable=False) + payload: Mapped[dict | None] = mapped_column(JSON, nullable=True) + status_code: Mapped[int | None] = mapped_column(Integer, nullable=True) + success: Mapped[bool] = mapped_column(Boolean, nullable=False) + attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=_utcnow, nullable=False + ) + + # Relationships + webhook: Mapped["WebhookConfig"] = relationship(back_populates="deliveries") + + __table_args__ = ( + Index("ix_webhook_deliveries_webhook_id", "webhook_id"), + ) diff --git a/backend/routers/webhooks.py b/backend/routers/webhooks.py index 17bc4b4..5dce3f0 100644 --- a/backend/routers/webhooks.py +++ b/backend/routers/webhooks.py @@ -1,25 +1,124 @@ -"""Webhooks router — manage webhook configurations.""" +"""Webhooks router — CRUD for webhook configurations and async dispatch. + +Webhooks fire when events occur in runner.py and sweep.py. Delivery is +dispatched asynchronously via Celery (or synchronous fallback). Each delivery +attempt retries up to 3 times with exponential backoff. Delivery status is +logged to the WebhookDelivery table. +""" import uuid -from fastapi import APIRouter, Response +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session + +from auth import get_current_user +from main import get_db +from models import User, WebhookConfig +from schemas import ( + WebhookCreate, + WebhookListResponse, + WebhookResponse, + WebhookUpdate, +) router = APIRouter() -@router.get("/", status_code=501) -def list_webhooks(): - """List webhook configs.""" - return Response(status_code=501, content="Not Implemented") +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- -@router.post("/", status_code=501) -def create_webhook(): - """Create webhook.""" - return Response(status_code=501, content="Not Implemented") +def _get_webhook_or_404(db: Session, webhook_id: uuid.UUID) -> WebhookConfig: + webhook = db.query(WebhookConfig).filter(WebhookConfig.id == webhook_id).first() + if webhook is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Webhook not found") + return webhook -@router.delete("/{webhook_id}", status_code=501) -def delete_webhook(webhook_id: uuid.UUID): - """Remove webhook.""" - return Response(status_code=501, content="Not Implemented") +# --------------------------------------------------------------------------- +# CRUD +# --------------------------------------------------------------------------- + + +@router.get("/", response_model=WebhookListResponse) +def list_webhooks( + event_type: str | None = None, + db: Session = Depends(get_db), + _user: User = Depends(get_current_user), +) -> WebhookListResponse: + """List all webhook configurations, optionally filtered by event_type.""" + query = db.query(WebhookConfig) + if event_type: + query = query.filter(WebhookConfig.event_type == event_type) + webhooks = query.order_by(WebhookConfig.event_type).all() + return WebhookListResponse( + items=[WebhookResponse.model_validate(wh) for wh in webhooks], + total=len(webhooks), + ) + + +@router.post("/", response_model=WebhookResponse, status_code=status.HTTP_201_CREATED) +def create_webhook( + body: WebhookCreate, + db: Session = Depends(get_db), + _user: User = Depends(get_current_user), +) -> WebhookResponse: + """Create a new webhook configuration.""" + webhook = WebhookConfig( + event_type=body.event_type, + url=body.url, + headers=body.headers, + is_active=body.is_active, + ) + db.add(webhook) + db.commit() + db.refresh(webhook) + return WebhookResponse.model_validate(webhook) + + +@router.get("/{webhook_id}", response_model=WebhookResponse) +def get_webhook( + webhook_id: uuid.UUID, + db: Session = Depends(get_db), + _user: User = Depends(get_current_user), +) -> WebhookResponse: + """Get a single webhook configuration.""" + webhook = _get_webhook_or_404(db, webhook_id) + return WebhookResponse.model_validate(webhook) + + +@router.put("/{webhook_id}", response_model=WebhookResponse) +def update_webhook( + webhook_id: uuid.UUID, + body: WebhookUpdate, + db: Session = Depends(get_db), + _user: User = Depends(get_current_user), +) -> WebhookResponse: + """Update a webhook configuration.""" + webhook = _get_webhook_or_404(db, webhook_id) + + if body.event_type is not None: + webhook.event_type = body.event_type + if body.url is not None: + webhook.url = body.url + if body.headers is not None: + webhook.headers = body.headers + if body.is_active is not None: + webhook.is_active = body.is_active + + db.commit() + db.refresh(webhook) + return WebhookResponse.model_validate(webhook) + + +@router.delete("/{webhook_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_webhook( + webhook_id: uuid.UUID, + db: Session = Depends(get_db), + _user: User = Depends(get_current_user), +) -> None: + """Delete a webhook configuration.""" + webhook = _get_webhook_or_404(db, webhook_id) + db.delete(webhook) + db.commit() diff --git a/backend/tests/test_routers.py b/backend/tests/test_routers.py index 5652b1c..ddbcf64 100644 --- a/backend/tests/test_routers.py +++ b/backend/tests/test_routers.py @@ -196,17 +196,17 @@ def test_export_report(client): def test_webhooks_list(client): resp = client.get("/api/webhooks/") - assert resp.status_code == 501 + assert resp.status_code == 401 # auth required def test_webhooks_create(client): resp = client.post("/api/webhooks/") - assert resp.status_code == 501 + assert resp.status_code == 401 # auth required def test_webhooks_delete(client): resp = client.delete("/api/webhooks/00000000-0000-0000-0000-000000000001") - assert resp.status_code == 501 + assert resp.status_code == 401 # auth required # ---- Admin router (/api/admin) ---- diff --git a/backend/tests/test_webhooks.py b/backend/tests/test_webhooks.py new file mode 100644 index 0000000..a5f92ef --- /dev/null +++ b/backend/tests/test_webhooks.py @@ -0,0 +1,555 @@ +"""Tests for backend/routers/webhooks.py — Webhook CRUD + dispatch logic.""" + +import os +import uuid +from unittest.mock import patch, MagicMock + +import pytest +from fastapi.testclient import TestClient + + +JWT_SECRET = "test-secret-key-for-jwt-signing" +API_KEY = "test-api-key-12345" + + +@pytest.fixture(autouse=True) +def _isolate_settings(tmp_path): + """Ensure tests use a temp SQLite DB and no Redis.""" + env = { + "DATABASE_URL": f"sqlite:///{tmp_path / 'test.db'}", + "REDIS_URL": "", + "DATA_DIR": str(tmp_path), + "JWT_SECRET": JWT_SECRET, + "API_KEY": API_KEY, + } + with patch.dict(os.environ, env, clear=False): + import config + new_settings = config.Settings(_env_file=None) + config.settings = new_settings + + import main + main.settings = new_settings + main._init_db() + main._init_redis() + + from models import Base + Base.metadata.create_all(bind=main.engine) + + import auth + auth.settings = new_settings + + yield + + +@pytest.fixture +def db_session(): + from main import get_db + gen = get_db() + session = next(gen) + yield session + try: + next(gen) + except StopIteration: + pass + + +@pytest.fixture +def admin_user(db_session): + from auth import hash_password + from models import User + user = User(username="admin", password_hash=hash_password("adminpass"), is_admin=True) + db_session.add(user) + db_session.commit() + db_session.refresh(user) + return user + + +@pytest.fixture +def auth_headers(): + return {"X-Api-Key": API_KEY} + + +@pytest.fixture +def client(): + from main import app + return TestClient(app) + + +# --------------------------------------------------------------------------- +# CRUD Tests +# --------------------------------------------------------------------------- + + +class TestListWebhooks: + def test_list_empty(self, client, admin_user, auth_headers): + resp = client.get("/api/webhooks/", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["items"] == [] + assert data["total"] == 0 + + def test_list_returns_created(self, client, admin_user, auth_headers): + client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", + "url": "https://example.com/hook", + }) + resp = client.get("/api/webhooks/", headers=auth_headers) + assert resp.status_code == 200 + assert resp.json()["total"] == 1 + + def test_list_filter_by_event_type(self, client, admin_user, auth_headers): + client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/a", + }) + client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "sweep.completed", "url": "https://example.com/b", + }) + resp = client.get("/api/webhooks/?event_type=run.completed", headers=auth_headers) + assert resp.json()["total"] == 1 + assert resp.json()["items"][0]["event_type"] == "run.completed" + + def test_list_requires_auth(self, client): + resp = client.get("/api/webhooks/") + assert resp.status_code == 401 + + +class TestCreateWebhook: + def test_create_basic(self, client, admin_user, auth_headers): + resp = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", + "url": "https://example.com/hook", + }) + assert resp.status_code == 201 + data = resp.json() + assert data["event_type"] == "run.completed" + assert data["url"] == "https://example.com/hook" + assert data["is_active"] is True + assert data["headers"] is None + assert "id" in data + + def test_create_with_headers(self, client, admin_user, auth_headers): + resp = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "sweep.completed", + "url": "https://example.com/hook", + "headers": {"X-Custom": "value"}, + }) + assert resp.status_code == 201 + assert resp.json()["headers"] == {"X-Custom": "value"} + + def test_create_inactive(self, client, admin_user, auth_headers): + resp = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.failed", + "url": "https://example.com/hook", + "is_active": False, + }) + assert resp.status_code == 201 + assert resp.json()["is_active"] is False + + def test_create_requires_auth(self, client): + resp = client.post("/api/webhooks/", json={ + "event_type": "run.completed", + "url": "https://example.com/hook", + }) + assert resp.status_code == 401 + + def test_create_missing_fields(self, client, admin_user, auth_headers): + resp = client.post("/api/webhooks/", headers=auth_headers, json={}) + assert resp.status_code == 422 + + +class TestGetWebhook: + def test_get_existing(self, client, admin_user, auth_headers): + create = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/hook", + }) + wh_id = create.json()["id"] + resp = client.get(f"/api/webhooks/{wh_id}", headers=auth_headers) + assert resp.status_code == 200 + assert resp.json()["id"] == wh_id + + def test_get_not_found(self, client, admin_user, auth_headers): + resp = client.get(f"/api/webhooks/{uuid.uuid4()}", headers=auth_headers) + assert resp.status_code == 404 + + +class TestUpdateWebhook: + def test_update_event_type(self, client, admin_user, auth_headers): + create = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/hook", + }) + wh_id = create.json()["id"] + resp = client.put(f"/api/webhooks/{wh_id}", headers=auth_headers, json={ + "event_type": "run.failed", + }) + assert resp.status_code == 200 + assert resp.json()["event_type"] == "run.failed" + assert resp.json()["url"] == "https://example.com/hook" # unchanged + + def test_update_url(self, client, admin_user, auth_headers): + create = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/old", + }) + wh_id = create.json()["id"] + resp = client.put(f"/api/webhooks/{wh_id}", headers=auth_headers, json={ + "url": "https://example.com/new", + }) + assert resp.status_code == 200 + assert resp.json()["url"] == "https://example.com/new" + + def test_update_deactivate(self, client, admin_user, auth_headers): + create = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/hook", + }) + wh_id = create.json()["id"] + resp = client.put(f"/api/webhooks/{wh_id}", headers=auth_headers, json={ + "is_active": False, + }) + assert resp.status_code == 200 + assert resp.json()["is_active"] is False + + def test_update_not_found(self, client, admin_user, auth_headers): + resp = client.put(f"/api/webhooks/{uuid.uuid4()}", headers=auth_headers, json={ + "url": "https://example.com/new", + }) + assert resp.status_code == 404 + + +class TestDeleteWebhook: + def test_delete_existing(self, client, admin_user, auth_headers): + create = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/hook", + }) + wh_id = create.json()["id"] + resp = client.delete(f"/api/webhooks/{wh_id}", headers=auth_headers) + assert resp.status_code == 204 + + # Verify deleted + resp = client.get(f"/api/webhooks/{wh_id}", headers=auth_headers) + assert resp.status_code == 404 + + def test_delete_not_found(self, client, admin_user, auth_headers): + resp = client.delete(f"/api/webhooks/{uuid.uuid4()}", headers=auth_headers) + assert resp.status_code == 404 + + def test_delete_cascades_deliveries(self, client, admin_user, auth_headers, db_session): + """Deleting a webhook should cascade-delete its delivery records.""" + from models import WebhookConfig, WebhookDelivery + + create = client.post("/api/webhooks/", headers=auth_headers, json={ + "event_type": "run.completed", "url": "https://example.com/hook", + }) + wh_id = create.json()["id"] + + # Manually add a delivery record + delivery = WebhookDelivery( + webhook_id=uuid.UUID(wh_id), + event_type="run.completed", + payload={"test": True}, + success=True, + attempts=1, + ) + db_session.add(delivery) + db_session.commit() + + # Delete webhook — should cascade + resp = client.delete(f"/api/webhooks/{wh_id}", headers=auth_headers) + assert resp.status_code == 204 + + # Verify delivery is gone too + assert db_session.query(WebhookDelivery).filter( + WebhookDelivery.webhook_id == uuid.UUID(wh_id) + ).count() == 0 + + +# --------------------------------------------------------------------------- +# Webhook Dispatch Tests +# --------------------------------------------------------------------------- + + +class TestWebhookDispatch: + def test_get_active_webhooks(self, db_session, admin_user): + from engine.webhooks import get_active_webhooks + from models import WebhookConfig + + wh1 = WebhookConfig(event_type="run.completed", url="https://a.com/hook", is_active=True) + wh2 = WebhookConfig(event_type="run.completed", url="https://b.com/hook", is_active=False) + wh3 = WebhookConfig(event_type="sweep.completed", url="https://c.com/hook", is_active=True) + db_session.add_all([wh1, wh2, wh3]) + db_session.commit() + + active = get_active_webhooks(db_session, "run.completed") + assert len(active) == 1 + assert active[0].url == "https://a.com/hook" + + def test_deliver_webhook_success(self, db_session, admin_user): + from engine.webhooks import deliver_webhook + from models import WebhookConfig, WebhookDelivery + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + import httpx + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = "OK" + + with patch("engine.webhooks.httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.return_value = mock_response + mock_client_cls.return_value = mock_client + + result = deliver_webhook(db_session, wh, "run.completed", {"test": True}) + + assert result is True + deliveries = db_session.query(WebhookDelivery).all() + assert len(deliveries) == 1 + assert deliveries[0].success is True + assert deliveries[0].status_code == 200 + assert deliveries[0].attempts == 1 + + def test_deliver_webhook_retries_on_failure(self, db_session, admin_user): + from engine.webhooks import deliver_webhook + from models import WebhookConfig, WebhookDelivery + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + + with patch("engine.webhooks.httpx.Client") as mock_client_cls, \ + patch("engine.webhooks.time.sleep"): # skip backoff delays + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.return_value = mock_response + mock_client_cls.return_value = mock_client + + result = deliver_webhook(db_session, wh, "run.completed", {"test": True}) + + assert result is False + # Should have retried 3 times + assert mock_client.post.call_count == 3 + deliveries = db_session.query(WebhookDelivery).all() + assert len(deliveries) == 1 + assert deliveries[0].success is False + assert deliveries[0].attempts == 3 + + def test_deliver_webhook_retries_on_exception(self, db_session, admin_user): + from engine.webhooks import deliver_webhook + from models import WebhookConfig, WebhookDelivery + import httpx as httpx_lib + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + with patch("engine.webhooks.httpx.Client") as mock_client_cls, \ + patch("engine.webhooks.time.sleep"): + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.side_effect = httpx_lib.ConnectError("Connection refused") + mock_client_cls.return_value = mock_client + + result = deliver_webhook(db_session, wh, "run.completed", {"test": True}) + + assert result is False + deliveries = db_session.query(WebhookDelivery).all() + assert len(deliveries) == 1 + assert deliveries[0].success is False + assert "ConnectError" in deliveries[0].error_message + + def test_deliver_webhook_succeeds_on_retry(self, db_session, admin_user): + """Test that delivery succeeds if a later retry succeeds.""" + from engine.webhooks import deliver_webhook + from models import WebhookConfig, WebhookDelivery + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + fail_resp = MagicMock() + fail_resp.status_code = 503 + fail_resp.text = "Service Unavailable" + + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = "OK" + + with patch("engine.webhooks.httpx.Client") as mock_client_cls, \ + patch("engine.webhooks.time.sleep"): + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.side_effect = [fail_resp, ok_resp] + mock_client_cls.return_value = mock_client + + result = deliver_webhook(db_session, wh, "run.completed", {"test": True}) + + assert result is True + deliveries = db_session.query(WebhookDelivery).all() + assert len(deliveries) == 1 + assert deliveries[0].success is True + assert deliveries[0].attempts == 2 + + def test_dispatch_webhooks(self, db_session, admin_user): + from engine.webhooks import dispatch_webhooks + from models import WebhookConfig + + wh1 = WebhookConfig(event_type="run.completed", url="https://a.com/hook", is_active=True) + wh2 = WebhookConfig(event_type="run.completed", url="https://b.com/hook", is_active=True) + db_session.add_all([wh1, wh2]) + db_session.commit() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = "OK" + + with patch("engine.webhooks.httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.return_value = mock_response + mock_client_cls.return_value = mock_client + + successes = dispatch_webhooks(db_session, "run.completed", {"run_id": "123"}) + + assert successes == 2 + + def test_dispatch_webhooks_no_matches(self, db_session, admin_user): + from engine.webhooks import dispatch_webhooks + successes = dispatch_webhooks(db_session, "nonexistent.event", {"test": True}) + assert successes == 0 + + def test_dispatch_webhooks_skips_inactive(self, db_session, admin_user): + from engine.webhooks import dispatch_webhooks + from models import WebhookConfig + + wh = WebhookConfig(event_type="run.completed", url="https://a.com/hook", is_active=False) + db_session.add(wh) + db_session.commit() + + successes = dispatch_webhooks(db_session, "run.completed", {"test": True}) + assert successes == 0 + + def test_deliver_webhook_with_custom_headers(self, db_session, admin_user): + from engine.webhooks import deliver_webhook + from models import WebhookConfig + + wh = WebhookConfig( + event_type="run.completed", + url="https://example.com/hook", + headers={"Authorization": "Bearer secret", "X-Custom": "value"}, + is_active=True, + ) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = "OK" + + with patch("engine.webhooks.httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.return_value = mock_response + mock_client_cls.return_value = mock_client + + deliver_webhook(db_session, wh, "run.completed", {"test": True}) + + # Verify custom headers were sent + call_kwargs = mock_client.post.call_args + headers = call_kwargs.kwargs.get("headers") or call_kwargs[1].get("headers") + assert headers["Authorization"] == "Bearer secret" + assert headers["X-Custom"] == "value" + assert headers["Content-Type"] == "application/json" + + +# --------------------------------------------------------------------------- +# Fire Webhooks (task dispatch) Tests +# --------------------------------------------------------------------------- + + +class TestFireWebhooks: + def test_fire_webhooks_sync_fallback(self, db_session, admin_user): + """In single-container mode, fire_webhooks runs synchronously.""" + from engine.tasks import fire_webhooks + from models import WebhookConfig + import engine.tasks as tasks_mod + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = "OK" + + # Force sync fallback: ensure redis_url is None (use_in_process_queue → True) + tasks_mod.settings.redis_url = None + with patch("engine.webhooks.httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.post.return_value = mock_response + mock_client_cls.return_value = mock_client + + result = fire_webhooks("run.completed", {"run_id": "123"}) + + assert result.successful() + assert result.result["dispatched"] == 1 + + +class TestWebhookDeliveryModel: + def test_delivery_model_creation(self, db_session, admin_user): + from models import WebhookConfig, WebhookDelivery + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + delivery = WebhookDelivery( + webhook_id=wh.id, + event_type="run.completed", + payload={"run_id": "123"}, + status_code=200, + success=True, + attempts=1, + ) + db_session.add(delivery) + db_session.commit() + db_session.refresh(delivery) + + assert delivery.id is not None + assert delivery.webhook_id == wh.id + assert delivery.success is True + assert delivery.created_at is not None + + def test_delivery_relationship(self, db_session, admin_user): + from models import WebhookConfig, WebhookDelivery + + wh = WebhookConfig(event_type="run.completed", url="https://example.com/hook", is_active=True) + db_session.add(wh) + db_session.commit() + db_session.refresh(wh) + + d1 = WebhookDelivery(webhook_id=wh.id, event_type="run.completed", success=True, attempts=1) + d2 = WebhookDelivery(webhook_id=wh.id, event_type="run.completed", success=False, attempts=3) + db_session.add_all([d1, d2]) + db_session.commit() + + db_session.refresh(wh) + assert len(wh.deliveries) == 2