""" Chrysopedia MCP Server — Docker-native management tools. Runs inside the Docker Compose stack with direct access to PostgreSQL, Redis (Celery broker), Docker socket, and the filesystem. 25 tools across 6 domains: pipeline execution, infrastructure health, content queries, pipeline observability, embeddings, and prompts. """ from __future__ import annotations import hashlib import json import os import subprocess from datetime import datetime, timezone import docker import psycopg2 import psycopg2.extras import redis from mcp.server.fastmcp import FastMCP # ── Configuration ───────────────────────────────────────────────────────────── DB_URL = os.getenv( "DATABASE_URL", "postgresql://chrysopedia:changeme@chrysopedia-db:5432/chrysopedia", ) REDIS_URL = os.getenv("REDIS_URL", "redis://chrysopedia-redis:6379/0") QDRANT_URL = os.getenv("QDRANT_URL", "http://chrysopedia-qdrant:6333") API_URL = os.getenv("API_URL", "http://chrysopedia-api:8000") PROMPTS_PATH = os.getenv("PROMPTS_PATH", "/prompts") COMPOSE_PROJECT = "xpltd_chrysopedia" mcp = FastMCP( "Chrysopedia", stateless_http=True, json_response=True, host="0.0.0.0", port=8101, ) # ── Helpers ─────────────────────────────────────────────────────────────────── def _db(): """Return a new psycopg2 connection (sync, short-lived).""" return psycopg2.connect(DB_URL, cursor_factory=psycopg2.extras.RealDictCursor) def _redis(): """Return a Redis client.""" return redis.from_url(REDIS_URL, decode_responses=True) def _docker(): """Return a Docker client from the mounted socket.""" return docker.from_env() def _query(sql: str, params: tuple = ()) -> list[dict]: """Execute a read query and return rows as dicts.""" with _db() as conn: with conn.cursor() as cur: cur.execute(sql, params) return [dict(r) for r in cur.fetchall()] def _query_one(sql: str, params: tuple = ()) -> dict | None: rows = _query(sql, params) return rows[0] if rows else None def _execute(sql: str, params: tuple = ()) -> int: """Execute a write query and return rowcount.""" with _db() as conn: with conn.cursor() as cur: cur.execute(sql, params) conn.commit() return cur.rowcount def _json_default(obj): """JSON serializer for types not handled by default.""" if isinstance(obj, datetime): return obj.isoformat() if hasattr(obj, "__str__"): return str(obj) raise TypeError(f"Object of type {type(obj)} is not JSON serializable") def _json(data) -> str: return json.dumps(data, default=_json_default, indent=2) # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN 1: Pipeline Execution # ═══════════════════════════════════════════════════════════════════════════════ @mcp.tool() def pipeline_status() -> str: """Show currently processing, queued, and backlogged videos with counts.""" processing = _query( "SELECT id, filename, processing_status, updated_at " "FROM source_videos WHERE processing_status = 'processing' " "ORDER BY updated_at DESC" ) queued = _query( "SELECT id, filename, processing_status, updated_at " "FROM source_videos WHERE processing_status = 'queued' " "ORDER BY updated_at DESC" ) not_started = _query( "SELECT id, filename, updated_at " "FROM source_videos WHERE processing_status = 'not_started' " "ORDER BY updated_at DESC" ) errored = _query( "SELECT id, filename, updated_at " "FROM source_videos WHERE processing_status = 'error' " "ORDER BY updated_at DESC" ) # Check Celery queue r = _redis() celery_queue_len = r.llen("celery") return _json({ "processing": processing, "queued": queued, "not_started": not_started, "errored": errored, "celery_queue_depth": celery_queue_len, "summary": { "processing": len(processing), "queued": len(queued), "not_started": len(not_started), "errored": len(errored), "celery_pending": celery_queue_len, }, }) @mcp.tool() def trigger_pipeline(video_id: str = "", filename: str = "") -> str: """Trigger pipeline for a video by ID or filename substring. Provide one.""" if not video_id and not filename: return "Error: provide either video_id or filename" if not video_id: row = _query_one( "SELECT id, filename FROM source_videos " "WHERE filename ILIKE %s LIMIT 1", (f"%{filename}%",), ) if not row: return f"No video matching '{filename}'" video_id = str(row["id"]) filename = row["filename"] else: row = _query_one("SELECT filename FROM source_videos WHERE id = %s", (video_id,)) filename = row["filename"] if row else "unknown" import httpx resp = httpx.post( f"{API_URL}/api/v1/admin/pipeline/trigger/{video_id}", timeout=10, ) return _json({ "video_id": video_id, "filename": filename, "status_code": resp.status_code, "response": resp.json() if resp.status_code < 400 else resp.text, }) @mcp.tool() def trigger_batch( creator: str = "", status: str = "not_started", limit: int = 5, ) -> str: """Trigger pipeline for multiple videos. Filter by creator slug and/or status. Default: up to 5 not_started videos.""" sql = "SELECT sv.id, sv.filename FROM source_videos sv" conditions = [] params: list = [] if status: conditions.append("sv.processing_status = %s") params.append(status) if creator: sql += " JOIN creators c ON sv.creator_id = c.id" conditions.append("c.slug = %s") params.append(creator) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY sv.updated_at DESC LIMIT %s" params.append(limit) videos = _query(sql, tuple(params)) if not videos: return "No videos match the filter." import httpx results = [] for v in videos: try: resp = httpx.post( f"{API_URL}/api/v1/admin/pipeline/trigger/{v['id']}", timeout=10, ) results.append({ "video_id": str(v["id"]), "filename": v["filename"], "status": resp.status_code, }) except Exception as e: results.append({ "video_id": str(v["id"]), "filename": v["filename"], "error": str(e), }) return _json({"triggered": len(results), "results": results}) @mcp.tool() def revoke_pipeline(video_id: str) -> str: """Revoke/cancel a pipeline run for a video.""" import httpx resp = httpx.post( f"{API_URL}/api/v1/admin/pipeline/revoke/{video_id}", timeout=10, ) return _json({ "video_id": video_id, "status_code": resp.status_code, "response": resp.json() if resp.status_code < 400 else resp.text, }) @mcp.tool() def rerun_stage(video_id: str, stage: int) -> str: """Re-run a specific pipeline stage (2-6) for a video.""" if stage < 2 or stage > 6: return "Error: stage must be 2-6" import httpx stage_name = {2: "stage2_segmentation", 3: "stage3_extraction", 4: "stage4_classification", 5: "stage5_synthesis", 6: "stage6_embed_and_index"}.get(stage, f"stage{stage}") resp = httpx.post( f"{API_URL}/api/v1/admin/pipeline/rerun-stage/{video_id}/{stage_name}", timeout=10, ) return _json({ "video_id": video_id, "stage": stage, "status_code": resp.status_code, "response": resp.json() if resp.status_code < 400 else resp.text, }) @mcp.tool() def queue_depth() -> str: """Check Celery queue depth and active/reserved/scheduled tasks.""" r = _redis() queue_len = r.llen("celery") # Inspect via celery CLI inside the worker container client = _docker() try: worker = client.containers.get("chrysopedia-worker") active_result = worker.exec_run( "celery -A worker inspect active --json --timeout=5" ) active = active_result.output.decode() if active_result.exit_code == 0 else "error" reserved_result = worker.exec_run( "celery -A worker inspect reserved --json --timeout=5" ) reserved = reserved_result.output.decode() if reserved_result.exit_code == 0 else "error" except Exception as e: active = f"error: {e}" reserved = f"error: {e}" return _json({ "celery_queue_length": queue_len, "active_tasks": active, "reserved_tasks": reserved, }) # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN 2: Infrastructure Health # ═══════════════════════════════════════════════════════════════════════════════ @mcp.tool() def worker_health() -> str: """Check Celery worker status: online, pool size, active task.""" client = _docker() try: worker = client.containers.get("chrysopedia-worker") result = worker.exec_run( "celery -A worker inspect ping --json --timeout=5" ) ping = result.output.decode() if result.exit_code == 0 else "no response" stats_result = worker.exec_run( "celery -A worker inspect stats --json --timeout=5" ) stats = stats_result.output.decode() if stats_result.exit_code == 0 else "error" except docker.errors.NotFound: return _json({"status": "container not found", "container": "chrysopedia-worker"}) except Exception as e: return _json({"status": "error", "error": str(e)}) return _json({"ping": ping, "stats": stats}) @mcp.tool() def container_status() -> str: """Show status of all Chrysopedia Docker containers.""" client = _docker() containers = client.containers.list(all=True, filters={"name": "chrysopedia"}) result = [] for c in containers: health = "unknown" if c.attrs.get("State", {}).get("Health"): health = c.attrs["State"]["Health"].get("Status", "unknown") result.append({ "name": c.name, "status": c.status, "health": health, "image": c.image.tags[0] if c.image.tags else "untagged", "started_at": c.attrs.get("State", {}).get("StartedAt", ""), }) return _json(sorted(result, key=lambda x: x["name"])) @mcp.tool() def service_health() -> str: """Combined health check: DB, Redis, Qdrant, Ollama, API, Worker.""" checks = {} # DB try: with _db() as conn: with conn.cursor() as cur: cur.execute("SELECT 1") checks["postgresql"] = "healthy" except Exception as e: checks["postgresql"] = f"error: {e}" # Redis try: r = _redis() r.ping() checks["redis"] = "healthy" except Exception as e: checks["redis"] = f"error: {e}" # Qdrant try: import httpx resp = httpx.get(f"{QDRANT_URL}/healthz", timeout=5) checks["qdrant"] = "healthy" if resp.status_code == 200 else f"status {resp.status_code}" except Exception as e: checks["qdrant"] = f"error: {e}" # API try: import httpx resp = httpx.get(f"{API_URL}/health", timeout=5) checks["api"] = "healthy" if resp.status_code == 200 else f"status {resp.status_code}" except Exception as e: checks["api"] = f"error: {e}" # Worker (via container health) try: client = _docker() worker = client.containers.get("chrysopedia-worker") health = worker.attrs.get("State", {}).get("Health", {}).get("Status", "unknown") checks["worker"] = health except Exception as e: checks["worker"] = f"error: {e}" all_healthy = all(v == "healthy" for v in checks.values()) return _json({"overall": "healthy" if all_healthy else "degraded", "services": checks}) @mcp.tool() def restart_service(service: str) -> str: """Restart a Chrysopedia Docker container by short name. Valid: api, worker, watcher, web, db, redis, qdrant, ollama""" name_map = { "api": "chrysopedia-api", "worker": "chrysopedia-worker", "watcher": "chrysopedia-watcher", "web": "chrysopedia-web-8096", "db": "chrysopedia-db", "redis": "chrysopedia-redis", "qdrant": "chrysopedia-qdrant", "ollama": "chrysopedia-ollama", } container_name = name_map.get(service) if not container_name: return f"Unknown service '{service}'. Valid: {', '.join(name_map.keys())}" try: client = _docker() container = client.containers.get(container_name) container.restart(timeout=30) return _json({"restarted": container_name, "status": "ok"}) except Exception as e: return _json({"error": str(e), "container": container_name}) @mcp.tool() def tail_logs(service: str = "worker", lines: int = 50) -> str: """Get recent log lines from a Chrysopedia container. service: api, worker, watcher, web. Default: worker.""" name_map = { "api": "chrysopedia-api", "worker": "chrysopedia-worker", "watcher": "chrysopedia-watcher", "web": "chrysopedia-web-8096", } container_name = name_map.get(service) if not container_name: return f"Unknown service '{service}'. Valid: {', '.join(name_map.keys())}" try: client = _docker() container = client.containers.get(container_name) logs = container.logs(tail=lines, timestamps=True).decode("utf-8", errors="replace") # Truncate to 40KB to stay within MCP limits if len(logs) > 40000: logs = logs[-40000:] return logs except Exception as e: return f"Error: {e}" # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN 3: Content & Data # ═══════════════════════════════════════════════════════════════════════════════ @mcp.tool() def video_stats() -> str: """Counts of technique pages, source videos, creators, with per-creator breakdown.""" totals = _query(""" SELECT (SELECT count(*) FROM technique_pages) as technique_pages, (SELECT count(*) FROM source_videos) as source_videos, (SELECT count(*) FROM creators) as creators, (SELECT count(*) FROM key_moments) as key_moments """) by_creator = _query(""" SELECT c.name, c.slug, count(DISTINCT tp.id) as pages, count(DISTINCT sv.id) as videos FROM creators c LEFT JOIN technique_pages tp ON tp.creator_id = c.id LEFT JOIN source_videos sv ON sv.creator_id = c.id GROUP BY c.id, c.name, c.slug ORDER BY pages DESC, videos DESC """) by_status = _query(""" SELECT processing_status::text as status, count(*) as count FROM source_videos GROUP BY processing_status ORDER BY count DESC """) return _json({ "totals": totals[0] if totals else {}, "by_creator": by_creator, "by_processing_status": by_status, }) @mcp.tool() def list_videos( status: str = "", creator: str = "", limit: int = 50, ) -> str: """List source videos, optionally filtered by processing status and/or creator slug.""" sql = """ SELECT sv.id, sv.filename, sv.processing_status::text as status, c.name as creator, c.slug as creator_slug, sv.updated_at FROM source_videos sv LEFT JOIN creators c ON sv.creator_id = c.id """ conditions = [] params: list = [] if status: conditions.append("sv.processing_status = %s") params.append(status) if creator: conditions.append("c.slug = %s") params.append(creator) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY sv.updated_at DESC LIMIT %s" params.append(limit) rows = _query(sql, tuple(params)) return _json({"count": len(rows), "videos": rows}) @mcp.tool() def list_technique_pages( creator: str = "", format: str = "", limit: int = 50, ) -> str: """List technique pages, optionally filtered by creator slug and/or format (v1/v2).""" sql = """ SELECT tp.id, tp.title, tp.slug, tp.topic_category, tp.body_sections_format, c.name as creator, c.slug as creator_slug, tp.updated_at FROM technique_pages tp LEFT JOIN creators c ON tp.creator_id = c.id """ conditions = [] params: list = [] if creator: conditions.append("c.slug = %s") params.append(creator) if format: conditions.append("tp.body_sections_format = %s") params.append(format) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY tp.updated_at DESC LIMIT %s" params.append(limit) rows = _query(sql, tuple(params)) return _json({"count": len(rows), "pages": rows}) @mcp.tool() def list_creators() -> str: """List all creators with video and technique page counts.""" rows = _query(""" SELECT c.id, c.name, c.slug, c.genre_tags, count(DISTINCT sv.id) as video_count, count(DISTINCT tp.id) as page_count FROM creators c LEFT JOIN source_videos sv ON sv.creator_id = c.id LEFT JOIN technique_pages tp ON tp.creator_id = c.id GROUP BY c.id, c.name, c.slug, c.genre_tags ORDER BY page_count DESC, video_count DESC """) return _json({"count": len(rows), "creators": rows}) @mcp.tool() def search_content(query: str, scope: str = "all", limit: int = 10) -> str: """Run a search query against the Chrysopedia API. scope: all, topics, creators.""" import httpx resp = httpx.get( f"{API_URL}/api/v1/search", params={"q": query, "scope": scope, "limit": limit}, timeout=15, ) if resp.status_code != 200: return f"Search failed: {resp.status_code} {resp.text}" return _json(resp.json()) # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN 4: Pipeline Observability # ═══════════════════════════════════════════════════════════════════════════════ @mcp.tool() def recent_events(limit: int = 20, video_id: str = "", stage: str = "") -> str: """Last N pipeline events. Optionally filter by video_id or stage name.""" sql = """ SELECT pe.id, pe.stage, pe.event_type, pe.model, pe.prompt_tokens, pe.completion_tokens, pe.duration_ms, pe.created_at, sv.filename FROM pipeline_events pe LEFT JOIN source_videos sv ON pe.video_id = sv.id """ conditions = [] params: list = [] if video_id: conditions.append("pe.video_id = %s") params.append(video_id) if stage: conditions.append("pe.stage = %s") params.append(stage) if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY pe.created_at DESC LIMIT %s" params.append(limit) rows = _query(sql, tuple(params)) return _json({"count": len(rows), "events": rows}) @mcp.tool() def token_usage(video_id: str = "", stage: str = "") -> str: """Token consumption summary. Optionally filter by video_id or stage.""" sql = """ SELECT pe.stage, count(*) as calls, coalesce(sum(pe.prompt_tokens), 0) as total_prompt_tokens, coalesce(sum(pe.completion_tokens), 0) as total_completion_tokens, coalesce(sum(pe.prompt_tokens), 0) + coalesce(sum(pe.completion_tokens), 0) as total_tokens FROM pipeline_events pe WHERE pe.event_type = 'llm_call' """ params: list = [] if video_id: sql += " AND pe.video_id = %s" params.append(video_id) if stage: sql += " AND pe.stage = %s" params.append(stage) sql += " GROUP BY pe.stage ORDER BY total_tokens DESC" rows = _query(sql, tuple(params)) grand_total = sum(r.get("total_tokens", 0) for r in rows) return _json({"by_stage": rows, "grand_total_tokens": grand_total}) @mcp.tool() def processing_times(limit: int = 10) -> str: """Processing duration for recent pipeline runs (start to last event).""" rows = _query(""" SELECT pe.video_id, sv.filename, c.name as creator, min(pe.created_at) as started_at, max(pe.created_at) as finished_at, EXTRACT(EPOCH FROM (max(pe.created_at) - min(pe.created_at)))::int as duration_seconds, count(pe.id) as event_count FROM pipeline_events pe JOIN source_videos sv ON pe.video_id = sv.id LEFT JOIN creators c ON sv.creator_id = c.id GROUP BY pe.video_id, sv.filename, c.name ORDER BY max(pe.created_at) DESC LIMIT %s """, (limit,)) return _json({"count": len(rows), "runs": rows}) @mcp.tool() def debug_mode_toggle(enable: bool | None = None) -> str: """Get or set pipeline debug mode. Pass enable=true/false to change, omit to check.""" r = _redis() key = "chrysopedia:debug_mode" if enable is None: current = r.get(key) return _json({"debug_mode": current == "1" if current else False}) r.set(key, "1" if enable else "0") return _json({"debug_mode": enable, "set": True}) # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN 5: Qdrant / Embeddings # ═══════════════════════════════════════════════════════════════════════════════ @mcp.tool() def qdrant_stats() -> str: """Qdrant collection point counts and index health.""" import httpx try: resp = httpx.get(f"{QDRANT_URL}/collections", timeout=10) collections_data = resp.json() result = [] for col in collections_data.get("result", {}).get("collections", []): name = col["name"] detail_resp = httpx.get(f"{QDRANT_URL}/collections/{name}", timeout=10) detail = detail_resp.json().get("result", {}) result.append({ "name": name, "points_count": detail.get("points_count", 0), "vectors_count": detail.get("vectors_count", 0), "status": detail.get("status", "unknown"), "segments_count": detail.get("segments_count", 0), }) return _json({"collections": result}) except Exception as e: return _json({"error": str(e)}) @mcp.tool() def reindex_all() -> str: """Trigger full re-embedding of all technique pages and key moments via the API.""" import httpx resp = httpx.post(f"{API_URL}/api/v1/admin/pipeline/reindex-all", timeout=30) return _json({ "status_code": resp.status_code, "response": resp.json() if resp.status_code < 400 else resp.text, }) @mcp.tool() def reindex_video(video_id: str) -> str: """Re-embed content for a specific video (re-runs stage 6).""" return rerun_stage(video_id=video_id, stage=6) # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN 6: Prompt Management # ═══════════════════════════════════════════════════════════════════════════════ @mcp.tool() def list_prompts() -> str: """List current prompt files with sizes and SHA-256 hashes.""" prompts_dir = PROMPTS_PATH if not os.path.isdir(prompts_dir): return _json({"error": f"Prompts directory not found: {prompts_dir}"}) result = [] for fname in sorted(os.listdir(prompts_dir)): fpath = os.path.join(prompts_dir, fname) if not os.path.isfile(fpath): continue with open(fpath, "rb") as f: content = f.read() result.append({ "filename": fname, "size_bytes": len(content), "sha256": hashlib.sha256(content).hexdigest()[:16], "lines": content.count(b"\n"), }) return _json({"count": len(result), "prompts": result}) @mcp.tool() def prompt_diff(filename: str) -> str: """Compare a current prompt file to its most recent .bak version. Returns a summary of changes (not full diff to save tokens).""" prompts_dir = PROMPTS_PATH current_path = os.path.join(prompts_dir, filename) if not os.path.isfile(current_path): return f"Prompt file not found: {filename}" # Find most recent .bak bak_files = sorted( [f for f in os.listdir(prompts_dir) if f.startswith(filename) and f.endswith(".bak")], reverse=True, ) if not bak_files: return _json({"filename": filename, "backup_found": False, "message": "No .bak file found"}) bak_path = os.path.join(prompts_dir, bak_files[0]) with open(current_path) as f: current = f.readlines() with open(bak_path) as f: backup = f.readlines() import difflib diff = list(difflib.unified_diff(backup, current, fromfile=bak_files[0], tofile=filename, n=1)) added = sum(1 for l in diff if l.startswith("+") and not l.startswith("+++")) removed = sum(1 for l in diff if l.startswith("-") and not l.startswith("---")) # Truncate diff output diff_text = "".join(diff[:200]) if len(diff) > 200: diff_text += f"\n... ({len(diff) - 200} more lines)" return _json({ "filename": filename, "backup": bak_files[0], "current_lines": len(current), "backup_lines": len(backup), "lines_added": added, "lines_removed": removed, "diff": diff_text, }) # ═══════════════════════════════════════════════════════════════════════════════ # Entry point # ═══════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": mcp.run(transport="streamable-http")