diff --git a/backend/tasks/__init__.py b/backend/tasks/__init__.py new file mode 100644 index 0000000..9a218ab --- /dev/null +++ b/backend/tasks/__init__.py @@ -0,0 +1 @@ +# Notification tasks package diff --git a/backend/tasks/notifications.py b/backend/tasks/notifications.py new file mode 100644 index 0000000..f9aa7cf --- /dev/null +++ b/backend/tasks/notifications.py @@ -0,0 +1,289 @@ +"""Celery task: send batched email digests to followers. + +Queries new content (posts + technique pages) published since each user's +last digest, groups by followed creator, composes HTML via the email service, +sends, and logs successful deliveries in EmailDigestLog. +""" + +from __future__ import annotations + +import json +import logging +import time +from datetime import datetime, timezone + +import jwt +from sqlalchemy import create_engine, func, select +from sqlalchemy.orm import Session, sessionmaker + +from config import get_settings +from models import ( + Creator, + CreatorFollow, + EmailDigestLog, + Post, + SourceVideo, + TechniquePage, + TechniquePageVideo, + User, +) +from services.email import compose_digest_html, is_smtp_configured, send_email +from worker import celery_app + +logger = logging.getLogger(__name__) + +# ── Sync DB (same pattern as pipeline/stages.py) ──────────────────────────── + +_engine = None +_SessionLocal = None + + +def _get_sync_engine(): + global _engine + if _engine is None: + settings = get_settings() + url = settings.database_url.replace( + "postgresql+asyncpg://", "postgresql+psycopg2://" + ) + _engine = create_engine(url, pool_pre_ping=True, pool_size=2, max_overflow=3) + return _engine + + +def _get_sync_session() -> Session: + global _SessionLocal + if _SessionLocal is None: + _SessionLocal = sessionmaker(bind=_get_sync_engine()) + return _SessionLocal() + + +# ── Unsubscribe token helpers ──────────────────────────────────────────────── + +_UNSUB_TOKEN_MAX_AGE = 30 * 24 * 3600 # 30 days in seconds + + +def generate_unsubscribe_token(user_id: str, secret_key: str) -> str: + """Create a signed JWT encoding the user_id for unsubscribe links.""" + payload = { + "sub": str(user_id), + "purpose": "unsubscribe", + "iat": datetime.now(timezone.utc), + } + return jwt.encode(payload, secret_key, algorithm="HS256") + + +def verify_unsubscribe_token(token: str, secret_key: str) -> str | None: + """Decode and verify an unsubscribe token. + + Returns user_id string on success, None on failure/expiry. + """ + try: + payload = jwt.decode( + token, + secret_key, + algorithms=["HS256"], + options={"require": ["sub", "purpose", "iat"]}, + ) + if payload.get("purpose") != "unsubscribe": + return None + # Check age manually (PyJWT exp claim would also work, but we + # want the max-age to be configurable without re-signing) + iat = payload.get("iat", 0) + if isinstance(iat, datetime): + iat = iat.timestamp() + age = time.time() - iat + if age > _UNSUB_TOKEN_MAX_AGE: + return None + return payload["sub"] + except (jwt.InvalidTokenError, KeyError): + return None + + +# ── Main digest task ───────────────────────────────────────────────────────── + + +@celery_app.task(name="tasks.notifications.send_digest_emails", bind=True) +def send_digest_emails(self): + """Query new content per followed creator and send digest emails. + + Runs as a Celery Beat scheduled task (daily at 09:00 UTC). + Graceful no-op when SMTP is not configured. + """ + settings = get_settings() + + if not is_smtp_configured(settings): + logger.warning( + "send_digest_emails: SMTP not configured, skipping digest run" + ) + return {"skipped": True, "reason": "smtp_not_configured"} + + session = _get_sync_session() + sent_count = 0 + error_count = 0 + skipped_count = 0 + + try: + logger.info("send_digest_emails: starting digest run") + + # Find users with email digests enabled + users = session.execute( + select(User).where( + User.notification_preferences["email_digests"].as_boolean() == True, # noqa: E712 + User.is_active == True, # noqa: E712 + ) + ).scalars().all() + + logger.info("send_digest_emails: found %d eligible users", len(users)) + + for user in users: + try: + _process_user_digest(session, user, settings) + sent_count += 1 + except _NoNewContent: + skipped_count += 1 + except Exception: + error_count += 1 + logger.exception( + "send_digest_emails: error processing digest for user_id=%s", + user.id, + ) + + logger.info( + "send_digest_emails: complete — sent=%d skipped=%d errors=%d", + sent_count, + skipped_count, + error_count, + ) + return { + "sent": sent_count, + "skipped": skipped_count, + "errors": error_count, + } + + finally: + session.close() + + +class _NoNewContent(Exception): + """Sentinel raised when a user has no new content to digest.""" + + +def _process_user_digest(session: Session, user: User, settings) -> None: + """Build and send a digest email for one user. Raises _NoNewContent if empty.""" + # Last digest timestamp (or epoch if never sent) + last_digest_row = session.execute( + select(func.max(EmailDigestLog.digest_sent_at)).where( + EmailDigestLog.user_id == user.id + ) + ).scalar() + last_digest_at = last_digest_row or datetime(2000, 1, 1) + + # Get followed creator IDs + followed_creator_ids = session.execute( + select(CreatorFollow.creator_id).where(CreatorFollow.user_id == user.id) + ).scalars().all() + + if not followed_creator_ids: + raise _NoNewContent() + + # Collect new content per creator + creator_content_groups: list[dict] = [] + content_summary_items: list[dict] = [] + + for creator_id in followed_creator_ids: + # Get creator name + creator = session.execute( + select(Creator).where(Creator.id == creator_id) + ).scalar_one_or_none() + if not creator: + continue + + # New published posts + new_posts = session.execute( + select(Post).where( + Post.creator_id == creator_id, + Post.is_published == True, # noqa: E712 + Post.created_at > last_digest_at, + ) + ).scalars().all() + + # New technique pages (via TechniquePageVideo → SourceVideo for creator match) + # TechniquePage has creator_id directly, so we can query that + new_technique_pages = session.execute( + select(TechniquePage).where( + TechniquePage.creator_id == creator_id, + TechniquePage.created_at > last_digest_at, + ) + ).scalars().all() + + if not new_posts and not new_technique_pages: + continue + + base_url = f"http://localhost:8096" # TODO: make configurable via settings + + group = { + "creator_name": creator.name, + "posts": [ + { + "title": p.title, + "url": f"{base_url}/creators/{creator.slug}/posts/{p.id}", + } + for p in new_posts + ], + "technique_pages": [ + { + "title": tp.title, + "url": f"{base_url}/techniques/{tp.slug}", + } + for tp in new_technique_pages + ], + } + creator_content_groups.append(group) + + # Track for content_summary log + for p in new_posts: + content_summary_items.append( + {"type": "post", "id": str(p.id), "title": p.title} + ) + for tp in new_technique_pages: + content_summary_items.append( + {"type": "technique_page", "id": str(tp.id), "title": tp.title} + ) + + if not creator_content_groups: + raise _NoNewContent() + + # Generate signed unsubscribe URL + token = generate_unsubscribe_token(str(user.id), settings.app_secret_key) + unsubscribe_url = f"{base_url}/api/v1/notifications/unsubscribe?token={token}" + + # Compose and send + html = compose_digest_html( + user_display_name=user.display_name, + creator_content_groups=creator_content_groups, + unsubscribe_url=unsubscribe_url, + ) + + subject = "Chrysopedia Digest — New content from creators you follow" + success = send_email(user.email, subject, html, settings) + + if success: + # Log successful send (deduplication anchor) + log_entry = EmailDigestLog( + user_id=user.id, + content_summary={"items": content_summary_items}, + ) + session.add(log_entry) + session.commit() + logger.info( + "send_digest_emails: sent digest to user_id=%s (%d creators, %d items)", + user.id, + len(creator_content_groups), + len(content_summary_items), + ) + else: + logger.error( + "send_digest_emails: failed to send digest to user_id=%s email=%s", + user.id, + user.email, + ) + raise RuntimeError(f"Email send failed for user {user.id}") diff --git a/backend/worker.py b/backend/worker.py index 7eea336..1a0fb76 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -2,9 +2,11 @@ Usage: celery -A worker worker --loglevel=info + celery -A worker worker --beat --loglevel=info (with Beat scheduler) """ from celery import Celery +from celery.schedules import crontab from config import get_settings @@ -30,3 +32,13 @@ celery_app.conf.update( # Import pipeline.stages so that @celery_app.task decorators register tasks. # This import must come after celery_app is defined. import pipeline.stages # noqa: E402, F401 +import tasks.notifications # noqa: E402, F401 + +# ── Celery Beat schedule ───────────────────────────────────────────────────── + +celery_app.conf.beat_schedule = { + "send-digest-emails": { + "task": "tasks.notifications.send_digest_emails", + "schedule": crontab(hour=9, minute=0), # daily at 09:00 UTC + }, +} diff --git a/docker-compose.yml b/docker-compose.yml index 5d16c98..c349d97 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -155,7 +155,7 @@ services: QDRANT_URL: http://chrysopedia-qdrant:6333 EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1 PROMPTS_PATH: /prompts - command: ["celery", "-A", "worker", "worker", "--loglevel=info", "--concurrency=1"] + command: ["celery", "-A", "worker", "worker", "--beat", "--loglevel=info", "--concurrency=1"] healthcheck: test: ["CMD-SHELL", "celery -A worker inspect ping --timeout=5 2>/dev/null | grep -q pong || exit 1"] interval: 30s