feat: Built send_digest_emails Celery task with per-user content queryi…
- "backend/tasks/__init__.py" - "backend/tasks/notifications.py" - "backend/worker.py" - "docker-compose.yml" GSD-Task: S01/T02
This commit is contained in:
parent
067d7ed332
commit
a3f2c4f332
4 changed files with 303 additions and 1 deletions
1
backend/tasks/__init__.py
Normal file
1
backend/tasks/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
# Notification tasks package
|
||||
289
backend/tasks/notifications.py
Normal file
289
backend/tasks/notifications.py
Normal file
|
|
@ -0,0 +1,289 @@
|
|||
"""Celery task: send batched email digests to followers.
|
||||
|
||||
Queries new content (posts + technique pages) published since each user's
|
||||
last digest, groups by followed creator, composes HTML via the email service,
|
||||
sends, and logs successful deliveries in EmailDigestLog.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import jwt
|
||||
from sqlalchemy import create_engine, func, select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from config import get_settings
|
||||
from models import (
|
||||
Creator,
|
||||
CreatorFollow,
|
||||
EmailDigestLog,
|
||||
Post,
|
||||
SourceVideo,
|
||||
TechniquePage,
|
||||
TechniquePageVideo,
|
||||
User,
|
||||
)
|
||||
from services.email import compose_digest_html, is_smtp_configured, send_email
|
||||
from worker import celery_app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Sync DB (same pattern as pipeline/stages.py) ────────────────────────────
|
||||
|
||||
_engine = None
|
||||
_SessionLocal = None
|
||||
|
||||
|
||||
def _get_sync_engine():
|
||||
global _engine
|
||||
if _engine is None:
|
||||
settings = get_settings()
|
||||
url = settings.database_url.replace(
|
||||
"postgresql+asyncpg://", "postgresql+psycopg2://"
|
||||
)
|
||||
_engine = create_engine(url, pool_pre_ping=True, pool_size=2, max_overflow=3)
|
||||
return _engine
|
||||
|
||||
|
||||
def _get_sync_session() -> Session:
|
||||
global _SessionLocal
|
||||
if _SessionLocal is None:
|
||||
_SessionLocal = sessionmaker(bind=_get_sync_engine())
|
||||
return _SessionLocal()
|
||||
|
||||
|
||||
# ── Unsubscribe token helpers ────────────────────────────────────────────────
|
||||
|
||||
_UNSUB_TOKEN_MAX_AGE = 30 * 24 * 3600 # 30 days in seconds
|
||||
|
||||
|
||||
def generate_unsubscribe_token(user_id: str, secret_key: str) -> str:
|
||||
"""Create a signed JWT encoding the user_id for unsubscribe links."""
|
||||
payload = {
|
||||
"sub": str(user_id),
|
||||
"purpose": "unsubscribe",
|
||||
"iat": datetime.now(timezone.utc),
|
||||
}
|
||||
return jwt.encode(payload, secret_key, algorithm="HS256")
|
||||
|
||||
|
||||
def verify_unsubscribe_token(token: str, secret_key: str) -> str | None:
|
||||
"""Decode and verify an unsubscribe token.
|
||||
|
||||
Returns user_id string on success, None on failure/expiry.
|
||||
"""
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
secret_key,
|
||||
algorithms=["HS256"],
|
||||
options={"require": ["sub", "purpose", "iat"]},
|
||||
)
|
||||
if payload.get("purpose") != "unsubscribe":
|
||||
return None
|
||||
# Check age manually (PyJWT exp claim would also work, but we
|
||||
# want the max-age to be configurable without re-signing)
|
||||
iat = payload.get("iat", 0)
|
||||
if isinstance(iat, datetime):
|
||||
iat = iat.timestamp()
|
||||
age = time.time() - iat
|
||||
if age > _UNSUB_TOKEN_MAX_AGE:
|
||||
return None
|
||||
return payload["sub"]
|
||||
except (jwt.InvalidTokenError, KeyError):
|
||||
return None
|
||||
|
||||
|
||||
# ── Main digest task ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@celery_app.task(name="tasks.notifications.send_digest_emails", bind=True)
|
||||
def send_digest_emails(self):
|
||||
"""Query new content per followed creator and send digest emails.
|
||||
|
||||
Runs as a Celery Beat scheduled task (daily at 09:00 UTC).
|
||||
Graceful no-op when SMTP is not configured.
|
||||
"""
|
||||
settings = get_settings()
|
||||
|
||||
if not is_smtp_configured(settings):
|
||||
logger.warning(
|
||||
"send_digest_emails: SMTP not configured, skipping digest run"
|
||||
)
|
||||
return {"skipped": True, "reason": "smtp_not_configured"}
|
||||
|
||||
session = _get_sync_session()
|
||||
sent_count = 0
|
||||
error_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
try:
|
||||
logger.info("send_digest_emails: starting digest run")
|
||||
|
||||
# Find users with email digests enabled
|
||||
users = session.execute(
|
||||
select(User).where(
|
||||
User.notification_preferences["email_digests"].as_boolean() == True, # noqa: E712
|
||||
User.is_active == True, # noqa: E712
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
logger.info("send_digest_emails: found %d eligible users", len(users))
|
||||
|
||||
for user in users:
|
||||
try:
|
||||
_process_user_digest(session, user, settings)
|
||||
sent_count += 1
|
||||
except _NoNewContent:
|
||||
skipped_count += 1
|
||||
except Exception:
|
||||
error_count += 1
|
||||
logger.exception(
|
||||
"send_digest_emails: error processing digest for user_id=%s",
|
||||
user.id,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"send_digest_emails: complete — sent=%d skipped=%d errors=%d",
|
||||
sent_count,
|
||||
skipped_count,
|
||||
error_count,
|
||||
)
|
||||
return {
|
||||
"sent": sent_count,
|
||||
"skipped": skipped_count,
|
||||
"errors": error_count,
|
||||
}
|
||||
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
class _NoNewContent(Exception):
|
||||
"""Sentinel raised when a user has no new content to digest."""
|
||||
|
||||
|
||||
def _process_user_digest(session: Session, user: User, settings) -> None:
|
||||
"""Build and send a digest email for one user. Raises _NoNewContent if empty."""
|
||||
# Last digest timestamp (or epoch if never sent)
|
||||
last_digest_row = session.execute(
|
||||
select(func.max(EmailDigestLog.digest_sent_at)).where(
|
||||
EmailDigestLog.user_id == user.id
|
||||
)
|
||||
).scalar()
|
||||
last_digest_at = last_digest_row or datetime(2000, 1, 1)
|
||||
|
||||
# Get followed creator IDs
|
||||
followed_creator_ids = session.execute(
|
||||
select(CreatorFollow.creator_id).where(CreatorFollow.user_id == user.id)
|
||||
).scalars().all()
|
||||
|
||||
if not followed_creator_ids:
|
||||
raise _NoNewContent()
|
||||
|
||||
# Collect new content per creator
|
||||
creator_content_groups: list[dict] = []
|
||||
content_summary_items: list[dict] = []
|
||||
|
||||
for creator_id in followed_creator_ids:
|
||||
# Get creator name
|
||||
creator = session.execute(
|
||||
select(Creator).where(Creator.id == creator_id)
|
||||
).scalar_one_or_none()
|
||||
if not creator:
|
||||
continue
|
||||
|
||||
# New published posts
|
||||
new_posts = session.execute(
|
||||
select(Post).where(
|
||||
Post.creator_id == creator_id,
|
||||
Post.is_published == True, # noqa: E712
|
||||
Post.created_at > last_digest_at,
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
# New technique pages (via TechniquePageVideo → SourceVideo for creator match)
|
||||
# TechniquePage has creator_id directly, so we can query that
|
||||
new_technique_pages = session.execute(
|
||||
select(TechniquePage).where(
|
||||
TechniquePage.creator_id == creator_id,
|
||||
TechniquePage.created_at > last_digest_at,
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
if not new_posts and not new_technique_pages:
|
||||
continue
|
||||
|
||||
base_url = f"http://localhost:8096" # TODO: make configurable via settings
|
||||
|
||||
group = {
|
||||
"creator_name": creator.name,
|
||||
"posts": [
|
||||
{
|
||||
"title": p.title,
|
||||
"url": f"{base_url}/creators/{creator.slug}/posts/{p.id}",
|
||||
}
|
||||
for p in new_posts
|
||||
],
|
||||
"technique_pages": [
|
||||
{
|
||||
"title": tp.title,
|
||||
"url": f"{base_url}/techniques/{tp.slug}",
|
||||
}
|
||||
for tp in new_technique_pages
|
||||
],
|
||||
}
|
||||
creator_content_groups.append(group)
|
||||
|
||||
# Track for content_summary log
|
||||
for p in new_posts:
|
||||
content_summary_items.append(
|
||||
{"type": "post", "id": str(p.id), "title": p.title}
|
||||
)
|
||||
for tp in new_technique_pages:
|
||||
content_summary_items.append(
|
||||
{"type": "technique_page", "id": str(tp.id), "title": tp.title}
|
||||
)
|
||||
|
||||
if not creator_content_groups:
|
||||
raise _NoNewContent()
|
||||
|
||||
# Generate signed unsubscribe URL
|
||||
token = generate_unsubscribe_token(str(user.id), settings.app_secret_key)
|
||||
unsubscribe_url = f"{base_url}/api/v1/notifications/unsubscribe?token={token}"
|
||||
|
||||
# Compose and send
|
||||
html = compose_digest_html(
|
||||
user_display_name=user.display_name,
|
||||
creator_content_groups=creator_content_groups,
|
||||
unsubscribe_url=unsubscribe_url,
|
||||
)
|
||||
|
||||
subject = "Chrysopedia Digest — New content from creators you follow"
|
||||
success = send_email(user.email, subject, html, settings)
|
||||
|
||||
if success:
|
||||
# Log successful send (deduplication anchor)
|
||||
log_entry = EmailDigestLog(
|
||||
user_id=user.id,
|
||||
content_summary={"items": content_summary_items},
|
||||
)
|
||||
session.add(log_entry)
|
||||
session.commit()
|
||||
logger.info(
|
||||
"send_digest_emails: sent digest to user_id=%s (%d creators, %d items)",
|
||||
user.id,
|
||||
len(creator_content_groups),
|
||||
len(content_summary_items),
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
"send_digest_emails: failed to send digest to user_id=%s email=%s",
|
||||
user.id,
|
||||
user.email,
|
||||
)
|
||||
raise RuntimeError(f"Email send failed for user {user.id}")
|
||||
|
|
@ -2,9 +2,11 @@
|
|||
|
||||
Usage:
|
||||
celery -A worker worker --loglevel=info
|
||||
celery -A worker worker --beat --loglevel=info (with Beat scheduler)
|
||||
"""
|
||||
|
||||
from celery import Celery
|
||||
from celery.schedules import crontab
|
||||
|
||||
from config import get_settings
|
||||
|
||||
|
|
@ -30,3 +32,13 @@ celery_app.conf.update(
|
|||
# Import pipeline.stages so that @celery_app.task decorators register tasks.
|
||||
# This import must come after celery_app is defined.
|
||||
import pipeline.stages # noqa: E402, F401
|
||||
import tasks.notifications # noqa: E402, F401
|
||||
|
||||
# ── Celery Beat schedule ─────────────────────────────────────────────────────
|
||||
|
||||
celery_app.conf.beat_schedule = {
|
||||
"send-digest-emails": {
|
||||
"task": "tasks.notifications.send_digest_emails",
|
||||
"schedule": crontab(hour=9, minute=0), # daily at 09:00 UTC
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ services:
|
|||
QDRANT_URL: http://chrysopedia-qdrant:6333
|
||||
EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1
|
||||
PROMPTS_PATH: /prompts
|
||||
command: ["celery", "-A", "worker", "worker", "--loglevel=info", "--concurrency=1"]
|
||||
command: ["celery", "-A", "worker", "worker", "--beat", "--loglevel=info", "--concurrency=1"]
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "celery -A worker inspect ping --timeout=5 2>/dev/null | grep -q pong || exit 1"]
|
||||
interval: 30s
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue