fix: LLM config, task time limits, base_url, fallback model, debug Redis caching
- Add BASE_URL setting to config.py, replace hardcoded localhost:8096 in notifications - Fix LLM_FALLBACK_MODEL default from fyn-llm-agent-chat to qwen2.5:7b - Fix docker-compose LLM_FALLBACK_MODEL to use env var with correct default - Add BASE_URL env var to API and worker in docker-compose.yml - Add soft_time_limit/time_limit to all pipeline stage tasks (prevent stuck workers) - Cache Redis connection in _is_debug_mode() instead of creating per-call - Remove duplicate test files in backend/tests/notifications/
This commit is contained in:
parent
526fd0a58c
commit
39b5d7c0be
7 changed files with 27 additions and 414 deletions
|
|
@ -31,7 +31,7 @@ class Settings(BaseSettings):
|
||||||
llm_api_key: str = "sk-placeholder"
|
llm_api_key: str = "sk-placeholder"
|
||||||
llm_model: str = "fyn-llm-agent-chat"
|
llm_model: str = "fyn-llm-agent-chat"
|
||||||
llm_fallback_url: str = "http://localhost:11434/v1"
|
llm_fallback_url: str = "http://localhost:11434/v1"
|
||||||
llm_fallback_model: str = "fyn-llm-agent-chat"
|
llm_fallback_model: str = "qwen2.5:7b"
|
||||||
|
|
||||||
# Per-stage model overrides (optional — falls back to llm_model / "chat")
|
# Per-stage model overrides (optional — falls back to llm_model / "chat")
|
||||||
llm_stage2_model: str | None = "fyn-llm-agent-chat" # segmentation — mechanical, fast chat
|
llm_stage2_model: str | None = "fyn-llm-agent-chat" # segmentation — mechanical, fast chat
|
||||||
|
|
@ -91,6 +91,9 @@ class Settings(BaseSettings):
|
||||||
smtp_from_address: str = ""
|
smtp_from_address: str = ""
|
||||||
smtp_tls: bool = True
|
smtp_tls: bool = True
|
||||||
|
|
||||||
|
# Public base URL for links in emails and external references
|
||||||
|
base_url: str = "http://localhost:8096"
|
||||||
|
|
||||||
# Rate limiting (per hour)
|
# Rate limiting (per hour)
|
||||||
rate_limit_user_per_hour: int = 30
|
rate_limit_user_per_hour: int = 30
|
||||||
rate_limit_ip_per_hour: int = 10
|
rate_limit_ip_per_hour: int = 10
|
||||||
|
|
|
||||||
|
|
@ -127,19 +127,25 @@ def _emit_event(
|
||||||
|
|
||||||
|
|
||||||
def _is_debug_mode() -> bool:
|
def _is_debug_mode() -> bool:
|
||||||
"""Check if debug mode is enabled via Redis. Falls back to config setting."""
|
"""Check if debug mode is enabled via Redis. Falls back to config setting.
|
||||||
|
|
||||||
|
Uses a module-level Redis connection to avoid per-call connection overhead.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
import redis
|
import redis
|
||||||
settings = get_settings()
|
global _debug_redis
|
||||||
r = redis.from_url(settings.redis_url)
|
if _debug_redis is None:
|
||||||
val = r.get("chrysopedia:debug_mode")
|
settings = get_settings()
|
||||||
r.close()
|
_debug_redis = redis.from_url(settings.redis_url, socket_connect_timeout=1)
|
||||||
|
val = _debug_redis.get("chrysopedia:debug_mode")
|
||||||
if val is not None:
|
if val is not None:
|
||||||
return val.decode().lower() == "true"
|
return val.decode().lower() == "true"
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
_debug_redis = None # Reset on error so next call retries
|
||||||
return getattr(get_settings(), "debug_mode", False)
|
return getattr(get_settings(), "debug_mode", False)
|
||||||
|
|
||||||
|
_debug_redis = None
|
||||||
|
|
||||||
|
|
||||||
def _make_llm_callback(
|
def _make_llm_callback(
|
||||||
video_id: str,
|
video_id: str,
|
||||||
|
|
@ -389,7 +395,7 @@ def _safe_parse_llm_response(
|
||||||
|
|
||||||
# ── Stage 2: Segmentation ───────────────────────────────────────────────────
|
# ── Stage 2: Segmentation ───────────────────────────────────────────────────
|
||||||
|
|
||||||
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
|
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=300, time_limit=360)
|
||||||
def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str:
|
def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str:
|
||||||
"""Analyze transcript segments and identify topic boundaries.
|
"""Analyze transcript segments and identify topic boundaries.
|
||||||
|
|
||||||
|
|
@ -471,7 +477,7 @@ def stage2_segmentation(self, video_id: str, run_id: str | None = None) -> str:
|
||||||
|
|
||||||
# ── Stage 3: Extraction ─────────────────────────────────────────────────────
|
# ── Stage 3: Extraction ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
|
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=600, time_limit=660)
|
||||||
def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str:
|
def stage3_extraction(self, video_id: str, run_id: str | None = None) -> str:
|
||||||
"""Extract key moments from each topic segment group.
|
"""Extract key moments from each topic segment group.
|
||||||
|
|
||||||
|
|
@ -639,7 +645,7 @@ def _classify_moment_batch(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
|
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=300, time_limit=360)
|
||||||
def stage4_classification(self, video_id: str, run_id: str | None = None) -> str:
|
def stage4_classification(self, video_id: str, run_id: str | None = None) -> str:
|
||||||
"""Classify key moments against the canonical tag taxonomy.
|
"""Classify key moments against the canonical tag taxonomy.
|
||||||
|
|
||||||
|
|
@ -1248,7 +1254,7 @@ def _merge_pages_by_slug(
|
||||||
return final_pages
|
return final_pages
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
|
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=900, time_limit=960)
|
||||||
def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
|
def stage5_synthesis(self, video_id: str, run_id: str | None = None) -> str:
|
||||||
"""Synthesize technique pages from classified key moments.
|
"""Synthesize technique pages from classified key moments.
|
||||||
|
|
||||||
|
|
@ -2442,7 +2448,7 @@ def fetch_creator_avatar(creator_id: str) -> dict:
|
||||||
|
|
||||||
# ── Highlight Detection ──────────────────────────────────────────────────────
|
# ── Highlight Detection ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
|
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30, soft_time_limit=120, time_limit=180)
|
||||||
def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str:
|
def stage_highlight_detection(self, video_id: str, run_id: str | None = None) -> str:
|
||||||
"""Score all KeyMoments for a video and upsert HighlightCandidates.
|
"""Score all KeyMoments for a video and upsert HighlightCandidates.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -218,7 +218,7 @@ def _process_user_digest(session: Session, user: User, settings) -> None:
|
||||||
if not new_posts and not new_technique_pages:
|
if not new_posts and not new_technique_pages:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
base_url = f"http://localhost:8096" # TODO: make configurable via settings
|
base_url = settings.base_url.rstrip("/")
|
||||||
|
|
||||||
group = {
|
group = {
|
||||||
"creator_name": creator.name,
|
"creator_name": creator.name,
|
||||||
|
|
|
||||||
|
|
@ -1,3 +0,0 @@
|
||||||
# Standalone conftest for notification tests — intentionally shadows
|
|
||||||
# the parent conftest.py to avoid importing the full app (which may
|
|
||||||
# reference routers not present in the running container image).
|
|
||||||
|
|
@ -1,2 +0,0 @@
|
||||||
[pytest]
|
|
||||||
asyncio_mode = auto
|
|
||||||
|
|
@ -1,395 +0,0 @@
|
||||||
"""Integration tests for notification preferences, unsubscribe, and digest task.
|
|
||||||
|
|
||||||
Tests:
|
|
||||||
- GET/PUT notification preferences (auth required)
|
|
||||||
- PUT with invalid digest_frequency value → 422
|
|
||||||
- GET/PUT without auth → 401
|
|
||||||
- Unsubscribe with valid token
|
|
||||||
- Unsubscribe with expired token
|
|
||||||
- Unsubscribe with tampered token
|
|
||||||
- Digest task happy path (mocked SMTP)
|
|
||||||
- Digest task skips when no new content
|
|
||||||
- Digest task no-op when SMTP unconfigured
|
|
||||||
|
|
||||||
Uses a standalone ASGI test client to avoid importing the full app
|
|
||||||
(which may reference routers not present in the running container image).
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import os
|
|
||||||
import pathlib
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import uuid
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
|
|
||||||
import jwt
|
|
||||||
import pytest
|
|
||||||
import pytest_asyncio
|
|
||||||
from httpx import ASGITransport, AsyncClient
|
|
||||||
from sqlalchemy import select, update as sa_update
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
||||||
from sqlalchemy.orm import Session, sessionmaker
|
|
||||||
from sqlalchemy.pool import NullPool
|
|
||||||
|
|
||||||
# Ensure backend/ on path
|
|
||||||
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent.parent))
|
|
||||||
|
|
||||||
from database import Base, get_session # noqa: E402
|
|
||||||
from models import ( # noqa: E402
|
|
||||||
Creator,
|
|
||||||
CreatorFollow,
|
|
||||||
EmailDigestLog,
|
|
||||||
InviteCode,
|
|
||||||
Post,
|
|
||||||
User,
|
|
||||||
)
|
|
||||||
|
|
||||||
TEST_DATABASE_URL = os.getenv(
|
|
||||||
"TEST_DATABASE_URL",
|
|
||||||
"postgresql+asyncpg://chrysopedia:changeme@localhost:5433/chrysopedia_test",
|
|
||||||
)
|
|
||||||
TEST_DATABASE_URL_SYNC = TEST_DATABASE_URL.replace(
|
|
||||||
"postgresql+asyncpg://", "postgresql+psycopg2://"
|
|
||||||
)
|
|
||||||
|
|
||||||
_SECRET = "changeme-generate-a-real-secret"
|
|
||||||
|
|
||||||
|
|
||||||
# ── Standalone test app ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _make_test_app():
|
|
||||||
"""Build a minimal FastAPI app with only the notifications router + auth."""
|
|
||||||
from fastapi import FastAPI
|
|
||||||
from routers.notifications import router as notif_router
|
|
||||||
from routers.auth import router as auth_router
|
|
||||||
|
|
||||||
test_app = FastAPI()
|
|
||||||
test_app.include_router(auth_router, prefix="/api/v1")
|
|
||||||
test_app.include_router(notif_router, prefix="/api/v1")
|
|
||||||
return test_app
|
|
||||||
|
|
||||||
|
|
||||||
# ── Fixtures ─────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture()
|
|
||||||
async def db_engine():
|
|
||||||
engine = create_async_engine(TEST_DATABASE_URL, echo=False, poolclass=NullPool)
|
|
||||||
async with engine.begin() as conn:
|
|
||||||
await conn.run_sync(Base.metadata.drop_all)
|
|
||||||
await conn.run_sync(Base.metadata.create_all)
|
|
||||||
yield engine
|
|
||||||
async with engine.begin() as conn:
|
|
||||||
await conn.run_sync(Base.metadata.drop_all)
|
|
||||||
await engine.dispose()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture()
|
|
||||||
async def sync_engine_fix(db_engine):
|
|
||||||
"""Sync engine pointing at same test DB (tables created by db_engine)."""
|
|
||||||
from sqlalchemy import create_engine
|
|
||||||
engine = create_engine(TEST_DATABASE_URL_SYNC, echo=False, poolclass=NullPool)
|
|
||||||
yield engine
|
|
||||||
engine.dispose()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture()
|
|
||||||
async def client(db_engine, tmp_path):
|
|
||||||
test_app = _make_test_app()
|
|
||||||
session_factory = async_sessionmaker(
|
|
||||||
db_engine, class_=AsyncSession, expire_on_commit=False
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _override_get_session():
|
|
||||||
async with session_factory() as session:
|
|
||||||
yield session
|
|
||||||
|
|
||||||
test_app.dependency_overrides[get_session] = _override_get_session
|
|
||||||
|
|
||||||
# Clear settings cache so tests get fresh defaults
|
|
||||||
from config import get_settings
|
|
||||||
get_settings.cache_clear()
|
|
||||||
|
|
||||||
transport = ASGITransport(app=test_app)
|
|
||||||
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
|
|
||||||
yield ac
|
|
||||||
|
|
||||||
test_app.dependency_overrides.clear()
|
|
||||||
get_settings.cache_clear()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture()
|
|
||||||
async def invite_code(db_engine):
|
|
||||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
async with factory() as session:
|
|
||||||
code = InviteCode(code="NOTIF-TEST-2026", uses_remaining=10)
|
|
||||||
session.add(code)
|
|
||||||
await session.commit()
|
|
||||||
return "NOTIF-TEST-2026"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture()
|
|
||||||
async def registered_user(client, invite_code):
|
|
||||||
resp = await client.post("/api/v1/auth/register", json={
|
|
||||||
"email": "notiftest@chrysopedia.com",
|
|
||||||
"password": "testpass123",
|
|
||||||
"display_name": "Notif Test User",
|
|
||||||
"invite_code": invite_code,
|
|
||||||
})
|
|
||||||
assert resp.status_code == 201, f"Register failed: {resp.text}"
|
|
||||||
return resp.json()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture()
|
|
||||||
async def auth_headers(client, registered_user):
|
|
||||||
resp = await client.post("/api/v1/auth/login", json={
|
|
||||||
"email": "notiftest@chrysopedia.com",
|
|
||||||
"password": "testpass123",
|
|
||||||
})
|
|
||||||
assert resp.status_code == 200, f"Login failed: {resp.text}"
|
|
||||||
token = resp.json()["access_token"]
|
|
||||||
return {"Authorization": f"Bearer {token}"}
|
|
||||||
|
|
||||||
|
|
||||||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _make_unsubscribe_token(user_id: str, secret: str = _SECRET, iat: float | None = None) -> str:
|
|
||||||
payload = {
|
|
||||||
"sub": user_id,
|
|
||||||
"purpose": "unsubscribe",
|
|
||||||
"iat": iat or datetime.now(timezone.utc).timestamp(),
|
|
||||||
}
|
|
||||||
return jwt.encode(payload, secret, algorithm="HS256")
|
|
||||||
|
|
||||||
|
|
||||||
# ── Preference endpoint tests ───────────────────────────────────────────────
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_preferences_requires_auth(client):
|
|
||||||
resp = await client.get("/api/v1/notifications/preferences")
|
|
||||||
assert resp.status_code == 401
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_put_preferences_requires_auth(client):
|
|
||||||
resp = await client.put(
|
|
||||||
"/api/v1/notifications/preferences",
|
|
||||||
json={"email_digests": False},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_preferences_defaults(client, auth_headers):
|
|
||||||
resp = await client.get("/api/v1/notifications/preferences", headers=auth_headers)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
data = resp.json()
|
|
||||||
assert data["email_digests"] is True
|
|
||||||
assert data["digest_frequency"] == "daily"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_put_preferences_update_digests(client, auth_headers):
|
|
||||||
resp = await client.put(
|
|
||||||
"/api/v1/notifications/preferences",
|
|
||||||
json={"email_digests": False},
|
|
||||||
headers=auth_headers,
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
assert resp.json()["email_digests"] is False
|
|
||||||
assert resp.json()["digest_frequency"] == "daily"
|
|
||||||
|
|
||||||
# Verify persistence
|
|
||||||
resp2 = await client.get("/api/v1/notifications/preferences", headers=auth_headers)
|
|
||||||
assert resp2.json()["email_digests"] is False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_put_preferences_update_frequency(client, auth_headers):
|
|
||||||
resp = await client.put(
|
|
||||||
"/api/v1/notifications/preferences",
|
|
||||||
json={"digest_frequency": "weekly"},
|
|
||||||
headers=auth_headers,
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
assert resp.json()["digest_frequency"] == "weekly"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_put_preferences_invalid_frequency(client, auth_headers):
|
|
||||||
resp = await client.put(
|
|
||||||
"/api/v1/notifications/preferences",
|
|
||||||
json={"digest_frequency": "monthly"},
|
|
||||||
headers=auth_headers,
|
|
||||||
)
|
|
||||||
assert resp.status_code == 422
|
|
||||||
|
|
||||||
|
|
||||||
# ── Unsubscribe endpoint tests ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_unsubscribe_valid_token(client, auth_headers, db_engine):
|
|
||||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
async with factory() as session:
|
|
||||||
result = await session.execute(select(User).limit(1))
|
|
||||||
user = result.scalar_one()
|
|
||||||
user_id = str(user.id)
|
|
||||||
|
|
||||||
token = _make_unsubscribe_token(user_id)
|
|
||||||
resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}")
|
|
||||||
assert resp.status_code == 200
|
|
||||||
assert "Unsubscribed" in resp.text
|
|
||||||
|
|
||||||
# Verify preferences updated
|
|
||||||
async with factory() as session:
|
|
||||||
result = await session.execute(select(User).where(User.id == user.id))
|
|
||||||
updated = result.scalar_one()
|
|
||||||
assert updated.notification_preferences.get("email_digests") is False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_unsubscribe_expired_token(client, auth_headers, db_engine):
|
|
||||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
async with factory() as session:
|
|
||||||
result = await session.execute(select(User).limit(1))
|
|
||||||
user = result.scalar_one()
|
|
||||||
|
|
||||||
expired_iat = time.time() - (31 * 24 * 3600)
|
|
||||||
token = _make_unsubscribe_token(str(user.id), iat=expired_iat)
|
|
||||||
resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}")
|
|
||||||
assert resp.status_code == 400
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_unsubscribe_tampered_token(client):
|
|
||||||
token = _make_unsubscribe_token(str(uuid.uuid4()), secret="wrong-secret")
|
|
||||||
resp = await client.get(f"/api/v1/notifications/unsubscribe?token={token}")
|
|
||||||
assert resp.status_code == 400
|
|
||||||
|
|
||||||
|
|
||||||
# ── Digest task tests ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_digest_task_smtp_unconfigured():
|
|
||||||
"""Digest task gracefully skips when SMTP is not configured."""
|
|
||||||
from tasks.notifications import send_digest_emails
|
|
||||||
with patch("tasks.notifications.is_smtp_configured", return_value=False):
|
|
||||||
result = send_digest_emails()
|
|
||||||
assert result["skipped"] is True
|
|
||||||
assert result["reason"] == "smtp_not_configured"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_digest_task_happy_path(client, auth_headers, db_engine, sync_engine_fix):
|
|
||||||
"""Full digest: user follows creator, new post exists, SMTP mocked → email sent."""
|
|
||||||
sync_factory = sessionmaker(bind=sync_engine_fix)
|
|
||||||
|
|
||||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
async with factory() as session:
|
|
||||||
result = await session.execute(select(User).limit(1))
|
|
||||||
user = result.scalar_one()
|
|
||||||
user_id = user.id
|
|
||||||
user_email = user.email
|
|
||||||
|
|
||||||
# Ensure digests enabled
|
|
||||||
await session.execute(
|
|
||||||
sa_update(User).where(User.id == user_id).values(
|
|
||||||
notification_preferences={"email_digests": True, "digest_frequency": "daily"}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
# Create creator
|
|
||||||
creator = Creator(
|
|
||||||
name="DigestTestCreator",
|
|
||||||
slug="digest-test-creator",
|
|
||||||
folder_name="DigestTestCreator",
|
|
||||||
)
|
|
||||||
session.add(creator)
|
|
||||||
await session.flush()
|
|
||||||
creator_id = creator.id
|
|
||||||
|
|
||||||
# Follow
|
|
||||||
follow = CreatorFollow(user_id=user_id, creator_id=creator_id)
|
|
||||||
session.add(follow)
|
|
||||||
|
|
||||||
# Published post
|
|
||||||
post = Post(
|
|
||||||
creator_id=creator_id,
|
|
||||||
title="Test Digest Post",
|
|
||||||
body_json={"blocks": [{"type": "paragraph", "content": "Some content"}]},
|
|
||||||
is_published=True,
|
|
||||||
)
|
|
||||||
session.add(post)
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
# Mock SMTP and sync session
|
|
||||||
mock_send = MagicMock(return_value=True)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("tasks.notifications.is_smtp_configured", return_value=True),
|
|
||||||
patch("tasks.notifications.send_email", mock_send),
|
|
||||||
patch("tasks.notifications._get_sync_session") as mock_session_fn,
|
|
||||||
):
|
|
||||||
sync_session = sync_factory()
|
|
||||||
mock_session_fn.return_value = sync_session
|
|
||||||
|
|
||||||
from tasks.notifications import send_digest_emails
|
|
||||||
result = send_digest_emails()
|
|
||||||
|
|
||||||
assert result["sent"] >= 1
|
|
||||||
assert mock_send.called
|
|
||||||
call_args = mock_send.call_args
|
|
||||||
assert user_email in str(call_args)
|
|
||||||
|
|
||||||
# Verify EmailDigestLog
|
|
||||||
verify_session = sync_factory()
|
|
||||||
try:
|
|
||||||
logs = verify_session.execute(
|
|
||||||
select(EmailDigestLog).where(EmailDigestLog.user_id == user_id)
|
|
||||||
).scalars().all()
|
|
||||||
assert len(logs) >= 1
|
|
||||||
finally:
|
|
||||||
verify_session.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_digest_task_no_new_content(client, auth_headers, db_engine, sync_engine_fix):
|
|
||||||
"""Digest task skips users with no new content."""
|
|
||||||
sync_factory = sessionmaker(bind=sync_engine_fix)
|
|
||||||
|
|
||||||
factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
async with factory() as session:
|
|
||||||
result = await session.execute(select(User).limit(1))
|
|
||||||
user = result.scalar_one()
|
|
||||||
user_id = user.id
|
|
||||||
|
|
||||||
# Digests enabled but no follows
|
|
||||||
await session.execute(
|
|
||||||
sa_update(User).where(User.id == user_id).values(
|
|
||||||
notification_preferences={"email_digests": True, "digest_frequency": "daily"}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
mock_send = MagicMock(return_value=True)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("tasks.notifications.is_smtp_configured", return_value=True),
|
|
||||||
patch("tasks.notifications.send_email", mock_send),
|
|
||||||
patch("tasks.notifications._get_sync_session") as mock_session_fn,
|
|
||||||
):
|
|
||||||
sync_session = sync_factory()
|
|
||||||
mock_session_fn.return_value = sync_session
|
|
||||||
|
|
||||||
from tasks.notifications import send_digest_emails
|
|
||||||
result = send_digest_emails()
|
|
||||||
|
|
||||||
assert result["sent"] == 0
|
|
||||||
assert result["skipped"] >= 1
|
|
||||||
assert not mock_send.called
|
|
||||||
|
|
@ -122,7 +122,8 @@ services:
|
||||||
QDRANT_URL: http://chrysopedia-qdrant:6333
|
QDRANT_URL: http://chrysopedia-qdrant:6333
|
||||||
EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1
|
EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1
|
||||||
LLM_FALLBACK_URL: http://chrysopedia-ollama:11434/v1
|
LLM_FALLBACK_URL: http://chrysopedia-ollama:11434/v1
|
||||||
LLM_FALLBACK_MODEL: fyn-llm-agent-chat
|
LLM_FALLBACK_MODEL: ${LLM_FALLBACK_MODEL:-qwen2.5:7b}
|
||||||
|
BASE_URL: ${BASE_URL:-http://ub01:8096}
|
||||||
PROMPTS_PATH: /prompts
|
PROMPTS_PATH: /prompts
|
||||||
volumes:
|
volumes:
|
||||||
- /vmPool/r/services/chrysopedia_data:/data
|
- /vmPool/r/services/chrysopedia_data:/data
|
||||||
|
|
@ -156,6 +157,9 @@ services:
|
||||||
REDIS_URL: redis://chrysopedia-redis:6379/0
|
REDIS_URL: redis://chrysopedia-redis:6379/0
|
||||||
QDRANT_URL: http://chrysopedia-qdrant:6333
|
QDRANT_URL: http://chrysopedia-qdrant:6333
|
||||||
EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1
|
EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1
|
||||||
|
LLM_FALLBACK_URL: http://chrysopedia-ollama:11434/v1
|
||||||
|
LLM_FALLBACK_MODEL: ${LLM_FALLBACK_MODEL:-qwen2.5:7b}
|
||||||
|
BASE_URL: ${BASE_URL:-http://ub01:8096}
|
||||||
PROMPTS_PATH: /prompts
|
PROMPTS_PATH: /prompts
|
||||||
command: ["celery", "-A", "worker", "worker", "--beat", "--loglevel=info", "--concurrency=1"]
|
command: ["celery", "-A", "worker", "worker", "--beat", "--loglevel=info", "--concurrency=1"]
|
||||||
healthcheck:
|
healthcheck:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue