"""Celery task: send batched email digests to followers. Queries new content (posts + technique pages) published since each user's last digest, groups by followed creator, composes HTML via the email service, sends, and logs successful deliveries in EmailDigestLog. """ from __future__ import annotations import json import logging import time from datetime import datetime, timezone import jwt from sqlalchemy import create_engine, func, select from sqlalchemy.orm import Session, sessionmaker from config import get_settings from models import ( Creator, CreatorFollow, EmailDigestLog, Post, SourceVideo, TechniquePage, TechniquePageVideo, User, ) from services.email import compose_digest_html, is_smtp_configured, send_email from worker import celery_app logger = logging.getLogger(__name__) # ── Sync DB (same pattern as pipeline/stages.py) ──────────────────────────── _engine = None _SessionLocal = None def _get_sync_engine(): global _engine if _engine is None: settings = get_settings() url = settings.database_url.replace( "postgresql+asyncpg://", "postgresql+psycopg2://" ) _engine = create_engine(url, pool_pre_ping=True, pool_size=2, max_overflow=3) return _engine def _get_sync_session() -> Session: global _SessionLocal if _SessionLocal is None: _SessionLocal = sessionmaker(bind=_get_sync_engine()) return _SessionLocal() # ── Unsubscribe token helpers ──────────────────────────────────────────────── _UNSUB_TOKEN_MAX_AGE = 30 * 24 * 3600 # 30 days in seconds def generate_unsubscribe_token(user_id: str, secret_key: str) -> str: """Create a signed JWT encoding the user_id for unsubscribe links.""" payload = { "sub": str(user_id), "purpose": "unsubscribe", "iat": datetime.now(timezone.utc), } return jwt.encode(payload, secret_key, algorithm="HS256") def verify_unsubscribe_token(token: str, secret_key: str) -> str | None: """Decode and verify an unsubscribe token. Returns user_id string on success, None on failure/expiry. """ try: payload = jwt.decode( token, secret_key, algorithms=["HS256"], options={"require": ["sub", "purpose", "iat"]}, ) if payload.get("purpose") != "unsubscribe": return None # Check age manually (PyJWT exp claim would also work, but we # want the max-age to be configurable without re-signing) iat = payload.get("iat", 0) if isinstance(iat, datetime): iat = iat.timestamp() age = time.time() - iat if age > _UNSUB_TOKEN_MAX_AGE: return None return payload["sub"] except (jwt.InvalidTokenError, KeyError): return None # ── Main digest task ───────────────────────────────────────────────────────── @celery_app.task(name="tasks.notifications.send_digest_emails", bind=True) def send_digest_emails(self): """Query new content per followed creator and send digest emails. Runs as a Celery Beat scheduled task (daily at 09:00 UTC). Graceful no-op when SMTP is not configured. """ settings = get_settings() if not is_smtp_configured(settings): logger.warning( "send_digest_emails: SMTP not configured, skipping digest run" ) return {"skipped": True, "reason": "smtp_not_configured"} session = _get_sync_session() sent_count = 0 error_count = 0 skipped_count = 0 try: logger.info("send_digest_emails: starting digest run") # Find users with email digests enabled users = session.execute( select(User).where( User.notification_preferences["email_digests"].as_boolean() == True, # noqa: E712 User.is_active == True, # noqa: E712 ) ).scalars().all() logger.info("send_digest_emails: found %d eligible users", len(users)) for user in users: try: _process_user_digest(session, user, settings) sent_count += 1 except _NoNewContent: skipped_count += 1 except Exception: error_count += 1 logger.exception( "send_digest_emails: error processing digest for user_id=%s", user.id, ) logger.info( "send_digest_emails: complete — sent=%d skipped=%d errors=%d", sent_count, skipped_count, error_count, ) return { "sent": sent_count, "skipped": skipped_count, "errors": error_count, } finally: session.close() class _NoNewContent(Exception): """Sentinel raised when a user has no new content to digest.""" def _process_user_digest(session: Session, user: User, settings) -> None: """Build and send a digest email for one user. Raises _NoNewContent if empty.""" # Last digest timestamp (or epoch if never sent) last_digest_row = session.execute( select(func.max(EmailDigestLog.digest_sent_at)).where( EmailDigestLog.user_id == user.id ) ).scalar() last_digest_at = last_digest_row or datetime(2000, 1, 1) # Get followed creator IDs followed_creator_ids = session.execute( select(CreatorFollow.creator_id).where(CreatorFollow.user_id == user.id) ).scalars().all() if not followed_creator_ids: raise _NoNewContent() # Collect new content per creator creator_content_groups: list[dict] = [] content_summary_items: list[dict] = [] for creator_id in followed_creator_ids: # Get creator name creator = session.execute( select(Creator).where(Creator.id == creator_id) ).scalar_one_or_none() if not creator: continue # New published posts new_posts = session.execute( select(Post).where( Post.creator_id == creator_id, Post.is_published == True, # noqa: E712 Post.created_at > last_digest_at, ) ).scalars().all() # New technique pages (via TechniquePageVideo → SourceVideo for creator match) # TechniquePage has creator_id directly, so we can query that new_technique_pages = session.execute( select(TechniquePage).where( TechniquePage.creator_id == creator_id, TechniquePage.created_at > last_digest_at, ) ).scalars().all() if not new_posts and not new_technique_pages: continue base_url = f"http://localhost:8096" # TODO: make configurable via settings group = { "creator_name": creator.name, "posts": [ { "title": p.title, "url": f"{base_url}/creators/{creator.slug}/posts/{p.id}", } for p in new_posts ], "technique_pages": [ { "title": tp.title, "url": f"{base_url}/techniques/{tp.slug}", } for tp in new_technique_pages ], } creator_content_groups.append(group) # Track for content_summary log for p in new_posts: content_summary_items.append( {"type": "post", "id": str(p.id), "title": p.title} ) for tp in new_technique_pages: content_summary_items.append( {"type": "technique_page", "id": str(tp.id), "title": tp.title} ) if not creator_content_groups: raise _NoNewContent() # Generate signed unsubscribe URL token = generate_unsubscribe_token(str(user.id), settings.app_secret_key) unsubscribe_url = f"{base_url}/api/v1/notifications/unsubscribe?token={token}" # Compose and send html = compose_digest_html( user_display_name=user.display_name, creator_content_groups=creator_content_groups, unsubscribe_url=unsubscribe_url, ) subject = "Chrysopedia Digest — New content from creators you follow" success = send_email(user.email, subject, html, settings) if success: # Log successful send (deduplication anchor) log_entry = EmailDigestLog( user_id=user.id, content_summary={"items": content_summary_items}, ) session.add(log_entry) session.commit() logger.info( "send_digest_emails: sent digest to user_id=%s (%d creators, %d items)", user.id, len(creator_content_groups), len(content_summary_items), ) else: logger.error( "send_digest_emails: failed to send digest to user_id=%s email=%s", user.id, user.email, ) raise RuntimeError(f"Email send failed for user {user.id}")