"""Fractafrag — Celery worker configuration.""" import logging import time import uuid as uuid_mod from celery import Celery import os from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0") celery_app = Celery( "fractafrag", broker=redis_url, backend=redis_url, ) celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="UTC", enable_utc=True, task_track_started=True, task_time_limit=120, # hard kill after 2 min task_soft_time_limit=90, # soft warning at 90s worker_prefetch_multiplier=1, worker_max_tasks_per_child=100, ) # Auto-discover tasks from worker modules celery_app.autodiscover_tasks(["app.worker"]) # ── Sync DB session factory for worker tasks ────────────── def _get_sync_session_factory(): """Lazy-init sync session factory using settings.database_url_sync.""" from app.config import get_settings settings = get_settings() engine = create_engine(settings.database_url_sync, pool_pre_ping=True) return sessionmaker(bind=engine) logger = logging.getLogger(__name__) # ── Task Definitions ────────────────────────────────────── @celery_app.task(name="render_shader", bind=True, max_retries=2) def render_shader(self, shader_id: str): """Render a shader via the headless Chromium renderer. (Track C)""" # TODO: Implement in Track C # 1. Fetch shader GLSL from DB # 2. POST to renderer service # 3. Store thumbnail + preview URLs # 4. Update shader render_status pass @celery_app.task(name="embed_shader", bind=True) def embed_shader(self, shader_id: str): """Generate style embedding vector for a shader. (Track C/F)""" # TODO: Implement in Track C/F pass @celery_app.task(name="process_desire", bind=True, max_retries=3) def process_desire(self, desire_id: str): """Process a new desire: embed text, store embedding, cluster, update heat. Flow: 1. Load desire from DB by id 2. Embed prompt_text → 512-dim vector 3. Store embedding on desire row 4. Run sync clustering (find nearest or create new cluster) 5. Commit all changes On transient DB errors, retries up to 3 times with 30s backoff. On success, logs desire_id, cluster_id, heat_score, and elapsed_ms. On failure, desire keeps prompt_embedding=NULL and heat_score=1.0. """ start = time.perf_counter() desire_uuid = uuid_mod.UUID(desire_id) SessionFactory = _get_sync_session_factory() session = SessionFactory() try: from app.models.models import Desire from app.services.embedding import embed_text from app.services.clustering import cluster_desire_sync # 1. Load desire desire = session.get(Desire, desire_uuid) if desire is None: logger.warning( "process_desire: desire %s not found, skipping", desire_id ) return # 2. Embed prompt text embedding = embed_text(desire.prompt_text) # 3. Store embedding on desire desire.prompt_embedding = embedding session.flush() # 4. Cluster cluster_result = cluster_desire_sync(desire.id, embedding, session) # 5. Commit session.commit() elapsed_ms = (time.perf_counter() - start) * 1000 logger.info( "process_desire completed: desire_id=%s cluster_id=%s " "is_new=%s heat_score=%.1f elapsed_ms=%.1f", desire_id, cluster_result["cluster_id"], cluster_result["is_new"], cluster_result["heat_score"], elapsed_ms, ) except Exception as exc: session.rollback() elapsed_ms = (time.perf_counter() - start) * 1000 logger.error( "process_desire failed: desire_id=%s error=%s elapsed_ms=%.1f", desire_id, str(exc), elapsed_ms, ) raise self.retry(exc=exc, countdown=30) finally: session.close() @celery_app.task(name="ai_generate", bind=True, max_retries=3) def ai_generate(self, job_id: str, prompt: str, provider: str, user_id: str): """AI shader generation: prompt → LLM → GLSL → validate → render. (Track I)""" # TODO: Implement in Track I pass @celery_app.task(name="rebuild_feed_cache") def rebuild_feed_cache(): """Rebuild the anonymous feed cache (trending + new). Runs every 15 min. (Track F)""" # TODO: Implement in Track F pass @celery_app.task(name="expire_bounties") def expire_bounties(): """Mark old unfulfilled bounties as expired. Runs daily. (Track G)""" # TODO: Implement in Track G pass # ── Periodic Tasks (Celery Beat) ───────────────────────── celery_app.conf.beat_schedule = { "rebuild-feed-cache": { "task": "rebuild_feed_cache", "schedule": 900.0, # every 15 minutes }, "expire-bounties": { "task": "expire_bounties", "schedule": 86400.0, # daily }, }