fractafrag/services/api/app/worker/__init__.py
John Lightner 05d39fdda8 M0: Foundation scaffold — Docker Compose, DB schema, FastAPI app, all service stubs
Track A (Infrastructure & Data Layer):
- docker-compose.yml with all 7 services (nginx, frontend, api, mcp, renderer, worker, postgres, redis)
- docker-compose.override.yml for local dev (hot reload, port exposure)
- PostgreSQL init.sql with full schema (15 tables, pgvector indexes, creator economy stubs)
- .env.example with all required environment variables

Track A+B (API Layer):
- FastAPI app with 10 routers (auth, shaders, feed, votes, generate, desires, users, payments, mcp_keys, health)
- SQLAlchemy ORM models for all 15 tables
- Pydantic schemas for all request/response types
- JWT auth middleware (access + refresh tokens, Redis blocklist)
- Redis rate limiting middleware
- Celery worker config with job stubs (render, embed, generate, feed cache, expire bounties)
- Alembic migration framework

Service stubs:
- MCP server (health endpoint, 501 for all tools)
- Renderer service (Express + Puppeteer scaffold, 501 for /render)
- Frontend (package.json with React/Vite/Three.js/TanStack/Tailwind deps)
- Nginx reverse proxy config (/, /api, /mcp, /renders)

Project:
- DECISIONS.md with 11 recorded architectural decisions
- README.md with architecture overview
- Sample shader seed data (plasma, fractal noise, raymarched sphere)
2026-03-24 20:45:08 -05:00

90 lines
2.6 KiB
Python

"""Fractafrag — Celery worker configuration."""
from celery import Celery
import os
redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app = Celery(
"fractafrag",
broker=redis_url,
backend=redis_url,
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=120, # hard kill after 2 min
task_soft_time_limit=90, # soft warning at 90s
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=100,
)
# Auto-discover tasks from worker modules
celery_app.autodiscover_tasks(["app.worker"])
# ── Task Definitions ──────────────────────────────────────
@celery_app.task(name="render_shader", bind=True, max_retries=2)
def render_shader(self, shader_id: str):
"""Render a shader via the headless Chromium renderer. (Track C)"""
# TODO: Implement in Track C
# 1. Fetch shader GLSL from DB
# 2. POST to renderer service
# 3. Store thumbnail + preview URLs
# 4. Update shader render_status
pass
@celery_app.task(name="embed_shader", bind=True)
def embed_shader(self, shader_id: str):
"""Generate style embedding vector for a shader. (Track C/F)"""
# TODO: Implement in Track C/F
pass
@celery_app.task(name="process_desire", bind=True)
def process_desire(self, desire_id: str):
"""Process a new desire: embed, cluster, optionally auto-fulfill. (Track G)"""
# TODO: Implement in Track G
pass
@celery_app.task(name="ai_generate", bind=True, max_retries=3)
def ai_generate(self, job_id: str, prompt: str, provider: str, user_id: str):
"""AI shader generation: prompt → LLM → GLSL → validate → render. (Track I)"""
# TODO: Implement in Track I
pass
@celery_app.task(name="rebuild_feed_cache")
def rebuild_feed_cache():
"""Rebuild the anonymous feed cache (trending + new). Runs every 15 min. (Track F)"""
# TODO: Implement in Track F
pass
@celery_app.task(name="expire_bounties")
def expire_bounties():
"""Mark old unfulfilled bounties as expired. Runs daily. (Track G)"""
# TODO: Implement in Track G
pass
# ── Periodic Tasks (Celery Beat) ─────────────────────────
celery_app.conf.beat_schedule = {
"rebuild-feed-cache": {
"task": "rebuild_feed_cache",
"schedule": 900.0, # every 15 minutes
},
"expire-bounties": {
"task": "expire_bounties",
"schedule": 86400.0, # daily
},
}