feat: Built send_digest_emails Celery task with per-user content queryi…

- "backend/tasks/__init__.py"
- "backend/tasks/notifications.py"
- "backend/worker.py"
- "docker-compose.yml"

GSD-Task: S01/T02
jlightner 2026-04-04 12:15:43 +00:00
parent 34a45d1c8e
commit 5e4b173917
7 changed files with 412 additions and 2 deletions

View file

@@ -52,7 +52,7 @@
- Estimate: 1.5h
- Files: backend/config.py, backend/models.py, backend/schemas.py, backend/services/email.py, alembic/versions/029_add_email_digest.py
- Verify: cd backend && python -c "from models import EmailDigestLog, User; from services.email import compose_digest_html, send_email, is_smtp_configured; from config import get_settings; s = get_settings(); assert not is_smtp_configured(s); print('OK')" && cd ../alembic && python -c "import importlib.util; spec = importlib.util.spec_from_file_location('m', 'versions/029_add_email_digest.py'); mod = importlib.util.module_from_spec(spec); print('Migration module loads OK')"
- [ ] **T02: Digest Celery task with Beat scheduling and Docker config** — Build the digest orchestration: a Celery task that queries new content since each user's last digest, groups by followed creator, composes and sends emails via the email service, and logs successful sends. Configure Celery Beat to run it daily. Update docker-compose.yml to add --beat flag to the worker.
- [x] **T02: Built send_digest_emails Celery task with per-user content querying, Beat schedule at 09:00 UTC daily, and --beat flag in Docker worker command** — Build the digest orchestration: a Celery task that queries new content since each user's last digest, groups by followed creator, composes and sends emails via the email service, and logs successful sends. Configure Celery Beat to run it daily. Update docker-compose.yml to add --beat flag to the worker.
## Steps

View file

@@ -0,0 +1,24 @@
{
  "schemaVersion": 1,
  "taskId": "T01",
  "unitId": "M025/S01/T01",
  "timestamp": 1775304673940,
  "passed": false,
  "discoverySource": "task-plan",
  "checks": [
    {
      "command": "cd backend",
      "exitCode": 0,
      "durationMs": 9,
      "verdict": "pass"
    },
    {
      "command": "cd ../alembic",
      "exitCode": 2,
      "durationMs": 8,
      "verdict": "fail"
    }
  ],
  "retryAttempt": 1,
  "maxRetries": 2
}
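The failed check above exits with code 2 on `cd ../alembic`. One plausible reading (an assumption, not stated in the source) is that the harness runs each check in its own subshell starting from the repository root, so the `cd backend` from the first check does not carry over. A minimal shell demonstration of that behavior:

```shell
# A `cd` inside a child shell does not change the parent's working directory,
# so checks that each spawn a fresh shell all start from the same root.
start_dir=$(pwd)
bash -c 'cd /tmp && pwd'          # the cd takes effect only in the child shell
[ "$(pwd)" = "$start_dir" ] && echo "parent cwd unchanged"
```

If that is the cause, keeping dependent steps in a single chained command, or using paths relative to the repository root, would let the second check find its target directory.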

View file

@@ -0,0 +1,84 @@
---
id: T02
parent: S01
milestone: M025
provides: []
requires: []
affects: []
key_files: ["backend/tasks/__init__.py", "backend/tasks/notifications.py", "backend/worker.py", "docker-compose.yml"]
key_decisions: ["Used PyJWT for signed unsubscribe tokens instead of itsdangerous — already a dependency, avoids adding new package"]
patterns_established: []
drill_down_paths: []
observability_surfaces: []
duration: ""
verification_result: "Ran task plan verification: task imports OK, Beat schedule assertion passes, docker-compose grep finds --beat flag. Token round-trip verified (generate, verify, wrong-secret rejection, tampered-token rejection). SMTP-unconfigured no-op returns expected result."
completed_at: 2026-04-04T12:15:39.956Z
blocker_discovered: false
---
# T02: Built send_digest_emails Celery task with per-user content querying, Beat schedule at 09:00 UTC daily, and --beat flag in Docker worker command
> Built send_digest_emails Celery task with per-user content querying, Beat schedule at 09:00 UTC daily, and --beat flag in Docker worker command
## What Happened
Created backend/tasks/notifications.py with the send_digest_emails Celery task. The task queries users with email_digests enabled, finds each user's last digest timestamp from EmailDigestLog, queries followed creators via CreatorFollow, then finds new Posts and TechniquePages per creator. Content is grouped by creator and passed to compose_digest_html. Used PyJWT (already in requirements) instead of itsdangerous for signed unsubscribe tokens. Added Celery Beat schedule to worker.py with crontab(hour=9, minute=0). Updated docker-compose.yml worker command to include --beat flag. Task is a graceful no-op when SMTP is unconfigured. EmailDigestLog is only written on successful send.
## Verification
Ran task plan verification: task imports OK, Beat schedule assertion passes, docker-compose grep finds --beat flag. Token round-trip verified (generate, verify, wrong-secret rejection, tampered-token rejection). SMTP-unconfigured no-op returns expected result.
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd backend && python -c "from tasks.notifications import send_digest_emails; print('Task imports OK')"` | 0 | ✅ pass | 800ms |
| 2 | `cd backend && python -c "from worker import celery_app; assert 'send-digest-emails' in celery_app.conf.beat_schedule; print('Beat schedule OK')"` | 0 | ✅ pass | 700ms |
| 3 | `grep -q '\-\-beat' docker-compose.yml` | 0 | ✅ pass | 10ms |
| 4 | `Token round-trip + rejection tests` | 0 | ✅ pass | 200ms |
| 5 | `SMTP-unconfigured graceful no-op` | 0 | ✅ pass | 600ms |
## Deviations
Used PyJWT instead of itsdangerous for unsubscribe tokens — PyJWT already in requirements, avoids new dependency. T03 unsubscribe endpoint will use verify_unsubscribe_token from this module.
## Known Issues
The base_url used for content links and the unsubscribe URL is hardcoded to http://localhost:8096; it should be made configurable via settings for production.
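One way the hardcoded base URL could be made configurable is sketched below. The variable name `PUBLIC_BASE_URL` is hypothetical; the real fix would presumably add a field to the settings object returned by `config.get_settings()` rather than read the environment directly.

```python
# Hedged sketch: resolve the digest base URL from the environment, falling
# back to the current hardcoded default when nothing is configured.
import os

def get_base_url() -> str:
    # rstrip("/") keeps joined URLs from gaining a double slash.
    return os.environ.get("PUBLIC_BASE_URL", "http://localhost:8096").rstrip("/")

post_url = f"{get_base_url()}/creators/some-creator/posts/42"
```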
## Files Created/Modified
- `backend/tasks/__init__.py`
- `backend/tasks/notifications.py`
- `backend/worker.py`
- `docker-compose.yml`

View file

@@ -0,0 +1 @@
# Notification tasks package

View file

@@ -0,0 +1,289 @@
"""Celery task: send batched email digests to followers.
Queries new content (posts + technique pages) published since each user's
last digest, groups by followed creator, composes HTML via the email service,
sends, and logs successful deliveries in EmailDigestLog.
"""
from __future__ import annotations
import json
import logging
import time
from datetime import datetime, timezone
import jwt
from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from config import get_settings
from models import (
Creator,
CreatorFollow,
EmailDigestLog,
Post,
SourceVideo,
TechniquePage,
TechniquePageVideo,
User,
)
from services.email import compose_digest_html, is_smtp_configured, send_email
from worker import celery_app
logger = logging.getLogger(__name__)
# ── Sync DB (same pattern as pipeline/stages.py) ────────────────────────────
_engine = None
_SessionLocal = None
def _get_sync_engine():
global _engine
if _engine is None:
settings = get_settings()
url = settings.database_url.replace(
"postgresql+asyncpg://", "postgresql+psycopg2://"
)
_engine = create_engine(url, pool_pre_ping=True, pool_size=2, max_overflow=3)
return _engine
def _get_sync_session() -> Session:
global _SessionLocal
if _SessionLocal is None:
_SessionLocal = sessionmaker(bind=_get_sync_engine())
return _SessionLocal()
# ── Unsubscribe token helpers ────────────────────────────────────────────────
_UNSUB_TOKEN_MAX_AGE = 30 * 24 * 3600 # 30 days in seconds
def generate_unsubscribe_token(user_id: str, secret_key: str) -> str:
"""Create a signed JWT encoding the user_id for unsubscribe links."""
payload = {
"sub": str(user_id),
"purpose": "unsubscribe",
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, secret_key, algorithm="HS256")
def verify_unsubscribe_token(token: str, secret_key: str) -> str | None:
"""Decode and verify an unsubscribe token.
Returns user_id string on success, None on failure/expiry.
"""
try:
payload = jwt.decode(
token,
secret_key,
algorithms=["HS256"],
options={"require": ["sub", "purpose", "iat"]},
)
if payload.get("purpose") != "unsubscribe":
return None
# Check age manually (PyJWT exp claim would also work, but we
# want the max-age to be configurable without re-signing)
iat = payload.get("iat", 0)
if isinstance(iat, datetime):
iat = iat.timestamp()
age = time.time() - iat
if age > _UNSUB_TOKEN_MAX_AGE:
return None
return payload["sub"]
except (jwt.InvalidTokenError, KeyError):
return None
# ── Main digest task ─────────────────────────────────────────────────────────
@celery_app.task(name="tasks.notifications.send_digest_emails", bind=True)
def send_digest_emails(self):
"""Query new content per followed creator and send digest emails.
Runs as a Celery Beat scheduled task (daily at 09:00 UTC).
Graceful no-op when SMTP is not configured.
"""
settings = get_settings()
if not is_smtp_configured(settings):
logger.warning(
"send_digest_emails: SMTP not configured, skipping digest run"
)
return {"skipped": True, "reason": "smtp_not_configured"}
session = _get_sync_session()
sent_count = 0
error_count = 0
skipped_count = 0
try:
logger.info("send_digest_emails: starting digest run")
# Find users with email digests enabled
users = session.execute(
select(User).where(
User.notification_preferences["email_digests"].as_boolean() == True, # noqa: E712
User.is_active == True, # noqa: E712
)
).scalars().all()
logger.info("send_digest_emails: found %d eligible users", len(users))
for user in users:
try:
_process_user_digest(session, user, settings)
sent_count += 1
except _NoNewContent:
skipped_count += 1
except Exception:
error_count += 1
logger.exception(
"send_digest_emails: error processing digest for user_id=%s",
user.id,
)
logger.info(
"send_digest_emails: complete — sent=%d skipped=%d errors=%d",
sent_count,
skipped_count,
error_count,
)
return {
"sent": sent_count,
"skipped": skipped_count,
"errors": error_count,
}
finally:
session.close()
class _NoNewContent(Exception):
"""Sentinel raised when a user has no new content to digest."""
def _process_user_digest(session: Session, user: User, settings) -> None:
"""Build and send a digest email for one user. Raises _NoNewContent if empty."""
# Last digest timestamp (or epoch if never sent)
last_digest_row = session.execute(
select(func.max(EmailDigestLog.digest_sent_at)).where(
EmailDigestLog.user_id == user.id
)
).scalar()
last_digest_at = last_digest_row or datetime(2000, 1, 1)
# Get followed creator IDs
followed_creator_ids = session.execute(
select(CreatorFollow.creator_id).where(CreatorFollow.user_id == user.id)
).scalars().all()
if not followed_creator_ids:
raise _NoNewContent()
# Collect new content per creator
creator_content_groups: list[dict] = []
content_summary_items: list[dict] = []
for creator_id in followed_creator_ids:
# Get creator name
creator = session.execute(
select(Creator).where(Creator.id == creator_id)
).scalar_one_or_none()
if not creator:
continue
# New published posts
new_posts = session.execute(
select(Post).where(
Post.creator_id == creator_id,
Post.is_published == True, # noqa: E712
Post.created_at > last_digest_at,
)
).scalars().all()
# New technique pages (via TechniquePageVideo → SourceVideo for creator match)
# TechniquePage has creator_id directly, so we can query that
new_technique_pages = session.execute(
select(TechniquePage).where(
TechniquePage.creator_id == creator_id,
TechniquePage.created_at > last_digest_at,
)
).scalars().all()
if not new_posts and not new_technique_pages:
continue
base_url = f"http://localhost:8096" # TODO: make configurable via settings
group = {
"creator_name": creator.name,
"posts": [
{
"title": p.title,
"url": f"{base_url}/creators/{creator.slug}/posts/{p.id}",
}
for p in new_posts
],
"technique_pages": [
{
"title": tp.title,
"url": f"{base_url}/techniques/{tp.slug}",
}
for tp in new_technique_pages
],
}
creator_content_groups.append(group)
# Track for content_summary log
for p in new_posts:
content_summary_items.append(
{"type": "post", "id": str(p.id), "title": p.title}
)
for tp in new_technique_pages:
content_summary_items.append(
{"type": "technique_page", "id": str(tp.id), "title": tp.title}
)
if not creator_content_groups:
raise _NoNewContent()
# Generate signed unsubscribe URL
token = generate_unsubscribe_token(str(user.id), settings.app_secret_key)
unsubscribe_url = f"{base_url}/api/v1/notifications/unsubscribe?token={token}"
# Compose and send
html = compose_digest_html(
user_display_name=user.display_name,
creator_content_groups=creator_content_groups,
unsubscribe_url=unsubscribe_url,
)
subject = "Chrysopedia Digest — New content from creators you follow"
success = send_email(user.email, subject, html, settings)
if success:
# Log successful send (deduplication anchor)
log_entry = EmailDigestLog(
user_id=user.id,
content_summary={"items": content_summary_items},
)
session.add(log_entry)
session.commit()
logger.info(
"send_digest_emails: sent digest to user_id=%s (%d creators, %d items)",
user.id,
len(creator_content_groups),
len(content_summary_items),
)
else:
logger.error(
"send_digest_emails: failed to send digest to user_id=%s email=%s",
user.id,
user.email,
)
raise RuntimeError(f"Email send failed for user {user.id}")

View file

@@ -2,9 +2,11 @@
Usage:
    celery -A worker worker --loglevel=info
    celery -A worker worker --beat --loglevel=info  (with Beat scheduler)
"""
from celery import Celery
from celery.schedules import crontab

from config import get_settings
@@ -30,3 +32,13 @@ celery_app.conf.update(
# Import pipeline.stages so that @celery_app.task decorators register tasks.
# This import must come after celery_app is defined.
import pipeline.stages  # noqa: E402, F401
import tasks.notifications  # noqa: E402, F401

# ── Celery Beat schedule ─────────────────────────────────────────────────────
celery_app.conf.beat_schedule = {
    "send-digest-emails": {
        "task": "tasks.notifications.send_digest_emails",
        "schedule": crontab(hour=9, minute=0),  # daily at 09:00 UTC
    },
}

View file

@@ -155,7 +155,7 @@ services:
      QDRANT_URL: http://chrysopedia-qdrant:6333
      EMBEDDING_API_URL: http://chrysopedia-ollama:11434/v1
      PROMPTS_PATH: /prompts
    command: ["celery", "-A", "worker", "worker", "--loglevel=info", "--concurrency=1"]
    command: ["celery", "-A", "worker", "worker", "--beat", "--loglevel=info", "--concurrency=1"]
    healthcheck:
      test: ["CMD-SHELL", "celery -A worker inspect ping --timeout=5 2>/dev/null | grep -q pong || exit 1"]
      interval: 30s