"""Tests for backend/worker.py — Celery configuration.""" import importlib import sys from unittest.mock import patch def test_celery_app_is_importable(): """worker.py exports a celery_app instance.""" # Need to ensure config module is importable backend_dir = str(__import__("pathlib").Path(__file__).resolve().parents[1]) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) import worker assert hasattr(worker, "celery_app") assert worker.celery_app.main == "promptlooper" def test_celery_app_serializer_settings(): """Verify JSON serialization is configured.""" backend_dir = str(__import__("pathlib").Path(__file__).resolve().parents[1]) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) import worker assert worker.celery_app.conf.task_serializer == "json" assert worker.celery_app.conf.result_serializer == "json" def test_celery_defaults_to_memory_broker_without_redis(): """Without REDIS_URL, broker falls back to memory://.""" backend_dir = str(__import__("pathlib").Path(__file__).resolve().parents[1]) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) with patch.dict("os.environ", {"REDIS_URL": ""}, clear=False): # Force reload to pick up env change if "config" in sys.modules: importlib.reload(sys.modules["config"]) if "worker" in sys.modules: importlib.reload(sys.modules["worker"]) import worker # In no-redis mode, broker should be memory:// # (may have been set from settings.redis_url == None) assert worker.celery_app is not None