diff --git a/backend/routers/pipeline.py b/backend/routers/pipeline.py index 5299739..521da12 100644 --- a/backend/routers/pipeline.py +++ b/backend/routers/pipeline.py @@ -11,6 +11,8 @@ Admin: GET /admin/pipeline/worker-status Active/reserved tasks from Celery inspect """ +import asyncio +import json import logging import uuid from datetime import datetime, timezone @@ -1263,51 +1265,79 @@ async def reindex_all( # ── Admin: Worker status ───────────────────────────────────────────────────── +WORKER_STATUS_CACHE_KEY = "chrysopedia:worker_status" +WORKER_STATUS_CACHE_TTL = 10 # seconds + + +def _inspect_workers(): + """Synchronous Celery inspect — runs in a thread to avoid blocking the event loop.""" + from worker import celery_app + + inspector = celery_app.control.inspect(timeout=0.5) + active = inspector.active() or {} + reserved = inspector.reserved() or {} + stats = inspector.stats() or {} + + workers = [] + for worker_name in set(list(active.keys()) + list(reserved.keys()) + list(stats.keys())): + worker_active = active.get(worker_name, []) + worker_reserved = reserved.get(worker_name, []) + worker_stats = stats.get(worker_name, {}) + + workers.append({ + "name": worker_name, + "active_tasks": [ + { + "id": t.get("id"), + "name": t.get("name"), + "args": t.get("args", []), + "time_start": t.get("time_start"), + } + for t in worker_active + ], + "reserved_tasks": len(worker_reserved), + "total_completed": worker_stats.get("total", {}).get("tasks.pipeline.stages.stage2_segmentation", 0) + + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage3_extraction", 0) + + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage4_classification", 0) + + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage5_synthesis", 0), + "uptime": worker_stats.get("clock", None), + "pool_size": worker_stats.get("pool", {}).get("max-concurrency") if isinstance(worker_stats.get("pool"), dict) else None, + }) + + return {"online": len(workers) > 0, "workers": workers} + + @router.get("/admin/pipeline/worker-status") async def worker_status(): - """Get current Celery worker status — active, reserved, and stats.""" - from worker import celery_app - + """Get current Celery worker status — active, reserved, and stats. + + Results are cached in Redis for 10 seconds to avoid repeated slow + Celery inspect round-trips. The synchronous inspect calls run in a + thread so they never block the async event loop. + """ + # Try Redis cache first try: - inspector = celery_app.control.inspect() - active = inspector.active() or {} - reserved = inspector.reserved() or {} - stats = inspector.stats() or {} - - workers = [] - for worker_name in set(list(active.keys()) + list(reserved.keys()) + list(stats.keys())): - worker_active = active.get(worker_name, []) - worker_reserved = reserved.get(worker_name, []) - worker_stats = stats.get(worker_name, {}) - - workers.append({ - "name": worker_name, - "active_tasks": [ - { - "id": t.get("id"), - "name": t.get("name"), - "args": t.get("args", []), - "time_start": t.get("time_start"), - } - for t in worker_active - ], - "reserved_tasks": len(worker_reserved), - "total_completed": worker_stats.get("total", {}).get("tasks.pipeline.stages.stage2_segmentation", 0) - + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage3_extraction", 0) - + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage4_classification", 0) - + worker_stats.get("total", {}).get("tasks.pipeline.stages.stage5_synthesis", 0), - "uptime": worker_stats.get("clock", None), - "pool_size": worker_stats.get("pool", {}).get("max-concurrency") if isinstance(worker_stats.get("pool"), dict) else None, - }) - - return { - "online": len(workers) > 0, - "workers": workers, - } + redis = await get_redis() + cached = await redis.get(WORKER_STATUS_CACHE_KEY) + await redis.aclose() + if cached: + return json.loads(cached) + except Exception: + pass + + # Cache miss — run synchronous inspect in a thread + try: + result = await asyncio.to_thread(_inspect_workers) except Exception as exc: logger.warning("Failed to inspect Celery workers: %s", exc) - return { - "online": False, - "workers": [], - "error": str(exc), - } + return {"online": False, "workers": [], "error": str(exc)} + + # Write to Redis cache (best-effort) + try: + redis = await get_redis() + await redis.set(WORKER_STATUS_CACHE_KEY, json.dumps(result), ex=WORKER_STATUS_CACHE_TTL) + await redis.aclose() + except Exception: + pass + + return result diff --git a/frontend/index.html b/frontend/index.html index bd904b8..06c6e5e 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -4,6 +4,18 @@ + + + + + + + + + + + + Chrysopedia diff --git a/frontend/public/apple-touch-icon.png b/frontend/public/apple-touch-icon.png new file mode 100644 index 0000000..bf88f11 Binary files /dev/null and b/frontend/public/apple-touch-icon.png differ diff --git a/frontend/public/favicon-32.png b/frontend/public/favicon-32.png new file mode 100644 index 0000000..fe65ca0 Binary files /dev/null and b/frontend/public/favicon-32.png differ diff --git a/frontend/public/favicon.svg b/frontend/public/favicon.svg new file mode 100644 index 0000000..1701732 --- /dev/null +++ b/frontend/public/favicon.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/frontend/public/og-image.png b/frontend/public/og-image.png new file mode 100644 index 0000000..12f0dcb Binary files /dev/null and b/frontend/public/og-image.png differ