From 39b5d7c0be93d7c7b80c47c3d4c0eee789ad0c79 Mon Sep 17 00:00:00 2001 From: jlightner Date: Sun, 5 Apr 2026 06:01:54 +0000 Subject: [PATCH] fix: LLM config, task time limits, base_url, fallback model, debug Redis caching - Add BASE_URL setting to config.py, replace hardcoded localhost:8096 in notifications - Fix LLM_FALLBACK_MODEL default from fyn-llm-agent-chat to qwen2.5:7b - Fix docker-compose LLM_FALLBACK_MODEL to use env var with correct default - Add BASE_URL env var to API and worker in docker-compose.yml - Add soft_time_limit/time_limit to all pipeline stage tasks (prevent stuck workers) - Cache Redis connection in _is_debug_mode() instead of creating per-call - Remove duplicate test files in backend/tests/notifications/ --- backend/config.py | 5 +- backend/pipeline/stages.py | 28 +- backend/tasks/notifications.py | 2 +- backend/tests/notifications/conftest.py | 3 - backend/tests/notifications/pytest.ini | 2 - .../tests/notifications/test_notifications.py | 395 ------------------ docker-compose.yml | 6 +- 7 files changed, 27 insertions(+), 414 deletions(-) delete mode 100644 backend/tests/notifications/conftest.py delete mode 100644 backend/tests/notifications/pytest.ini delete mode 100644 backend/tests/notifications/test_notifications.py diff --git a/backend/config.py b/backend/config.py index 2b0f8a8..b15e993 100644 --- a/backend/config.py +++ b/backend/config.py @@ -31,7 +31,7 @@ class Settings(BaseSettings): llm_api_key: str = "sk-placeholder" llm_model: str = "fyn-llm-agent-chat" llm_fallback_url: str = "http://localhost:11434/v1" - llm_fallback_model: str = "fyn-llm-agent-chat" + llm_fallback_model: str = "qwen2.5:7b" # Per-stage model overrides (optional — falls back to llm_model / "chat") llm_stage2_model: str | None = "fyn-llm-agent-chat" # segmentation — mechanical, fast chat @@ -91,6 +91,9 @@ class Settings(BaseSettings): smtp_from_address: str = "" smtp_tls: bool = True + # Public base URL for links in emails and external references + base_url: str = "http://localhost:8096" + # Rate limiting (per hour) rate_limit_user_per_hour: int = 30 rate_limit_ip_per_hour: int = 10 diff --git a/backend/pipeline/stages.py b/backend/pipeline/stages.py index a0d2269..950d0d8 100644 --- a/backend/pipeline/stages.py +++ b/backend/pipeline/stages.py @@ -127,19 +127,25 @@ def _emit_event( def _is_debug_mode() -> bool: - """Check if debug mode is enabled via Redis. Falls back to config setting.""" + """Check if debug mode is enabled via Redis. Falls back to config setting. + + Uses a module-level Redis connection to avoid per-call connection overhead. + """ try: import redis - settings = get_settings() - r = redis.from_url(settings.redis_url) - val = r.get("chrysopedia:debug_mode") - r.close() + global _debug_redis + if _debug_redis is None: + settings = get_settings() + _debug_redis = redis.from_url(settings.redis_url, socket_connect_timeout=1) + val = _debug_redis.get("chrysopedia:debug_mode") if val is not None: return val.decode().lower() == "true" except Exception: - pass + _debug_redis = None # Reset on error so next call retries return getattr(get_settings(), "debug_mode", False) +_debug_redis = None + def _make_llm_callback( video_id: str, @@ -389,7 +395,7 @@ def _safe_parse_llm_response( # ── Stage 2: Segmentation ─────────────────────────────────────────────────── -@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=300, time_limit=360) def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str: """Analyze transcript segments and identify topic boundaries. @@ -471,7 +477,7 @@ def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str: # ── Stage 3: Extraction ───────────────────────────────────────────────────── -@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=600, time_limit=660) def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str: """Extract key moments from each topic segment group. @@ -639,7 +645,7 @@ def _classify_moment_batch( ) -@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=300, time_limit=360) def stage4_classification(self, video_id: str, run_id: str | None = None) -> str: """Classify key moments against the canonical tag taxonomy. @@ -1248,7 +1254,7 @@ def _merge_pages_by_slug( return final_pages -@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=900, time_limit=960) def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str: """Synthesize technique pages from classified key moments. @@ -2442,7 +2448,7 @@ def fetch_creator_avatar(creator_id: str) -> dict: # ── Highlight Detection ────────────────────────────────────────────────────── -@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=120, time_limit=180) def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str: """Score all KeyMoments for a video and upsert HighlightCandidates. diff --git a/backend/tasks/notifications.py b/backend/tasks/notifications.py index f9aa7cf..789fa0d 100644 --- a/backend/tasks/notifications.py +++ b/backend/tasks/notifications.py @@ -218,7 +218,7 @@ def _process_user_digest(session: Session, user: User, settings) -> None: if not new_posts and not new_technique_pages: continue - base_url = f"http://localhost:8096" # TODO: make configurable via settings + base_url = settings.base_url.rstrip("/") group = { "creator_name": creator.name, diff --git a/backend/tests/notifications/conftest.py b/backend/tests/notifications/conftest.py deleted file mode 100644 index 8610828..0000000 --- a/backend/tests/notifications/conftest.py +++ /dev/null @@ -1,3 +0,0 @@ -# Standalone conftest for notification tests — intentionally shadows -# the parent conftest.py to avoid importing the full app (which may -# reference routers not present in the running container image). diff --git a/backend/tests/notifications/pytest.ini b/backend/tests/notifications/pytest.ini deleted file mode 100644 index 2f4c80e..0000000 --- a/backend/tests/notifications/pytest.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pytest] -asyncio_mode = auto diff --git a/backend/tests/notifications/test_notifications.py b/backend/tests/notifications/test_notifications.py deleted file mode 100644 index 207bbf5..0000000 --- a/backend/tests/notifications/test_notifications.py +++ /dev/null @@ -1,395 +0,0 @@ -"""Integration tests for notification preferences, unsubscribe, and digest task. - -Tests: - - GET/PUT notification preferences (auth required) - - PUT with invalid digest_frequency value → 422 - - GET/PUT without auth → 401 - - Unsubscribe with valid token - - Unsubscribe with expired token - - Unsubscribe with tampered token - - Digest task happy path (mocked SMTP) - - Digest task skips when no new content - - Digest task no-op when SMTP unconfigured - -Uses a standalone ASGI test client to avoid importing the full app -(which may reference routers not present in the running container image). -""" - -from __future__ import annotations - -import os -import pathlib -import sys -import time -import uuid -from datetime import datetime, timezone -from unittest.mock import MagicMock, patch - -import jwt -import pytest -import pytest_asyncio -from httpx import ASGITransport, AsyncClient -from sqlalchemy import select, update as sa_update -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine -from sqlalchemy.orm import Session, sessionmaker -from sqlalchemy.pool import NullPool - -# Ensure backend/ on path -sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent)) - -from database import Base, get_session # noqa: E402 -from models import ( # noqa: E402 - Creator, - CreatorFollow, - EmailDigestLog, - InviteCode, - Post, - User, -) - -TEST_DATABASE_URL = os.getenv( - "TEST_DATABASE_URL", - "postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test", -) -TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace( - "postgresql+asyncpg://", "postgresql+psycopg2://" -) - -_SECRET = "changeme-generate-a-real-secret" - - -# ── Standalone test app ────────────────────────────────────────────────────── - -def _make_test_app(): - """Build a minimal FastAPI app with only the notifications router + auth.""" - from fastapi import FastAPI - from routers.notifications import router as notif_router - from routers.auth import router as auth_router - - test_app = FastAPI() - test_app.include_router(auth_router, prefix="/api/v1") - test_app.include_router(notif_router, prefix="/api/v1") - return test_app - - -# ── Fixtures ───────────────────────────────────────────────────────────────── - -@pytest_asyncio.fixture() -async def db_engine(): - engine = create_async_engine(TEST_DATABASE_URL, echo=False, poolclass=NullPool) - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await conn.run_sync(Base.metadata.create_all) - yield engine - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await engine.dispose() - - -@pytest_asyncio.fixture() -async def sync_engine_fix(db_engine): - """Sync engine pointing at same test DB (tables created by db_engine).""" - from sqlalchemy import create_engine - engine = create_engine(TEST_DATABASE_URL_SYNC, echo=False, poolclass=NullPool) - yield engine - engine.dispose() - - -@pytest_asyncio.fixture() -async def client(db_engine, tmp_path): - test_app = _make_test_app() - session_factory = async_sessionmaker( - db_engine, class_=AsyncSession, expire_on_commit=False - ) - - async def _override_get_session(): - async with session_factory() as session: - yield session - - test_app.dependency_overrides[get_session] = _override_get_session - - # Clear settings cache so tests get fresh defaults - from config import get_settings - get_settings.cache_clear() - - transport = ASGITransport(app=test_app) - async with AsyncClient(transport=transport, base_url="http://testserver") as ac: - yield ac - - test_app.dependency_overrides.clear() - get_settings.cache_clear() - - -@pytest_asyncio.fixture() -async def invite_code(db_engine): - factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) - async with factory() as session: - code = InviteCode(code="NOTIF-TEST-2026", uses_remaining=10) - session.add(code) - await session.commit() - return "NOTIF-TEST-2026" - - -@pytest_asyncio.fixture() -async def registered_user(client, invite_code): - resp = await client.post("/api/v1/auth/register", json={ - "email": "notiftest@chrysopedia.com", - "password": "testpass123", - "display_name": "Notif Test User", - "invite_code": invite_code, - }) - assert resp.status_code == 201, f"Register failed: {resp.text}" - return resp.json() - - -@pytest_asyncio.fixture() -async def auth_headers(client, registered_user): - resp = await client.post("/api/v1/auth/login", json={ - "email": "notiftest@chrysopedia.com", - "password": "testpass123", - }) - assert resp.status_code == 200, f"Login failed: {resp.text}" - token = resp.json()["access_token"] - return {"Authorization": f"Bearer {token}"} - - -# ── Helpers ────────────────────────────────────────────────────────────────── - -def _make_unsubscribe_token(user_id: str, secret: str = _SECRET, iat: float | None = None) -> str: - payload = { - "sub": user_id, - "purpose": "unsubscribe", - "iat": iat or datetime.now(timezone.utc).timestamp(), - } - return jwt.encode(payload, secret, algorithm="HS256") - - -# ── Preference endpoint tests ─────────────────────────────────────────────── - - -@pytest.mark.asyncio -async def test_get_preferences_requires_auth(client): - resp = await client.get("/api/v1/notifications/preferences") - assert resp.status_code == 401 - - -@pytest.mark.asyncio -async def test_put_preferences_requires_auth(client): - resp = await client.put( - "/api/v1/notifications/preferences", - json={"email_digests": False}, - ) - assert resp.status_code == 401 - - -@pytest.mark.asyncio -async def test_get_preferences_defaults(client, auth_headers): - resp = await client.get("/api/v1/notifications/preferences", headers=auth_headers) - assert resp.status_code == 200 - data = resp.json() - assert data["email_digests"] is True - assert data["digest_frequency"] == "daily" - - -@pytest.mark.asyncio -async def test_put_preferences_update_digests(client, auth_headers): - resp = await client.put( - "/api/v1/notifications/preferences", - json={"email_digests": False}, - headers=auth_headers, - ) - assert resp.status_code == 200 - assert resp.json()["email_digests"] is False - assert resp.json()["digest_frequency"] == "daily" - - # Verify persistence - resp2 = await client.get("/api/v1/notifications/preferences", headers=auth_headers) - assert resp2.json()["email_digests"] is False - - -@pytest.mark.asyncio -async def test_put_preferences_update_frequency(client, auth_headers): - resp = await client.put( - "/api/v1/notifications/preferences", - json={"digest_frequency": "weekly"}, - headers=auth_headers, - ) - assert resp.status_code == 200 - assert resp.json()["digest_frequency"] == "weekly" - - -@pytest.mark.asyncio -async def test_put_preferences_invalid_frequency(client, auth_headers): - resp = await client.put( - "/api/v1/notifications/preferences", - json={"digest_frequency": "monthly"}, - headers=auth_headers, - ) - assert resp.status_code == 422 - - -# ── Unsubscribe endpoint tests ────────────────────────────────────────────── - - -@pytest.mark.asyncio -async def test_unsubscribe_valid_token(client, auth_headers, db_engine): - factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) - async with factory() as session: - result = await session.execute(select(User).limit(1)) - user = result.scalar_one() - user_id = str(user.id) - - token = _make_unsubscribe_token(user_id) - resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}") - assert resp.status_code == 200 - assert "Unsubscribed" in resp.text - - # Verify preferences updated - async with factory() as session: - result = await session.execute(select(User).where(User.id == user.id)) - updated = result.scalar_one() - assert updated.notification_preferences.get("email_digests") is False - - -@pytest.mark.asyncio -async def test_unsubscribe_expired_token(client, auth_headers, db_engine): - factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) - async with factory() as session: - result = await session.execute(select(User).limit(1)) - user = result.scalar_one() - - expired_iat = time.time() - (31 * 24 * 3600) - token = _make_unsubscribe_token(str(user.id), iat=expired_iat) - resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}") - assert resp.status_code == 400 - - -@pytest.mark.asyncio -async def test_unsubscribe_tampered_token(client): - token = _make_unsubscribe_token(str(uuid.uuid4()), secret="wrong-secret") - resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}") - assert resp.status_code == 400 - - -# ── Digest task tests ──────────────────────────────────────────────────────── - - -@pytest.mark.asyncio -async def test_digest_task_smtp_unconfigured(): - """Digest task gracefully skips when SMTP is not configured.""" - from tasks.notifications import send_digest_emails - with patch("tasks.notifications.is_smtp_configured", return_value=False): - result = send_digest_emails() - assert result["skipped"] is True - assert result["reason"] == "smtp_not_configured" - - -@pytest.mark.asyncio -async def test_digest_task_happy_path(client, auth_headers, db_engine, sync_engine_fix): - """Full digest: user follows creator, new post exists, SMTP mocked → email sent.""" - sync_factory = sessionmaker(bind=sync_engine_fix) - - factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) - async with factory() as session: - result = await session.execute(select(User).limit(1)) - user = result.scalar_one() - user_id = user.id - user_email = user.email - - # Ensure digests enabled - await session.execute( - sa_update(User).where(User.id == user_id).values( - notification_preferences={"email_digests": True, "digest_frequency": "daily"} - ) - ) - await session.commit() - - # Create creator - creator = Creator( - name="DigestTestCreator", - slug="digest-test-creator", - folder_name="DigestTestCreator", - ) - session.add(creator) - await session.flush() - creator_id = creator.id - - # Follow - follow = CreatorFollow(user_id=user_id, creator_id=creator_id) - session.add(follow) - - # Published post - post = Post( - creator_id=creator_id, - title="Test Digest Post", - body_json={"blocks": [{"type": "paragraph", "content": "Some content"}]}, - is_published=True, - ) - session.add(post) - await session.commit() - - # Mock SMTP and sync session - mock_send = MagicMock(return_value=True) - - with ( - patch("tasks.notifications.is_smtp_configured", return_value=True), - patch("tasks.notifications.send_email", mock_send), - patch("tasks.notifications._get_sync_session") as mock_session_fn, - ): - sync_session = sync_factory() - mock_session_fn.return_value = sync_session - - from tasks.notifications import send_digest_emails - result = send_digest_emails() - - assert result["sent"] >= 1 - assert mock_send.called - call_args = mock_send.call_args - assert user_email in str(call_args) - - # Verify EmailDigestLog - verify_session = sync_factory() - try: - logs = verify_session.execute( - select(EmailDigestLog).where(EmailDigestLog.user_id == user_id) - ).scalars().all() - assert len(logs) >= 1 - finally: - verify_session.close() - - -@pytest.mark.asyncio -async def test_digest_task_no_new_content(client, auth_headers, db_engine, sync_engine_fix): - """Digest task skips users with no new content.""" - sync_factory = sessionmaker(bind=sync_engine_fix) - - factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) - async with factory() as session: - result = await session.execute(select(User).limit(1)) - user = result.scalar_one() - user_id = user.id - - # Digests enabled but no follows - await session.execute( - sa_update(User).where(User.id == user_id).values( - notification_preferences={"email_digests": True, "digest_frequency": "daily"} - ) - ) - await session.commit() - - mock_send = MagicMock(return_value=True) - - with ( - patch("tasks.notifications.is_smtp_configured", return_value=True), - patch("tasks.notifications.send_email", mock_send), - patch("tasks.notifications._get_sync_session") as mock_session_fn, - ): - sync_session = sync_factory() - mock_session_fn.return_value = sync_session - - from tasks.notifications import send_digest_emails - result = send_digest_emails() - - assert result["sent"] == 0 - assert result["skipped"] >= 1 - assert not mock_send.called diff --git a/docker-compose.yml b/docker-compose.yml index 41e4f4a..29caaea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -122,7 +122,8 @@ services: QDRANT_URL: http://chrysopedia-qdrant:6333 EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1 LLM_FALLBACK_URL: http://chrysopedia-ollama:11434/v1 - LLM_FALLBACK_MODEL: fyn-llm-agent-chat + LLM_FALLBACK_MODEL: ${LLM_FALLBACK_MODEL:-qwen2.5:7b} + BASE_URL: ${BASE_URL:-http://ub01:8096} PROMPTS_PATH: /prompts volumes: - /vmPool/r/services/chrysopedia_data:/data @@ -156,6 +157,9 @@ services: REDIS_URL: redis://chrysopedia-redis:6379/0 QDRANT_URL: http://chrysopedia-qdrant:6333 EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1 + LLM_FALLBACK_URL: http://chrysopedia-ollama:11434/v1 + LLM_FALLBACK_MODEL: ${LLM_FALLBACK_MODEL:-qwen2.5:7b} + BASE_URL: ${BASE_URL:-http://ub01:8096} PROMPTS_PATH: /prompts command: ["celery", "-A", "worker", "worker", "--beat", "--loglevel=info", "--concurrency=1"] healthcheck: