promptlooper/backend/tests/test_worker.py
John Lightner 7dad9d97af MAESTRO: Add entrypoint migrations, worker config, and stack integration tests
Create docker/entrypoint.sh to run alembic migrations on API startup.
Create backend/worker.py with Celery app config for the compose worker service.
Fix README single-container port (8000) and add production compose documentation.
Add 27 tests (stack integration + worker) verifying all Docker/compose artifacts
are present, consistent, and the /health endpoint responds correctly.
2026-04-07 02:09:56 -05:00

47 lines
1.6 KiB
Python

"""Tests for backend/worker.py — Celery configuration."""
import importlib
import sys
from unittest.mock import patch
def test_celery_app_is_importable():
    """Importing ``worker`` yields a ``celery_app`` whose main name is "promptlooper"."""
    from pathlib import Path

    # This file sits in <backend>/tests/, so parents[1] is the backend
    # directory that holds worker.py; it has to be on sys.path for the
    # import below to resolve.
    backend_root = str(Path(__file__).resolve().parents[1])
    if backend_root not in sys.path:
        sys.path.insert(0, backend_root)

    worker_module = importlib.import_module("worker")

    assert hasattr(worker_module, "celery_app")
    assert worker_module.celery_app.main == "promptlooper"
def test_celery_app_serializer_settings():
    """The Celery app serializes both tasks and results as JSON."""
    from pathlib import Path

    # Make the backend directory (one level above tests/) importable.
    backend_root = str(Path(__file__).resolve().parents[1])
    if backend_root not in sys.path:
        sys.path.insert(0, backend_root)

    celery_conf = importlib.import_module("worker").celery_app.conf

    assert celery_conf.task_serializer == "json"
    assert celery_conf.result_serializer == "json"
def test_celery_defaults_to_memory_broker_without_redis():
    """Re-import ``worker`` with an empty ``REDIS_URL`` and check an app is still built.

    ``config`` and ``worker`` are reloaded while ``REDIS_URL`` is patched to an
    empty string, so any module-level code that reads the environment runs
    again. Once the patch is lifted the same modules are reloaded a second
    time, so that state built under the patched environment does not leak
    into tests that run afterwards.

    NOTE(review): the intended contract is "broker falls back to memory://",
    but this test only asserts that ``celery_app`` exists. Consider asserting
    on the app's broker URL once the fallback value in worker.py is confirmed.
    """
    from pathlib import Path

    # Make the backend directory (one level above tests/) importable.
    backend_root = str(Path(__file__).resolve().parents[1])
    if backend_root not in sys.path:
        sys.path.insert(0, backend_root)

    def _reload_if_loaded():
        """Reload ``config`` then ``worker``, skipping any not yet imported."""
        # config goes first -- presumably worker reads its settings from it,
        # so it must see the refreshed values (verify against worker.py).
        for module_name in ("config", "worker"):
            loaded = sys.modules.get(module_name)
            if loaded is not None:
                importlib.reload(loaded)

    try:
        with patch.dict("os.environ", {"REDIS_URL": ""}, clear=False):
            # Force a reload so the modules pick up the env change.
            _reload_if_loaded()
            import worker

            # In no-redis mode the broker is expected to be memory://
            # (it may also have been derived from settings.redis_url == None).
            assert worker.celery_app is not None
    finally:
        # patch.dict has restored os.environ by now; reload again so later
        # tests see modules configured from the real environment.
        _reload_if_loaded()