fractafrag/services/api/app/worker/__init__.py
John Lightner 5936ab167e feat(M001): Desire Economy
Completed slices:
- S01: Desire Embedding & Clustering
- S02: Fulfillment Flow & Frontend

Branch: milestone/M001
2026-03-25 02:22:50 -05:00

176 lines
5.2 KiB
Python

"""Fractafrag — Celery worker configuration."""
import logging
import time
import uuid as uuid_mod
from celery import Celery
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app = Celery(
"fractafrag",
broker=redis_url,
backend=redis_url,
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=120, # hard kill after 2 min
task_soft_time_limit=90, # soft warning at 90s
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=100,
)
# Auto-discover tasks from worker modules
celery_app.autodiscover_tasks(["app.worker"])
# ── Sync DB session factory for worker tasks ──────────────
def _get_sync_session_factory():
    """Return a cached synchronous ``sessionmaker`` bound to the app database.

    The engine (and its connection pool) is created once on first call and
    reused for the lifetime of the worker process. Previously a fresh engine
    was created on every invocation, leaking one connection pool per task
    execution.
    """
    factory = getattr(_get_sync_session_factory, "_factory", None)
    if factory is None:
        # Imported lazily so this module can load before app config is ready.
        from app.config import get_settings

        settings = get_settings()
        # pool_pre_ping guards against stale connections in long-lived workers.
        engine = create_engine(settings.database_url_sync, pool_pre_ping=True)
        factory = sessionmaker(bind=engine)
        _get_sync_session_factory._factory = factory
    return factory
# Module-level logger shared by all task definitions below.
logger = logging.getLogger(__name__)
# ── Task Definitions ──────────────────────────────────────
@celery_app.task(name="render_shader", bind=True, max_retries=2)
def render_shader(self, shader_id: str) -> None:
    """Render a shader via the headless Chromium renderer. (Track C)

    Args:
        shader_id: Identifier of the shader row to render.

    Placeholder: no work is performed yet (retries are capped at 2 for
    when the real implementation lands).
    """
    # TODO: Implement in Track C
    # 1. Fetch shader GLSL from DB
    # 2. POST to renderer service
    # 3. Store thumbnail + preview URLs
    # 4. Update shader render_status
    pass
@celery_app.task(name="embed_shader", bind=True)
def embed_shader(self, shader_id: str) -> None:
    """Generate style embedding vector for a shader. (Track C/F)

    Args:
        shader_id: Identifier of the shader to embed.

    Placeholder: no work is performed yet.
    """
    # TODO: Implement in Track C/F
    pass
@celery_app.task(name="process_desire", bind=True, max_retries=3)
def process_desire(self, desire_id: str) -> None:
    """Process a new desire: embed text, store embedding, cluster, update heat.

    Flow:
        1. Load desire from DB by id
        2. Embed prompt_text → 512-dim vector
        3. Store embedding on desire row
        4. Run sync clustering (find nearest or create new cluster)
        5. Commit all changes

    On any exception the transaction is rolled back and the task retries up
    to 3 times with a 30s delay. On success, logs desire_id, cluster_id,
    heat_score, and elapsed_ms. On final failure, the desire keeps
    prompt_embedding=NULL and heat_score=1.0.

    Args:
        desire_id: String UUID of the desire row. A malformed id raises
            ValueError before the session opens — no retry, since the input
            will never improve.
    """
    start = time.perf_counter()
    desire_uuid = uuid_mod.UUID(desire_id)
    SessionFactory = _get_sync_session_factory()
    session = SessionFactory()
    try:
        # Imported lazily to avoid circular imports at worker start-up.
        from app.models.models import Desire
        from app.services.embedding import embed_text
        from app.services.clustering import cluster_desire_sync

        # 1. Load desire; a missing row is skipped, not retried.
        desire = session.get(Desire, desire_uuid)
        if desire is None:
            logger.warning(
                "process_desire: desire %s not found, skipping", desire_id
            )
            return
        # 2. Embed prompt text
        embedding = embed_text(desire.prompt_text)
        # 3. Store embedding on desire (flushed so clustering can see it)
        desire.prompt_embedding = embedding
        session.flush()
        # 4. Cluster within the same session/transaction
        cluster_result = cluster_desire_sync(desire.id, embedding, session)
        # 5. Commit everything atomically
        session.commit()
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "process_desire completed: desire_id=%s cluster_id=%s "
            "is_new=%s heat_score=%.1f elapsed_ms=%.1f",
            desire_id,
            cluster_result["cluster_id"],
            cluster_result["is_new"],
            cluster_result["heat_score"],
            elapsed_ms,
        )
    except Exception as exc:
        session.rollback()
        elapsed_ms = (time.perf_counter() - start) * 1000
        # logger.exception attaches the traceback, which plain .error() lost.
        logger.exception(
            "process_desire failed: desire_id=%s error=%s elapsed_ms=%.1f",
            desire_id,
            str(exc),
            elapsed_ms,
        )
        # self.retry raises celery.exceptions.Retry; the explicit `raise`
        # makes the non-returning control flow obvious to readers.
        raise self.retry(exc=exc, countdown=30)
    finally:
        session.close()
@celery_app.task(name="ai_generate", bind=True, max_retries=3)
def ai_generate(self, job_id: str, prompt: str, provider: str, user_id: str) -> None:
    """AI shader generation: prompt → LLM → GLSL → validate → render. (Track I)

    Args:
        job_id: Identifier of the generation job record.
        prompt: User-supplied text prompt for the LLM.
        provider: Which LLM provider to route the request to.
        user_id: Identifier of the requesting user.

    Placeholder: no work is performed yet.
    """
    # TODO: Implement in Track I
    pass
@celery_app.task(name="rebuild_feed_cache")
def rebuild_feed_cache() -> None:
    """Rebuild the anonymous feed cache (trending + new). Runs every 15 min. (Track F)

    Scheduled via the beat_schedule at the bottom of this module.
    Placeholder: no work is performed yet.
    """
    # TODO: Implement in Track F
    pass
@celery_app.task(name="expire_bounties")
def expire_bounties() -> None:
    """Mark old unfulfilled bounties as expired. Runs daily. (Track G)

    Scheduled via the beat_schedule at the bottom of this module.
    Placeholder: no work is performed yet.
    """
    # TODO: Implement in Track G
    pass
# ── Periodic Tasks (Celery Beat) ─────────────────────────
# Interval schedules are expressed in seconds.
celery_app.conf.beat_schedule = {
    # Refresh the anonymous feed cache four times an hour.
    "rebuild-feed-cache": {"task": "rebuild_feed_cache", "schedule": 900.0},
    # Sweep stale bounties once per day.
    "expire-bounties": {"task": "expire_bounties", "schedule": 86400.0},
}